Source code for pycbc.events.stat

# Copyright (C) 2016 Alex Nitz
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

#
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#
"""
This module contains functions for calculating coincident ranking statistic
values.
"""
import logging
from hashlib import sha1
from datetime import datetime as dt
import numpy
import h5py

from . import ranking
from . import coinc_rate
from .eventmgr_cython import logsignalrateinternals_computepsignalbins
from .eventmgr_cython import logsignalrateinternals_compute2detrate

logger = logging.getLogger("pycbc.events.stat")

_allowed_statistic_features = [
    "phasetd",
    "kde",
    "dq",
    "chirp_mass",
    "sensitive_volume",
    "normalize_fit_rate",
]


[docs] class Stat(object): """Base class which should be extended to provide a statistic""" def __init__(self, sngl_ranking, files=None, ifos=None, **kwargs): """ Create a statistic class instance Parameters ---------- sngl_ranking: str The name of the ranking to use for the single-detector triggers. files: list of strs, needed for some statistics A list containing the filenames of hdf format files used to help construct the coincident statistics. The files must have a 'stat' attribute which is used to associate them with the appropriate statistic class. ifos: list of strs, needed for some statistics The list of detector names """ self.files = {} files = files or [] for filename in files: with h5py.File(filename, "r") as f: stat = f.attrs["stat"] if hasattr(stat, "decode"): stat = stat.decode() if stat in self.files: raise RuntimeError( "We already have one file with stat attr = " "%s. Can't provide more than one!" % stat ) logger.info("Found file %s for stat %s", filename, stat) self.files[stat] = filename # Keep track of when stat files hashes so it can be # reloaded if it has changed self.file_hashes = self.get_file_hashes() # Provide the dtype of the single detector method's output # This is used by background estimation codes that need to maintain # a buffer of such values. self.single_dtype = numpy.float32 # True if a larger single detector statistic will produce a larger # coincident statistic self.single_increasing = True self.ifos = ifos or [] self.sngl_ranking = sngl_ranking self.sngl_ranking_kwargs = {} self.kwargs = {} for key, value in kwargs.items(): if key.startswith("sngl_ranking_"): # Note that all the sngl_ranking keywords are floats, # so this conversion is safe - if that changes in the future, # we may need something more sophisticated self.sngl_ranking_kwargs[key[13:]] = float(value) else: self.kwargs[key] = value
[docs] def get_file_hashes(self): """ Get sha1 hashes for all the files """ logger.debug( "Getting file hashes" ) start = dt.now() file_hashes = {} for stat, filename in self.files.items(): with open(filename, 'rb') as file_binary: file_hashes[stat] = sha1(file_binary.read()).hexdigest() logger.debug( "Got file hashes for %d files, took %.3es", len(self.files), (dt.now() - start).total_seconds() ) return file_hashes
[docs] def files_changed(self): """ Compare hashes of files now with the ones we have cached """ changed_file_hashes = self.get_file_hashes() for stat, old_hash in self.file_hashes.items(): if changed_file_hashes[stat] != old_hash: logger.info( "%s statistic file %s has changed", ''.join(self.ifos), stat, ) else: # Remove the dataset from the dictionary of hashes del changed_file_hashes[stat] if changed_file_hashes == {}: logger.debug( "No %s statistic files have changed", ''.join(self.ifos) ) return list(changed_file_hashes.keys())
[docs] def check_update_files(self): """ Check whether files associated with the statistic need updated, then do so for each file which needs changing """ files_changed = self.files_changed() for file_key in files_changed: self.update_file(file_key) self.file_hashes = self.get_file_hashes()
[docs] def update_file(self, key): """ Update file used in this statistic referenced by key. """ err_msg = "This function is a stub that should be overridden by the " err_msg += "sub-classes. You shouldn't be seeing this error!" raise NotImplementedError(err_msg)
[docs] def get_sngl_ranking(self, trigs): """ Returns the ranking for the single detector triggers. Parameters ---------- trigs: dict of numpy.ndarrays, h5py group or similar dict-like object Object holding single detector trigger information. Returns ------- numpy.ndarray The array of single detector values """ return ranking.get_sngls_ranking_from_trigs( trigs, self.sngl_ranking, **self.sngl_ranking_kwargs )
[docs] def single(self, trigs): # pylint:disable=unused-argument """ Calculate the necessary single detector information Parameters ---------- trigs: dict of numpy.ndarrays, h5py group or similar dict-like object Object holding single detector trigger information. Returns ------- numpy.ndarray The array of single detector values """ err_msg = "This function is a stub that should be overridden by the " err_msg += "sub-classes. You shouldn't be seeing this error!" raise NotImplementedError(err_msg)
[docs] def rank_stat_single(self, single_info): # pylint:disable=unused-argument """ Calculate the statistic for a single detector candidate Parameters ---------- single_info: tuple Tuple containing two values. The first is the ifo (str) and the second is the single detector triggers. Returns ------- numpy.ndarray The array of single detector statistics """ err_msg = "This function is a stub that should be overridden by the " err_msg += "sub-classes. You shouldn't be seeing this error!" raise NotImplementedError(err_msg)
[docs] def rank_stat_coinc( self, s, slide, step, to_shift, **kwargs ): # pylint:disable=unused-argument """ Calculate the coincident detection statistic. """ err_msg = "This function is a stub that should be overridden by the " err_msg += "sub-classes. You shouldn't be seeing this error!" raise NotImplementedError(err_msg)
def _check_coinc_lim_subclass(self, allowed_names): """ Check that we are not using coinc_lim_for_thresh when not valid. coinc_lim_for_thresh is only defined for the statistic it is present in. If we subclass, we must check explicitly that it is still valid and indicate this in the code. If the code does not have this explicit check you will see the failure message here. Parameters ----------- allowed_names : list list of allowed classes for the specific sub-classed method. """ if type(self).__name__ not in allowed_names: err_msg = "This is being called from a subclass which has not " err_msg += "been checked for validity with this method. If it is " err_msg += "valid for the subclass to come here, include in the " err_msg += "list of allowed_names above." raise NotImplementedError(err_msg)
[docs] def coinc_lim_for_thresh( self, s, thresh, limifo, **kwargs ): # pylint:disable=unused-argument """ Optimization function to identify coincs too quiet to be of interest Calculate the required single detector statistic to exceed the threshold for each of the input triggers. """ err_msg = "This function is a stub that should be overridden by the " err_msg += "sub-classes. You shouldn't be seeing this error!" raise NotImplementedError(err_msg)
[docs] class QuadratureSumStatistic(Stat): """Calculate the quadrature sum coincident detection statistic"""
[docs] def single(self, trigs): """ Calculate the necessary single detector information Here just the ranking is computed and returned. Parameters ---------- trigs: dict of numpy.ndarrays, h5py group or similar dict-like object Object holding single detector trigger information. Returns ------- numpy.ndarray The array of single detector values """ return self.get_sngl_ranking(trigs)
[docs] def rank_stat_single(self, single_info): """ Calculate the statistic for a single detector candidate For this statistic this is just passing through the single value, which will be the second entry in the tuple. Parameters ---------- single_info: tuple Tuple containing two values. The first is the ifo (str) and the second is the single detector triggers. Returns ------- numpy.ndarray The array of single detector statistics """ return single_info[1]
[docs] def rank_stat_coinc( self, sngls_list, slide, step, to_shift, **kwargs ): # pylint:disable=unused-argument """ Calculate the coincident detection statistic. Parameters ---------- sngls_list: list List of (ifo, single detector statistic) tuples slide: (unused in this statistic) step: (unused in this statistic) to_shift: list List of integers indicating what multiples of the time shift will be applied (unused in this statistic) Returns ------- numpy.ndarray Array of coincident ranking statistic values """ cstat = sum(sngl[1] ** 2. for sngl in sngls_list) ** 0.5 # For single-detector "cuts" the single ranking is set to -1 for sngls in sngls_list: cstat[sngls == -1] = 0 return cstat
[docs] def coinc_lim_for_thresh( self, s, thresh, limifo, **kwargs ): # pylint:disable=unused-argument """ Optimization function to identify coincs too quiet to be of interest Calculate the required single detector statistic to exceed the threshold for each of the input triggers. Parameters ---------- s: list List of (ifo, single detector statistic) tuples for all detectors except limifo. thresh: float The threshold on the coincident statistic. limifo: string The ifo for which the limit is to be found. Returns ------- numpy.ndarray Array of limits on the limifo single statistic to exceed thresh. """ # Safety against subclassing and not rethinking this allowed_names = ["QuadratureSumStatistic"] self._check_coinc_lim_subclass(allowed_names) s0 = thresh ** 2. - sum(sngl[1] ** 2. for sngl in s) s0[s0 < 0] = 0 return s0 ** 0.5
[docs] class PhaseTDStatistic(QuadratureSumStatistic): """ Statistic that re-weights combined newsnr using coinc parameters. The weighting is based on the PDF of time delays, phase differences and amplitude ratios between triggers in different ifos. """ def __init__( self, sngl_ranking, files=None, ifos=None, pregenerate_hist=True, **kwargs, ): """ Parameters ---------- sngl_ranking: str The name of the ranking to use for the single-detector triggers. files: list of strs, unused here A list containing the filenames of hdf format files used to help construct the coincident statistics. The files must have a 'stat' attribute which is used to associate them with the appropriate statistic class. ifos: list of strs, needed here The list of detector names pregenerate_hist: bool, optional If False, do not pregenerate histogram on class instantiation. Default is True. """ QuadratureSumStatistic.__init__( self, sngl_ranking, files=files, ifos=ifos, **kwargs ) self.single_dtype = [ ("snglstat", numpy.float32), ("coa_phase", numpy.float32), ("end_time", numpy.float64), ("sigmasq", numpy.float32), ("snr", numpy.float32), ] # Assign attribute so that it can be replaced with other functions self.has_hist = False self.hist_ifos = None self.ref_snr = 5. self.relsense = {} self.swidth = self.pwidth = self.twidth = None self.srbmin = self.srbmax = None self.max_penalty = None self.pdtype = [] self.weights = {} self.param_bin = {} self.two_det_flag = len(ifos) == 2 self.two_det_weights = {} # Some memory self.pdif = numpy.zeros(128, dtype=numpy.float64) self.tdif = numpy.zeros(128, dtype=numpy.float64) self.sdif = numpy.zeros(128, dtype=numpy.float64) self.tbin = numpy.zeros(128, dtype=numpy.int32) self.pbin = numpy.zeros(128, dtype=numpy.int32) self.sbin = numpy.zeros(128, dtype=numpy.int32) # Is the histogram needed to be pre-generated? hist_needed = pregenerate_hist hist_needed &= not len(ifos) == 1 hist_needed &= (type(self).__name__ == "PhaseTD" or self.kwargs["phasetd"]) if hist_needed: self.get_hist() elif len(ifos) == 1: # remove all phasetd files from self.files and self.file_hashes, # as they are not needed for k in list(self.files.keys()): if 'phasetd_newsnr' in k: del self.files[k] del self.file_hashes[k]
[docs] def get_hist(self, ifos=None): """ Read in a signal density file for the ifo combination Parameters ---------- ifos: list The list of ifos. Needed if not given when initializing the class instance. """ ifos = ifos or self.ifos selected = None for name in self.files: # Pick out the statistic files that provide phase / time/ amp # relationships and match to the ifos in use if "phasetd_newsnr" in name: ifokey = name.split("_")[2] num = len(ifokey) / 2 if num != len(ifos): continue match = [ifo in ifokey for ifo in ifos] if False in match: continue selected = name break # If there are other phasetd_newsnr files, they aren't needed. # So tidy them out of the self.files dictionary rejected = [key for key in self.files.keys() if 'phasetd_newsnr' in key and not key == selected] for k in rejected: del self.files[k] del self.file_hashes[k] if selected is None and len(ifos) > 1: raise RuntimeError("Couldn't figure out which stat file to use") if len(ifos) == 1: # We dont need the histogram file, but we are trying to get one # just skip it in this case return logger.info("Using signal histogram %s for ifos %s", selected, ifos) weights = {} param = {} with h5py.File(self.files[selected], "r") as histfile: self.hist_ifos = histfile.attrs["ifos"] # Patch for pre-hdf5=3.0 histogram files try: logger.info("Decoding hist ifos ..") self.hist_ifos = [i.decode("UTF-8") for i in self.hist_ifos] except (UnicodeDecodeError, AttributeError): pass # Histogram bin attributes self.twidth = histfile.attrs["twidth"] self.pwidth = histfile.attrs["pwidth"] self.swidth = histfile.attrs["swidth"] self.srbmin = histfile.attrs["srbmin"] self.srbmax = histfile.attrs["srbmax"] relfac = histfile.attrs["sensitivity_ratios"] for ifo in self.hist_ifos: weights[ifo] = histfile[ifo]["weights"][:] param[ifo] = histfile[ifo]["param_bin"][:] n_ifos = len(self.hist_ifos) bin_volume = (self.twidth * self.pwidth * self.swidth) ** (n_ifos - 1) self.hist_max = -1. * numpy.inf # Read histogram for each ifo, to use if that ifo has smallest SNR in # the coinc for ifo in self.hist_ifos: # renormalise to PDF self.weights[ifo] = \ (weights[ifo] / (weights[ifo].sum() * bin_volume)) self.weights[ifo] = self.weights[ifo].astype(numpy.float32) if param[ifo].dtype == numpy.int8: # Older style, incorrectly sorted histogram file ncol = param[ifo].shape[1] self.pdtype = [ ("c%s" % i, param[ifo].dtype) for i in range(ncol) ] self.param_bin[ifo] = numpy.zeros( len(self.weights[ifo]), dtype=self.pdtype ) for i in range(ncol): self.param_bin[ifo]["c%s" % i] = param[ifo][:, i] lsort = self.param_bin[ifo].argsort() self.param_bin[ifo] = self.param_bin[ifo][lsort] self.weights[ifo] = self.weights[ifo][lsort] else: # New style, efficient histogram file # param bin and weights have already been sorted self.param_bin[ifo] = param[ifo] self.pdtype = self.param_bin[ifo].dtype # Max_penalty is a small number to assigned to any bins without # histogram entries. All histograms in a given file have the same # min entry by design, so use the min of the last one read in. self.max_penalty = self.weights[ifo].min() self.hist_max = max(self.hist_max, self.weights[ifo].max()) if self.two_det_flag: # The density of signals is computed as a function of 3 binned # parameters: time difference (t), phase difference (p) and # SNR ratio (s). These are computed for each combination of # detectors, so for detectors 6 differences are needed. However # many combinations of these parameters are highly unlikely and # no instances of these combinations occurred when generating # the statistic files. Rather than storing a bunch of 0s, these # values are just not stored at all. This reduces the size of # the statistic file, but means we have to identify the correct # value to read for every trigger. For 2 detectors we can # expand the weights lookup table here, basically adding in all # the "0" values. This makes looking up a value in the # "weights" table a O(N) rather than O(NlogN) operation. It # sacrifices RAM to do this, so is a good tradeoff for 2 # detectors, but not for 3! if not hasattr(self, "c0_size"): self.c0_size = {} self.c1_size = {} self.c2_size = {} self.c0_size[ifo] = numpy.int32( 2 * (abs(self.param_bin[ifo]["c0"]).max() + 1) ) self.c1_size[ifo] = numpy.int32( 2 * (abs(self.param_bin[ifo]["c1"]).max() + 1) ) self.c2_size[ifo] = numpy.int32( 2 * (abs(self.param_bin[ifo]["c2"]).max() + 1) ) array_size = [ self.c0_size[ifo], self.c1_size[ifo], self.c2_size[ifo], ] dtypec = self.weights[ifo].dtype self.two_det_weights[ifo] = ( numpy.zeros(array_size, dtype=dtypec) + self.max_penalty ) id0 = ( self.param_bin[ifo]["c0"].astype(numpy.int32) + self.c0_size[ifo] // 2 ) id1 = ( self.param_bin[ifo]["c1"].astype(numpy.int32) + self.c1_size[ifo] // 2 ) id2 = ( self.param_bin[ifo]["c2"].astype(numpy.int32) + self.c2_size[ifo] // 2 ) self.two_det_weights[ifo][id0, id1, id2] = self.weights[ifo] for ifo, sense in zip(self.hist_ifos, relfac): self.relsense[ifo] = sense self.has_hist = True
[docs] def update_file(self, key): """ Update file used in this statistic. If others are used (i.e. this statistic is inherited), they will need updated separately """ if 'phasetd_newsnr' in key and not len(self.ifos) == 1: if ''.join(sorted(self.ifos)) not in key: logger.debug( "%s file is not used for %s statistic", key, ''.join(self.ifos) ) return False logger.info( "Updating %s statistic %s file", ''.join(self.ifos), key ) # This is a PhaseTDStatistic file which needs updating self.get_hist() return True return False
[docs] def logsignalrate(self, stats, shift, to_shift): """ Calculate the normalized log rate density of coinc signals via lookup Parameters ---------- stats: dict of dicts Single-detector quantities for each detector shift: numpy array of float Time shift vector for each coinc to be ranked to_shift: list of ints Multiple of the time shift to apply, ordered as self.ifos Returns ------- value: log of coinc signal rate density for the given single-ifo triggers and time shifts """ # Convert time shift vector to dict, as hist ifos and self.ifos may # not be in same order to_shift = {ifo: s for ifo, s in zip(self.ifos, to_shift)} if not self.has_hist: self.get_hist() # Figure out which ifo of the contributing ifos has the smallest SNR, # to use as reference for choosing the signal histogram. snrs = numpy.array( [numpy.array(stats[ifo]["snr"], ndmin=1) for ifo in self.ifos] ) smin = snrs.argmin(axis=0) # Store a list of the triggers using each ifo as reference rtypes = { ifo: numpy.where(smin == j)[0] for j, ifo in enumerate(self.ifos) } # Get reference ifo information rate = numpy.zeros(len(shift), dtype=numpy.float32) ps = {ifo: numpy.array(stats[ifo]['coa_phase'], dtype=numpy.float32, ndmin=1) for ifo in self.ifos} ts = {ifo: numpy.array(stats[ifo]['end_time'], dtype=numpy.float64, ndmin=1) for ifo in self.ifos} ss = {ifo: numpy.array(stats[ifo]['snr'], dtype=numpy.float32, ndmin=1) for ifo in self.ifos} sigs = {ifo: numpy.array(stats[ifo]['sigmasq'], dtype=numpy.float32, ndmin=1) for ifo in self.ifos} for ref_ifo in self.ifos: rtype = rtypes[ref_ifo] pref = ps[ref_ifo] tref = ts[ref_ifo] sref = ss[ref_ifo] sigref = sigs[ref_ifo] senseref = self.relsense[self.hist_ifos[0]] binned = [] other_ifos = [ifo for ifo in self.ifos if ifo != ref_ifo] for ifo in other_ifos: # Assign cached memory length = len(rtype) while length > len(self.pdif): newlen = len(self.pdif) * 2 self.pdif = numpy.zeros(newlen, dtype=numpy.float64) self.tdif = numpy.zeros(newlen, dtype=numpy.float64) self.sdif = numpy.zeros(newlen, dtype=numpy.float64) self.pbin = numpy.zeros(newlen, dtype=numpy.int32) self.tbin = numpy.zeros(newlen, dtype=numpy.int32) self.sbin = numpy.zeros(newlen, dtype=numpy.int32) # Calculate differences logsignalrateinternals_computepsignalbins( self.pdif, self.tdif, self.sdif, self.pbin, self.tbin, self.sbin, ps[ifo], ts[ifo], ss[ifo], sigs[ifo], pref, tref, sref, sigref, shift, rtype, self.relsense[ifo], senseref, self.twidth, self.pwidth, self.swidth, to_shift[ref_ifo], to_shift[ifo], length, ) binned += [ self.tbin[:length], self.pbin[:length], self.sbin[:length], ] # Read signal weight from precalculated histogram if self.two_det_flag: # High-RAM, low-CPU option for two-det logsignalrateinternals_compute2detrate( binned[0], binned[1], binned[2], self.c0_size[ref_ifo], self.c1_size[ref_ifo], self.c2_size[ref_ifo], rate, rtype, sref, self.two_det_weights[ref_ifo], self.max_penalty, self.ref_snr, len(rtype), ) else: # Low[er]-RAM, high[er]-CPU option for >two det # Convert binned to same dtype as stored in hist nbinned = numpy.zeros(len(binned[1]), dtype=self.pdtype) for i, b in enumerate(binned): nbinned[f"c{i}"] = b loc = numpy.searchsorted(self.param_bin[ref_ifo], nbinned) loc[loc == len(self.weights[ref_ifo])] = 0 rate[rtype] = self.weights[ref_ifo][loc] # These weren't in our histogram so give them max penalty # instead of random value missed = numpy.where( self.param_bin[ref_ifo][loc] != nbinned )[0] rate[rtype[missed]] = self.max_penalty # Scale by signal population SNR rate[rtype] *= (sref[rtype] / self.ref_snr) ** -4. return numpy.log(rate)
[docs] def single(self, trigs): """ Calculate the necessary single detector information Here the ranking as well as phase, endtime and sigma-squared values. Parameters ---------- trigs: dict of numpy.ndarrays, h5py group or similar dict-like object Object holding single detector trigger information. 'snr', 'chisq', 'chisq_dof', 'coa_phase', 'end_time', and 'sigmasq' are required keys. Returns ------- numpy.ndarray Array of single detector parameter values """ sngl_stat = self.get_sngl_ranking(trigs) singles = numpy.zeros(len(sngl_stat), dtype=self.single_dtype) singles["snglstat"] = sngl_stat singles["coa_phase"] = trigs["coa_phase"][:] singles["end_time"] = trigs["end_time"][:] singles["sigmasq"] = trigs["sigmasq"][:] singles["snr"] = trigs["snr"][:] return numpy.array(singles, ndmin=1)
[docs] def rank_stat_single(self, single_info): """ Calculate the statistic for a single detector candidate For this statistic this is just passing through the single value, which will be the second entry in the tuple. Parameters ---------- single_info: tuple Tuple containing two values. The first is the ifo (str) and the second is the single detector triggers. Returns ------- numpy.ndarray The array of single detector statistics """ return single_info[1]["snglstat"]
[docs] def rank_stat_coinc( self, sngls_list, slide, step, to_shift, **kwargs ): # pylint:disable=unused-argument """ Calculate the coincident detection statistic, defined in Eq 2 of [Nitz et al, 2017](https://doi.org/10.3847/1538-4357/aa8f50). """ rstat = sum(s[1]["snglstat"] ** 2 for s in sngls_list) cstat = rstat + 2. * self.logsignalrate( dict(sngls_list), slide * step, to_shift ) cstat[cstat < 0] = 0 return cstat**0.5
[docs] def coinc_lim_for_thresh( self, sngls_list, thresh, limifo, **kwargs ): # pylint:disable=unused-argument """ Optimization function to identify coincs too quiet to be of interest. Calculate the required single detector statistic to exceed the threshold for each of the input triggers. """ # Safety against subclassing and not rethinking this allowed_names = ["PhaseTDStatistic"] self._check_coinc_lim_subclass(allowed_names) if not self.has_hist: self.get_hist() fixed_stat_sq = sum( [b["snglstat"] ** 2 for a, b in sngls_list if a != limifo] ) s1 = thresh ** 2. - fixed_stat_sq # Assume best case scenario and use maximum signal rate s1 -= 2. * self.hist_max s1[s1 < 0] = 0 return s1**0.5
[docs] class ExpFitStatistic(PhaseTDStatistic): """ Detection statistic using an exponential falloff noise model. Statistic approximates the negative log noise coinc rate density per template over single-ifo newsnr values. Extra features can be added by supplying keyword arguments when initialising. """ def __init__(self, sngl_ranking, files=None, ifos=None, **kwargs): """ Parameters ---------- sngl_ranking: str The name of the ranking to use for the single-detector triggers. files: list of strs, needed here A list containing the filenames of hdf format files used to help construct the coincident statistics. The files must have a 'stat' attribute which is used to associate them with the appropriate statistic class. ifos: list of strs, not used here The list of detector names kwargs: values and features needed for the statistic """ if not files: raise RuntimeError("Files not specified") PhaseTDStatistic.__init__( self, sngl_ranking, files=files, ifos=ifos, **kwargs ) # Get the single-detector rates fit files # the stat file attributes are hard-coded as '%{ifo}-fit_coeffs' parsed_attrs = [f.split("-") for f in self.files.keys()] self.bg_ifos = [ at[0] for at in parsed_attrs if (len(at) == 2 and at[1] == "fit_coeffs") ] if not len(self.bg_ifos): raise RuntimeError( "None of the statistic files has the required " "attribute called {ifo}-fit_coeffs !" ) # Get the single-detector rates fit values self.fits_by_tid = {} for i in self.bg_ifos: self.fits_by_tid[i] = self.assign_fits(i) if self.kwargs["normalize_fit_rate"]: self.reassign_rate(i) # These are important for the coinc_lim_for_thresh method # Is the single threshold a < or > limit? self.single_increasing = False # Things for calculating the best-case scenario for signal rate self.max_sigmasq = -numpy.inf self.min_snr = numpy.inf # Some modifiers for the statistic to get it into a nice range self.benchmark_lograte = float( self.kwargs.get("benchmark_lograte", -14.6) ) self.min_stat = float( self.kwargs.get("minimum_statistic_cutoff", -30.) ) # Modifier to get a sensible value of the fit slope below threshold self.alphabelow = float( self.kwargs.get("alpha_below_thresh", numpy.inf) ) # This will be used to keep track of the template number being used self.curr_tnum = None # Applies a constant offset to all statistic values in a given instance. # This can be used to e.g. change relative rankings between different # event types. Default is zero offset. self.stat_correction = float( self.kwargs.get("statistic_correction", 0) ) # Go through the keywords and add class information as needed: if self.kwargs["sensitive_volume"]: # Add network sensitivity benchmark self.single_dtype.append(("benchmark_logvol", numpy.float32)) # benchmark_logvol is a benchmark sensitivity array # over template id ref_ifos = self.kwargs.get("reference_ifos", "H1,L1").split(",") hl_net_med_sigma = numpy.nanmin( [self.fits_by_tid[ifo]["median_sigma"] for ifo in ref_ifos], axis=0, ) self.benchmark_logvol = 3. * numpy.log(hl_net_med_sigma) if self.kwargs["dq"]: # Reweight the noise rate by the dq reweighting factor self.dq_rates_by_state = {} self.dq_bin_by_tid = {} self.dq_state_segments = None self.low_latency = False self.single_dtype.append(('dq_state', int)) for ifo in self.ifos: key = f"{ifo}-dq_stat_info" if key in self.files.keys(): self.dq_rates_by_state[ifo] = self.assign_dq_rates(key) self.dq_bin_by_tid[ifo] = self.assign_template_bins(key) self.check_low_latency(key) if not self.low_latency: if self.dq_state_segments is None: self.dq_state_segments = {} self.dq_state_segments[ifo] = self.setup_segments(key) if self.kwargs["chirp_mass"]: # Reweight the signal rate by the chirp mass of the template # This may be stored as a float, so cast just in case self.mcm = float(self.kwargs.get("max_chirp_mass", numpy.inf)) self.curr_mchirp = None if self.kwargs["kde"]: # Reweight the signal rate by a weighting factor from the KDE of # a signal population normalised by expected relative rate of noise # triggers for a template self.kde_names = [] self.find_kdes() self.kde_by_tid = {} for kname in self.kde_names: self.assign_kdes(kname)
[docs] def assign_template_bins(self, key): """ Assign bin ID values Assign each template id to a bin name based on a referenced statistic file. Parameters ---------- key: str statistic file key string Returns --------- bin_dict: dict of strs Dictionary containing the bin name for each template id """ ifo = key.split("-")[0] with h5py.File(self.files[key], "r") as dq_file: tids = [] bin_nums = [] bin_grp = dq_file[f"{ifo}/bins"] for bin_name in bin_grp.keys(): bin_tids = bin_grp[f"{bin_name}/tids"][:] tids = list(tids) + list(bin_tids.astype(int)) bin_nums = list(bin_nums) + list([bin_name] * len(bin_tids)) bin_dict = dict(zip(tids, bin_nums)) return bin_dict
[docs] def assign_dq_rates(self, key): """ Assign dq values to each time for every bin based on a referenced statistic file. Parameters ---------- key: str statistic file key string Returns --------- dq_dict: dict of {time: dq_value} dicts for each bin Dictionary containing the mapping between the time and the dq value for each individual bin. """ ifo = key.split("-")[0] with h5py.File(self.files[key], "r") as dq_file: bin_grp = dq_file[f"{ifo}/bins"] dq_dict = {} for bin_name in bin_grp.keys(): dq_dict[bin_name] = bin_grp[f"{bin_name}/dq_rates"][:] return dq_dict
[docs] def setup_segments(self, key): """ Store segments from stat file """ ifo = key.split("-")[0] with h5py.File(self.files[key], "r") as dq_file: ifo_grp = dq_file[ifo] dq_state_segs_dict = {} for k in ifo_grp["dq_segments"].keys(): seg_dict = {} seg_dict["start"] = \ ifo_grp[f"dq_segments/{k}/segment_starts"][:] seg_dict["end"] = ifo_grp[f"dq_segments/{k}/segment_ends"][:] dq_state_segs_dict[k] = seg_dict return dq_state_segs_dict
[docs] def find_dq_noise_rate(self, trigs, dq_state): """Get dq values for a specific ifo and dq states""" dq_val = numpy.ones(len(dq_state)) if self.curr_ifo in self.dq_rates_by_state: for i, st in enumerate(dq_state): if isinstance(self.curr_tnum, numpy.ndarray): bin_name = self.dq_bin_by_tid[self.curr_ifo][self.curr_tnum[i]] else: bin_name = self.dq_bin_by_tid[self.curr_ifo][self.curr_tnum] dq_val[i] = self.dq_rates_by_state[self.curr_ifo][bin_name][st] return dq_val
[docs] def find_dq_state_by_time(self, ifo, times): """Get the dq state for an ifo at times""" dq_state = numpy.zeros(len(times), dtype=numpy.uint8) if ifo in self.dq_state_segments: from pycbc.events.veto import indices_within_times for k in self.dq_state_segments[ifo]: starts = self.dq_state_segments[ifo][k]["start"] ends = self.dq_state_segments[ifo][k]["end"] inds = indices_within_times(times, starts, ends) # states are named in file as 'dq_state_N', need to extract N dq_state[inds] = int(k[9:]) return dq_state
[docs] def check_low_latency(self, key): """ Check if the statistic file indicates low latency mode. Parameters ---------- key: str Statistic file key string. Returns ------- None """ ifo = key.split('-')[0] with h5py.File(self.files[key], 'r') as dq_file: ifo_grp = dq_file[ifo] if 'dq_segments' not in ifo_grp.keys(): # if segs are not in file, we must be in LL if self.dq_state_segments is not None: raise ValueError( 'Either all dq stat files must have segments or none' ) self.low_latency = True elif self.low_latency: raise ValueError( 'Either all dq stat files must have segments or none' )
[docs] def reassign_rate(self, ifo): """ Reassign the rate to be number per time rather than an arbitrarily normalised number. Parameters ----------- ifo: str The ifo to consider. """ with h5py.File(self.files[f'{ifo}-fit_coeffs'], 'r') as coeff_file: analysis_time = float(coeff_file.attrs['analysis_time']) fbt = 'fit_by_template' in coeff_file self.fits_by_tid[ifo]['smoothed_rate_above_thresh'] /= analysis_time self.fits_by_tid[ifo]['smoothed_rate_in_template'] /= analysis_time # The by-template fits may have been stored in the smoothed fits file if fbt: self.fits_by_tid[ifo]['fit_by_rate_above_thresh'] /= analysis_time self.fits_by_tid[ifo]['fit_by_rate_in_template'] /= analysis_time
[docs] def assign_fits(self, ifo): """ Extract fits from single-detector rate fit files Parameters ----------- ifo: str The detector to get fits for. Returns ------- rate_dict: dict A dictionary containing the fit information in the `alpha`, `rate` and `thresh` keys. """ coeff_file = h5py.File(self.files[f"{ifo}-fit_coeffs"], "r") template_id = coeff_file["template_id"][:] # the template_ids and fit coeffs are stored in an arbitrary order # create new arrays in template_id order for easier recall tid_sort = numpy.argsort(template_id) fits_by_tid_dict = {} fits_by_tid_dict["smoothed_fit_coeff"] = coeff_file["fit_coeff"][:][ tid_sort ] fits_by_tid_dict["smoothed_rate_above_thresh"] = coeff_file[ "count_above_thresh" ][:][tid_sort].astype(float) fits_by_tid_dict["smoothed_rate_in_template"] = coeff_file[ "count_in_template" ][:][tid_sort].astype(float) if self.kwargs["sensitive_volume"]: fits_by_tid_dict["median_sigma"] = coeff_file["median_sigma"][:][ tid_sort ].astype(float) # The by-template fits may have been stored in the smoothed fits file if "fit_by_template" in coeff_file: coeff_fbt = coeff_file["fit_by_template"] fits_by_tid_dict["fit_by_fit_coeff"] = coeff_fbt["fit_coeff"][:][ tid_sort ] fits_by_tid_dict["fit_by_rate_above_thresh"] = coeff_fbt[ "count_above_thresh" ][:][tid_sort].astype(float) fits_by_tid_dict["fit_by_rate_in_template"] = coeff_file[ "count_in_template" ][:][tid_sort].astype(float) # Keep the fit threshold in fits_by_tid fits_by_tid_dict["thresh"] = coeff_file.attrs["stat_threshold"] coeff_file.close() return fits_by_tid_dict
[docs] def update_file(self, key): """ Update file used in this statistic. If others are used (i.e. this statistic is inherited), they will need updated separately """ # First - check if the phasetd file has been updated, this is # inherited from the PhaseTDStatistic if PhaseTDStatistic.update_file(self, key): return True if key.endswith('-fit_coeffs'): # This is a ExpFitStatistic file which needs updating # Which ifo is it? ifo = key[:2] self.fits_by_tid[ifo] = self.assign_fits(ifo) if self.kwargs["normalize_fit_rate"]: self.reassign_rate(ifo) self.get_ref_vals(ifo) logger.info( "Updating %s statistic %s file", ''.join(self.ifos), key ) return True # Is the key a KDE statistic file that we update here? if key.endswith('kde_file'): logger.info( "Updating %s statistic %s file", ''.join(self.ifos), key ) kde_style = key.split('-')[0] self.assign_kdes(kde_style) return True # We also need to check if the DQ files have updated if key.endswith('dq_stat_info'): ifo = key.split('-')[0] logger.info( "Updating %s statistic %s file", ifo, key ) self.dq_rates_by_state[ifo] = self.assign_dq_rates(key) self.dq_bin_by_tid[ifo] = self.assign_template_bins(key) return True return False
[docs] def get_ref_vals(self, ifo): """ Get the largest `alpha` value over all templates for given ifo. This is stored in `self.alphamax[ifo]` in the class instance. Parameters ----------- ifo: str The detector to get fits for. """ self.alphamax[ifo] = self.fits_by_tid[ifo]['smoothed_fit_coeff'].max()
[docs] def find_fits(self, trigs): """ Get fit coeffs for a specific ifo and template id(s) Parameters ---------- trigs: dict of numpy.ndarrays, h5py group or similar dict-like object Object holding single detector trigger information. The coincidence executable will always call this using a bunch of trigs from a single template, there template_num is stored as an attribute and we just return the single value for all templates. If multiple templates are in play we must return arrays. Returns -------- alphai: float or numpy array The alpha fit value(s) ratei: float or numpy array The rate fit value(s) thresh: float or numpy array The thresh fit value(s) """ try: ifo = trigs.ifo except AttributeError: ifo = trigs.get('ifo', None) if ifo is None: ifo = self.ifos[0] assert ifo in self.ifos # fits_by_tid is a dictionary of dictionaries of arrays # indexed by ifo / coefficient name / template_id alphai = self.fits_by_tid[ifo]["smoothed_fit_coeff"][self.curr_tnum] ratei = self.fits_by_tid[ifo]["smoothed_rate_above_thresh"][self.curr_tnum] thresh = self.fits_by_tid[ifo]["thresh"] return alphai, ratei, thresh
[docs] def find_kdes(self): """ Find which associated files are for the KDE reweighting """ # The stat file attributes are hard-coded as 'signal-kde_file' # and 'template-kde_file' parsed_attrs = [f.split("-") for f in self.files.keys()] self.kde_names = [ at[0] for at in parsed_attrs if (len(at) == 2 and at[1] == "kde_file") ] assert sorted(self.kde_names) == ["signal", "template"], ( "Two KDE stat files are required, they should have stat attr " "'signal-kde_file' and 'template-kde_file' respectively" )
[docs] def assign_kdes(self, kname): """ Extract values from KDE files Parameters ----------- kname: str Used to label the kde files. """ with h5py.File(self.files[kname + "-kde_file"], "r") as kde_file: self.kde_by_tid[kname + "_kdevals"] = kde_file["data_kde"][:]
[docs] def kde_ratio(self): """ Calculate the weighting factor according to the ratio of the signal and template KDE lookup tables """ signal_kde = self.kde_by_tid["signal_kdevals"][self.curr_tnum] template_kde = self.kde_by_tid["template_kdevals"][self.curr_tnum] return numpy.log(signal_kde / template_kde)
[docs] def lognoiserate(self, trigs): """ Calculate the log noise rate density over single-ifo ranking Read in single trigger information, make the sngl_stat and rescale by the fitted coefficients alpha and rate Parameters ----------- trigs: dict of numpy.ndarrays, h5py group or similar dict-like object Object holding single detector trigger information. Returns --------- lognoisel: numpy.array Array of log noise rate density for each input trigger. """ # What is the template number currently being used? try: # exists if accessed via coinc_findtrigs, this is a number self.curr_tnum = trigs.template_num except AttributeError: # exists for SingleDetTriggers & pycbc_live get_coinc, # this is a numpy array self.curr_tnum = trigs["template_id"] alphai, ratei, thresh = self.find_fits(trigs) sngl_stat = self.get_sngl_ranking(trigs) lognoisel = ( -alphai * (sngl_stat - thresh) + numpy.log(alphai) + numpy.log(ratei) ) if not numpy.isinf(self.alphabelow): # Above the threshold we use the usual fit coefficient (alphai) # below threshold use specified alphabelow bt = sngl_stat < thresh lognoiselbt = ( -self.alphabelow * (sngl_stat - thresh) + numpy.log(self.alphabelow) + numpy.log(ratei) ) lognoisel[bt] = lognoiselbt[bt] return numpy.array(lognoisel, ndmin=1, dtype=numpy.float32)
[docs] def single(self, trigs): """ Calculate the necessary single detector information In this case the ranking rescaled (see the lognoiserate method here). with the phase, end time, SNR values added in. Parameters ---------- trigs: dict of numpy.ndarrays, h5py group or similar dict-like object Object holding single detector trigger information. Returns ------- numpy.ndarray The array of single detector values """ try: self.curr_ifo = trigs.ifo except AttributeError: self.curr_ifo = trigs.get('ifo', None) if self.curr_ifo is None: self.curr_ifo = self.ifos[0] assert self.curr_ifo in self.ifos # Should be exactly one ifo provided assert len(numpy.unique(self.curr_ifo)) == 1 # single-ifo stat = log of noise rate sngl_stat = self.lognoiserate(trigs) # populate other fields to calculate phase/time/amp consistency singles = numpy.zeros(len(sngl_stat), dtype=self.single_dtype) singles["coa_phase"] = trigs["coa_phase"][:] singles["end_time"] = trigs["end_time"][:] singles["snr"] = trigs["snr"][:] # Save info about best-case scenario for use later if singles["snr"].size: self.min_snr = min(singles["snr"].min(), self.min_snr) if self.kwargs["sensitive_volume"]: # populate fields to allow sensitive volume factor calculation singles["sigmasq"] = trigs["sigmasq"][:] # Store benchmark log volume as single-ifo information since # the ranking methods do not have access to template id singles["benchmark_logvol"] = self.benchmark_logvol[self.curr_tnum] # Save info about the best-case scenario for use later: if singles["sigmasq"].size: max_sigsq = numpy.max(singles["sigmasq"]) self.max_sigmasq = max(max_sigsq, self.max_sigmasq) if self.kwargs["chirp_mass"]: from pycbc.conversions import mchirp_from_mass1_mass2 try: mass1 = trigs.param['mass1'] mass2 = trigs.param['mass2'] except AttributeError: mass1 = trigs['mass1'] mass2 = trigs['mass2'] self.curr_mchirp = mchirp_from_mass1_mass2(mass1, mass2) if self.kwargs["dq"]: if self.low_latency: # trigs should already have a dq state assigned singles['dq_state'] = trigs['dq_state'][:] else: singles['dq_state'] = self.find_dq_state_by_time( self.curr_ifo, trigs['end_time'][:] ) dq_rate = self.find_dq_noise_rate(trigs, singles['dq_state']) dq_rate = numpy.maximum(dq_rate, 1) sngl_stat += numpy.log(dq_rate) singles["snglstat"] = sngl_stat return numpy.array(singles, ndmin=1)
[docs] def sensitive_volume_factor(self, sngls): """ Determine the factor for different network sensitivities Assuming a homogeneous distribution of sources, the signal rate should be given by the volume of the sphere to which we are sensitive. Parameters ---------- sngls: tuple Single-detector information, tuples of the (ifo, sngl_object), where sngl_object is a ReadByTemplate object or similar. Returns ------- network_logvol: numpy.array Array of values for the network log-volume factor. This is the log of the cube of the sensitive distance (sigma), divided by a benchmark volume. """ # Get benchmark log volume as single-ifo information : # benchmark_logvol for a given template is not ifo-dependent, so # choose the first ifo for convenience benchmark_logvol = sngls[0][1]["benchmark_logvol"] # Benchmark log volume will be the same for all triggers, so if # any are nan, they are all nan if any(numpy.isnan(benchmark_logvol)): # This can be the case in pycbc live if there are no triggers # from this template in the trigger fits file. If so, assume # that sigma for the triggers being ranked is # representative of the benchmark network. return 0 # Network sensitivity for a given coinc type is approximately # determined by the least sensitive ifo network_sigmasq = numpy.amin( [sngl[1]["sigmasq"] for sngl in sngls], axis=0 ) # Volume \propto sigma^3 or sigmasq^1.5 network_logvol = 1.5 * numpy.log(network_sigmasq) - benchmark_logvol return network_logvol
[docs] def logsignalrate_shared(self, sngls_info): """ Calculate the parts of the log signal rate which are shared by both the single and coinc ranking statistics Parameters ---------- sngls_info: tuple Single-detector information, tuples of the (ifo, sngl_object), where sngl_object is a ReadByTemplate object or similar. Returns ------- sr_factor: numpy.ndarray Array of values to be added to the ranking statistic when taking various signal rate factors into account """ # Other features affecting the signal rate sr_factor = 0 if self.kwargs["sensitive_volume"]: # Sensitive volume - this is proportional to signal rate # assuming a homogeneous universe sr_factor += self.sensitive_volume_factor(sngls_info) if self.kwargs["chirp_mass"]: # chirp mass reweighting if isinstance(self.curr_mchirp, numpy.ndarray): mchirp = numpy.minimum(self.curr_mchirp, self.mcm) else: # curr_mchirp will be a number mchirp = min(self.curr_mchirp, self.mcm) sr_factor += numpy.log((mchirp / 20.) ** (11. / 3.)) if self.kwargs["kde"]: # KDE reweighting sr_factor += self.kde_ratio() return sr_factor
[docs] def rank_stat_single(self, single_info): """ Calculate the statistic for single detector candidates Parameters ---------- single_info: tuple Tuple containing two values. The first is the ifo (str) and the second is the single detector triggers. Returns ------- numpy.ndarray The array of single detector statistics """ sngls = single_info[1] # Basic noise rate: the exp fit rate from the single statistic ln_noise_rate = sngls["snglstat"] ln_noise_rate -= self.benchmark_lograte # Basic signal rate: snr ** -4 ln_s = -4 * numpy.log(sngls["snr"] / self.ref_snr) # Add in the feature-dependent terms to the signal rate ln_s += self.logsignalrate_shared((single_info,)) # Combine the signal and noise rates loglr = ln_s - ln_noise_rate # Apply statistic correction loglr += self.stat_correction # cut off underflowing and very small values loglr[loglr < self.min_stat] = self.min_stat return loglr
[docs] def rank_stat_coinc( self, s, slide, step, to_shift, **kwargs ): # pylint:disable=unused-argument """ Calculate the coincident detection statistic. Parameters ---------- s: list List of (ifo, single detector statistic) tuples slide: numpy array The number of steps by which to shift each trigger step: float The size of each step, to be multiplied by to_shift to_shift: list List of integers indicating what multiples of the time shift will be applied kwargs: various Key-word arguments to define different features and tunable values in the statistic. Only time_addition is used here Returns ------- loglr: numpy.ndarray The ranking statistic for each trigger based on the supplied triggers, requested features and keyword arguments """ # ranking statistic is -ln(expected rate density of noise triggers) # plus normalization constant sngl_dict = {sngl[0]: sngl[1]["snglstat"] for sngl in s} # Basic noise rate: sum of noise rates multiplied by the # window they can form coincidences in ln_noise_rate = coinc_rate.combination_noise_lograte( sngl_dict, kwargs["time_addition"], dets=kwargs.get('dets', None), ) ln_noise_rate -= self.benchmark_lograte # Basic option is not to have any signal-based assumptions, # so this is zero to begin with ln_s = 0 if self.kwargs["phasetd"]: if not self.has_hist: self.get_hist() # Find total volume of phase-time-amplitude space occupied by # noise coincs, so that the logsignalrate function is properly # normalized # Extent of time-difference space occupied noise_twindow = coinc_rate.multiifo_noise_coincident_area( self.hist_ifos, kwargs["time_addition"], dets=kwargs.get('dets', None), ) # Volume is the allowed time difference window, multiplied by 2pi # for each phase difference dimension and by allowed range of SNR # ratio for each SNR ratio dimension : there are (n_ifos - 1) # dimensions for both phase and SNR n_ifos = len(self.hist_ifos) snr_range = (self.srbmax - self.srbmin) * self.swidth hist_vol = noise_twindow * (2. * numpy.pi * snr_range) ** ( n_ifos - 1 ) # Noise PDF is 1/volume, assuming a uniform distribution of noise # coincs ln_noise_rate -= numpy.log(hist_vol) # What is the signal pdf? stat = {ifo: st for ifo, st in s} ln_s += self.logsignalrate(stat, slide * step, to_shift) # Add in the feature-dependent terms to the signal rate ln_s += self.logsignalrate_shared(s) # Combine the signal and noise rates loglr = ln_s - ln_noise_rate # Apply statistic correction loglr += self.stat_correction # cut off underflowing and very small values loglr[loglr < self.min_stat] = self.min_stat return loglr
[docs] def coinc_lim_for_thresh( self, s, thresh, limifo, **kwargs ): # pylint:disable=unused-argument """ Optimization function to identify coincs too quiet to be of interest We are trying to get rid of as many events here at the point where we can be confident that they will not meet ranking statistic thresholds. The calculation here is "What is the minimum required snglstat in the pivot IFO which could possibly pass the threshold?" There are a couple of points to be wary of here, e.g. in the signal rate calculation, we take the best-case scenario. By using the best-case for signal rate in this calculation, some events are kept at this point which are hopeless. Parameters ---------- s: list List of (ifo, single detector statistic) tuples thresh: float The threshold value to be checked against limifo: string The pivot detector, i.e. the one in which the sngl stat must reach the value output by ths function kwargs: various Key-word arguments to define different features and tunable values in the statistic. Only time_addition is used here Returns ------- numpy.array The minimum snglstat values required in the 'pivot' detector in order to reach the threshold specified """ # Safety against subclassing and not rethinking this allowed_names = ["ExpFitStatistic"] self._check_coinc_lim_subclass(allowed_names) if thresh <= self.min_stat: return numpy.ones(len(s[0][1]["snglstat"])) * numpy.inf # Modify the sngls so that the pivot ifo snglstats are zero sngl_dict = {sngl[0]: sngl[1]["snglstat"] for sngl in s} sngl_dict[limifo] = numpy.zeros(len(s[0][1])) # Noise rate calculated as normal. Because of the modification # above, this is the rank_stat_coinc noise rate calculation # minus the pivot ifo's snglstat ln_noise_rate = coinc_rate.combination_noise_lograte( sngl_dict, kwargs["time_addition"] ) ln_noise_rate -= self.benchmark_lograte # Basic option is not to have any signal-based assumptions, # so this is zero to begin with ln_s = 0 if self.kwargs["phasetd"]: if not self.has_hist: self.get_hist() # Assume best-case scenario and use maximum signal rate ln_s = numpy.log( self.hist_max * (self.min_snr / self.ref_snr) ** -4. ) # Shared info is the same as in the coinc calculation ln_s += self.logsignalrate_shared(s) # Combine the signal and noise rates loglr = ln_s - ln_noise_rate # Apply statistic correction loglr += self.stat_correction # From this combined rate, what is the minimum snglstat value # in the pivot IFO needed to reach the threshold? return loglr - thresh
[docs] class ExpFitCombinedSNR(ExpFitStatistic): """ Reworking of ExpFitStatistic designed to resemble network SNR Use a monotonic function of the negative log noise rate density which approximates combined (new)snr for coincs with similar newsnr in each ifo """ def __init__(self, sngl_ranking, files=None, ifos=None, **kwargs): """ Parameters ---------- sngl_ranking: str The name of the ranking to use for the single-detector triggers. files: list of strs, needed here A list containing the filenames of hdf format files used to help construct the coincident statistics. The files must have a 'stat' attribute which is used to associate them with the appropriate statistic class. ifos: list of strs, not used here The list of detector names """ ExpFitStatistic.__init__( self, sngl_ranking, files=files, ifos=ifos, **kwargs ) # for low-mass templates the exponential slope alpha \approx 6 self.alpharef = 6. self.single_increasing = True self.single_dtype = numpy.float32 # Modifier to get a sensible value of the fit slope below threshold self.alphabelow = float( self.kwargs.get("alpha_below_thresh", numpy.inf) )
[docs] def single(self, trigs): """ Calculate the necessary single detector information Parameters ---------- trigs: dict of numpy.ndarrays, h5py group or similar dict-like object Object holding single detector trigger information. Returns ------- numpy.ndarray The array of single detector values """ logr_n = self.lognoiserate(trigs) _, _, thresh = self.find_fits(trigs) # shift by log of reference slope alpha logr_n += -1. * numpy.log(self.alpharef) # add threshold and rescale by reference slope stat = thresh - (logr_n / self.alpharef) return numpy.array(stat, ndmin=1, dtype=numpy.float32)
[docs] def rank_stat_single(self, single_info): """ Calculate the statistic for single detector candidates Parameters ---------- single_info: tuple Tuple containing two values. The first is the ifo (str) and the second is the single detector triggers. Returns ------- numpy.ndarray The array of single detector statistics """ if self.single_increasing: sngl_multiifo = single_info[1] else: sngl_multiifo = -1. * single_info[1] return sngl_multiifo
[docs] def rank_stat_coinc( self, s, slide, step, to_shift, **kwargs ): # pylint:disable=unused-argument """ Calculate the coincident detection statistic. Parameters ---------- sngls_list: list List of (ifo, single detector statistic) tuples slide: (unused in this statistic) step: (unused in this statistic) to_shift: list List of integers indicating what multiples of the time shift will be applied (unused in this statistic) Returns ------- numpy.ndarray Array of coincident ranking statistic values """ # scale by 1/sqrt(number of ifos) to resemble network SNR return sum(sngl[1] for sngl in s) / (len(s) ** 0.5)
[docs] def coinc_lim_for_thresh( self, s, thresh, limifo, **kwargs ): # pylint:disable=unused-argument """ Optimization function to identify coincs too quiet to be of interest Calculate the required single detector statistic to exceed the threshold for each of the input triggers. Parameters ---------- s: list List of (ifo, single detector statistic) tuples for all detectors except limifo. thresh: float The threshold on the coincident statistic. limifo: string The ifo for which the limit is to be found. Returns ------- numpy.ndarray Array of limits on the limifo single statistic to exceed thresh. """ # Safety against subclassing and not rethinking this allowed_names = ["ExpFitCombinedSNR"] self._check_coinc_lim_subclass(allowed_names) return thresh * ((len(s) + 1) ** 0.5) - sum(sngl[1] for sngl in s)
statistic_dict = { "quadsum": QuadratureSumStatistic, "single_ranking_only": QuadratureSumStatistic, "phasetd": PhaseTDStatistic, "exp_fit_csnr": ExpFitCombinedSNR, "exp_fit": ExpFitStatistic, }
[docs] def get_statistic(stat): """ Error-handling sugar around dict lookup for coincident statistics Parameters ---------- stat : string Name of the coincident statistic Returns ------- class Subclass of Stat base class Raises ------ RuntimeError If the string is not recognized as corresponding to a Stat subclass """ try: return statistic_dict[stat] except KeyError: raise RuntimeError("%s is not an available detection statistic" % stat)
[docs] def insert_statistic_option_group(parser, default_ranking_statistic=None): """ Add ranking statistic options to the optparser object. Adds the options used to initialize a PyCBC Stat class. Parameters ----------- parser : object OptionParser instance. default_ranking_statisic : str Allows setting a default statistic for the '--ranking-statistic' option. The option is no longer required if a default is provided. Returns -------- strain_opt_group : optparser.argument_group The argument group that is added to the parser. """ statistic_opt_group = parser.add_argument_group( "Options needed to initialize a PyCBC Stat class for computing the " "ranking of events from a PyCBC search." ) statistic_opt_group.add_argument( "--ranking-statistic", default=default_ranking_statistic, choices=statistic_dict.keys(), required=True if default_ranking_statistic is None else False, help="The coinc ranking statistic to calculate", ) statistic_opt_group.add_argument( "--sngl-ranking", choices=ranking.sngls_ranking_function_dict.keys(), required=True, help="The single-detector trigger ranking to use.", ) statistic_opt_group.add_argument( "--statistic-files", nargs="*", action="append", default=[], help="Files containing ranking statistic info", ) statistic_opt_group.add_argument( "--statistic-keywords", nargs="*", default=[], help="Provide additional key-word arguments to be sent to " "the statistic class when it is initialized. Should " "be given in format --statistic-keywords " "KWARG1:VALUE1 KWARG2:VALUE2 KWARG3:VALUE3 ...", ) statistic_opt_group.add_argument( "--statistic-features", nargs="*", default=[], choices=_allowed_statistic_features, help="Provide additional arguments to include particular " "features in the ranking statistic.", ) return statistic_opt_group
[docs] def parse_statistic_feature_options(stat_features, stat_kwarg_list): """ Parse the list of statistic keywords into an appropriate dictionary. Take input from the input argument ["KWARG1:VALUE1", "KWARG2:VALUE2", "KWARG3:VALUE3"] and convert into a dictionary. Parameters ---------- stat_kwarg_list : list Statistic keywords in list format Returns ------- stat_kwarg_dict : dict Statistic keywords in dict format """ stat_kwarg_dict = {} # Check that the statistic keywords are allowed for feature in stat_features: if feature not in _allowed_statistic_features: err_msg = f"--statistic-feature {feature} not recognised" raise NotImplementedError(err_msg) # Set values for each feature key to a boolean of whether we want them for feature in _allowed_statistic_features: stat_kwarg_dict[feature] = feature in stat_features for inputstr in stat_kwarg_list: try: key, value = inputstr.split(":") stat_kwarg_dict[key] = value except ValueError: err_txt = ( "--statistic-keywords must take input in the " "form KWARG1:VALUE1 KWARG2:VALUE2 KWARG3:VALUE3 ... " "Received {}".format(" ".join(stat_kwarg_list)) ) raise ValueError(err_txt) return stat_kwarg_dict
[docs] def get_statistic_from_opts(opts, ifos): """ Return a Stat class from an optparser object. This will assume that the options in the statistic_opt_group are present and will use these options to call stat.get_statistic and initialize the appropriate Stat subclass with appropriate kwargs. Parameters ---------- opts : optparse.OptParser instance The command line options ifos : list The list of detector names Returns ------- class Subclass of Stat base class """ # Allow None inputs if opts.statistic_files is None: opts.statistic_files = [] if opts.statistic_keywords is None: opts.statistic_keywords = [] # flatten the list of lists of filenames to a single list (may be empty) # if needed (e.g. not calling get_statistic_from_opts in a loop) if len(opts.statistic_files) > 0 and \ isinstance(opts.statistic_files[0], list): opts.statistic_files = sum(opts.statistic_files, []) extra_kwargs = parse_statistic_feature_options( opts.statistic_features, opts.statistic_keywords, ) stat_class = get_statistic(opts.ranking_statistic)( opts.sngl_ranking, opts.statistic_files, ifos=ifos, **extra_kwargs ) return stat_class