# Copyright (C) 2022 Andrew Williamson
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
""" This module contains functions for calculating and manipulating coherent
triggers.
"""
import logging
import numpy as np
logger = logging.getLogger('pycbc.events.coherent')
[docs]
def get_coinc_indexes(idx_dict, time_delay_idx):
"""Return the indexes corresponding to coincident triggers, requiring
they are seen in at least two detectors in the network
Parameters
----------
idx_dict: dict
Dictionary of indexes of triggers above threshold in each
detector
time_delay_idx: dict
Dictionary giving time delay index (time_delay*sample_rate) for
each ifo
Returns
-------
coinc_idx: list
List of indexes for triggers in geocent time that appear in
multiple detectors
"""
coinc_list = np.array([], dtype=int)
for ifo in idx_dict.keys():
# Create list of indexes above threshold in single detector in geocent
# time. Can then search for triggers that appear in multiple detectors
# later.
if len(idx_dict[ifo]) != 0:
coinc_list = np.hstack(
[coinc_list, idx_dict[ifo] - time_delay_idx[ifo]]
)
# Search through coinc_idx for repeated indexes. These must have been loud
# in at least 2 detectors.
counts = np.unique(coinc_list, return_counts=True)
coinc_idx = counts[0][counts[1] > 1]
return coinc_idx
[docs]
def get_coinc_triggers(snrs, idx, t_delay_idx):
"""Returns the coincident triggers from the longer SNR timeseries
Parameters
----------
snrs: dict
Dictionary of single detector SNR time series
idx: list
List of geocentric time indexes of coincident triggers
t_delay_idx: dict
Dictionary of indexes corresponding to light travel time from
geocenter for each detector
Returns
-------
coincs: dict
Dictionary of coincident trigger SNRs in each detector
"""
# loops through snrs
# %len(snrs[ifo]) was included as part of a wrap-around solution
coincs = {
ifo: snrs[ifo][(idx + t_delay_idx[ifo]) % len(snrs[ifo])]
for ifo in snrs}
return coincs
[docs]
def coincident_snr(snr_dict, index, threshold, time_delay_idx):
"""Calculate the coincident SNR for all coincident triggers above
threshold
Parameters
----------
snr_dict: dict
Dictionary of individual detector SNRs
index: list
List of indexes (geocentric) for which to calculate coincident
SNR
threshold: float
Coincident SNR threshold. Triggers below this are cut
time_delay_idx: dict
Dictionary of time delay from geocenter in indexes for each
detector
Returns
-------
rho_coinc: numpy.ndarray
Coincident SNR values for surviving triggers
index: list
The subset of input indexes corresponding to triggers that
survive the cuts
coinc_triggers: dict
Dictionary of individual detector SNRs for triggers that
survive cuts
"""
# Restrict the snr timeseries to just the interesting points
coinc_triggers = get_coinc_triggers(snr_dict, index, time_delay_idx)
# Calculate the coincident snr
snr_array = np.array(
[coinc_triggers[ifo] for ifo in coinc_triggers.keys()]
)
rho_coinc = abs(np.sqrt(np.sum(snr_array * snr_array.conj(), axis=0)))
# Apply threshold
thresh_indexes = rho_coinc > threshold
index = index[thresh_indexes]
coinc_triggers = get_coinc_triggers(snr_dict, index, time_delay_idx)
rho_coinc = rho_coinc[thresh_indexes]
return rho_coinc, index, coinc_triggers
[docs]
def get_projection_matrix(f_plus, f_cross, sigma, projection="standard"):
"""Calculate the matrix that projects the signal onto the network.
Definitions can be found in Fairhurst (2018) [arXiv:1712.04724].
For the standard projection see Eq. 8, and for left/right
circular projections see Eq. 21, with further discussion in
Appendix A. See also Williamson et al. (2014) [arXiv:1410.6042]
for discussion in context of the GRB search with restricted
binary inclination angles.
Parameters
----------
f_plus: dict
Dictionary containing the plus antenna response factors for
each IFO
f_cross: dict
Dictionary containing the cross antenna response factors for
each IFO
sigma: dict
Dictionary of the sensitivity weights for each IFO
projection: optional, {string, 'standard'}
The signal polarization to project. Choice of 'standard'
(unrestricted; default), 'right' or 'left' (circular
polarizations)
Returns
-------
projection_matrix: np.ndarray
The matrix that projects the signal onto the detector network
"""
# Calculate the weighted antenna responses
keys = sorted(sigma.keys())
w_p = np.array([sigma[ifo] * f_plus[ifo] for ifo in keys])
w_c = np.array([sigma[ifo] * f_cross[ifo] for ifo in keys])
# Get the projection matrix associated with the requested projection
if projection == "standard":
denom = np.dot(w_p, w_p) * np.dot(w_c, w_c) - np.dot(w_p, w_c) ** 2
projection_matrix = (
np.dot(w_c, w_c) * np.outer(w_p, w_p)
+ np.dot(w_p, w_p) * np.outer(w_c, w_c)
- np.dot(w_p, w_c) * (np.outer(w_p, w_c) + np.outer(w_c, w_p))
) / denom
elif projection == "left":
projection_matrix = (
np.outer(w_p, w_p)
+ np.outer(w_c, w_c)
+ (np.outer(w_p, w_c) - np.outer(w_c, w_p)) * 1j
) / (np.dot(w_p, w_p) + np.dot(w_c, w_c))
elif projection == "right":
projection_matrix = (
np.outer(w_p, w_p)
+ np.outer(w_c, w_c)
+ (np.outer(w_c, w_p) - np.outer(w_p, w_c)) * 1j
) / (np.dot(w_p, w_p) + np.dot(w_c, w_c))
else:
raise ValueError(
f'Unknown projection: {projection}. Allowed values are: '
'"standard", "left", and "right"')
return projection_matrix
[docs]
def coherent_snr(
snr_triggers, index, threshold, projection_matrix, coinc_snr=None
):
"""Calculate the coherent SNR for a given set of triggers. See
Eq. 2.26 of Harry & Fairhurst (2011) [arXiv:1012.4939].
Parameters
----------
snr_triggers: dict
Dictionary of the normalised complex snr time series for each
ifo
index: numpy.ndarray
Array of the indexes corresponding to triggers
threshold: float
Coherent SNR threshold. Triggers below this are cut
projection_matrix: numpy.ndarray
Matrix that projects the signal onto the network
coinc_snr: Optional- The coincident snr for each trigger.
Returns
-------
rho_coh: numpy.ndarray
Array of coherent SNR for the detector network
index: numpy.ndarray
Indexes that survive cuts
snrv: dict
Dictionary of individual deector triggers that survive cuts
coinc_snr: list or None (default: None)
The coincident SNR values for triggers surviving the coherent
cut
"""
# Calculate rho_coh
snr_array = np.array(
[snr_triggers[ifo] for ifo in sorted(snr_triggers.keys())]
)
snr_proj = np.inner(snr_array.conj().transpose(), projection_matrix)
rho_coh2 = sum(snr_proj.transpose() * snr_array)
rho_coh = abs(np.sqrt(rho_coh2))
# Apply thresholds
above = rho_coh > threshold
index = index[above]
coinc_snr = [] if coinc_snr is None else coinc_snr
if len(coinc_snr) != 0:
coinc_snr = coinc_snr[above]
snrv = {
ifo: snr_triggers[ifo][above]
for ifo in snr_triggers.keys()
}
rho_coh = rho_coh[above]
return rho_coh, index, snrv, coinc_snr
[docs]
def network_chisq(chisq, chisq_dof, snr_dict):
"""Calculate the network chi-squared statistic. This is the sum of
SNR-weighted individual detector chi-squared values. See Eq. 5.4
of Dorrington (2019) [http://orca.cardiff.ac.uk/id/eprint/128124].
Parameters
----------
chisq: dict
Dictionary of individual detector chi-squared statistics
chisq_dof: dict
Dictionary of the number of degrees of freedom of the
chi-squared statistic
snr_dict: dict
Dictionary of complex individual detector SNRs
Returns
-------
net_chisq: list
Network chi-squared values
"""
ifos = sorted(snr_dict.keys())
chisq_per_dof = dict.fromkeys(ifos)
for ifo in ifos:
chisq_per_dof[ifo] = chisq[ifo] / chisq_dof[ifo]
snr2 = {
ifo: np.real(np.array(snr_dict[ifo]) * np.array(snr_dict[ifo]).conj())
for ifo in ifos
}
coinc_snr2 = sum(snr2.values())
snr2_ratio = {ifo: snr2[ifo] / coinc_snr2 for ifo in ifos}
net_chisq = sum([chisq_per_dof[ifo] * snr2_ratio[ifo] for ifo in ifos])
return net_chisq
[docs]
def null_snr(
rho_coh, rho_coinc, apply_cut=True, null_min=5.25, null_grad=0.2,
null_step=20.0, index=None, snrv=None
):
"""Calculate the null SNR and optionally apply threshold cut where
null SNR > null_min where coherent SNR < null_step
and null SNR > (null_grad * rho_coh + null_min) elsewhere. See
Eq. 3.1 of Harry & Fairhurst (2011) [arXiv:1012.4939] or
Eqs. 11 and 12 of Williamson et al. (2014) [arXiv:1410.6042]..
Parameters
----------
rho_coh: numpy.ndarray
Array of coherent snr triggers
rho_coinc: numpy.ndarray
Array of coincident snr triggers
apply_cut: bool
Apply a cut and downweight on null SNR determined by null_min,
null_grad, null_step (default True)
null_min: scalar
Any trigger with null SNR below this is retained
null_grad: scalar
Gradient of null SNR cut where coherent SNR > null_step
null_step: scalar
The threshold in coherent SNR rho_coh above which the null SNR
threshold increases as null_grad * rho_coh
index: dict or None (optional; default None)
Indexes of triggers. If given, will remove triggers that fail
cuts
snrv: dict of None (optional; default None)
Individual detector SNRs. If given will remove triggers that
fail cut
Returns
-------
null: numpy.ndarray
Null SNR for surviving triggers
rho_coh: numpy.ndarray
Coherent SNR for surviving triggers
rho_coinc: numpy.ndarray
Coincident SNR for suviving triggers
index: dict
Indexes for surviving triggers
snrv: dict
Single detector SNRs for surviving triggers
"""
index = {} if index is None else index
snrv = {} if snrv is None else snrv
# Calculate null SNRs
null2 = rho_coinc ** 2 - rho_coh ** 2
# Numerical errors may make this negative and break the sqrt, so set
# negative values to 0.
null2[null2 < 0] = 0
null = null2 ** 0.5
if apply_cut:
# Make cut on null.
keep = (
((null < null_min) & (rho_coh <= null_step))
| (
(null < ((rho_coh - null_step) * null_grad + null_min))
& (rho_coh > null_step)
)
)
index = index[keep]
rho_coh = rho_coh[keep]
snrv = {ifo: snrv[ifo][keep] for ifo in snrv}
rho_coinc = rho_coinc[keep]
null = null[keep]
return null, rho_coh, rho_coinc, index, snrv
[docs]
def reweight_snr_by_null(
network_snr, null, coherent, null_min=5.25, null_grad=0.2,
null_step=20.0):
"""Re-weight the detection statistic as a function of the null SNR.
See Eq. 16 of Williamson et al. (2014) [arXiv:1410.6042].
Parameters
----------
network_snr: numpy.ndarray
Array containing SNR statistic to be re-weighted
null: numpy.ndarray
Null SNR array
coherent:
Coherent SNR array
Returns
-------
rw_snr: numpy.ndarray
Re-weighted SNR for each trigger
"""
downweight = (
((null > null_min - 1) & (coherent <= null_step))
| (
(null > (coherent * null_grad + null_min - 1))
& (coherent > null_step)
)
)
rw_fac = np.where(
coherent > null_step,
1 + null - (null_min - 1) - (coherent - null_step) * null_grad,
1 + null - (null_min - 1)
)
rw_snr = np.where(downweight, network_snr / rw_fac, network_snr)
return rw_snr
[docs]
def reweightedsnr_cut(rw_snr, rw_snr_threshold):
"""
Performs a cut on reweighted snr based on a given threshold
Parameters
----------
rw_snr: array of reweighted snr
rw_snr_threshhold: any reweighted snr below this threshold is set to 0
Returns
-------
rw_snr: array of reweighted snr with cut values as 0
"""
if rw_snr_threshold is not None:
rw_snr = np.where(rw_snr < rw_snr_threshold, 0, rw_snr)
return rw_snr