Source code for pycbc.events.single

""" utilities for assigning FAR to single detector triggers
"""
import logging
import copy
import threading
from datetime import datetime as dt
import time
import numpy as np

from pycbc.events import trigger_fits as fits, stat
from pycbc.types import MultiDetOptionAction
from pycbc import conversions as conv
from pycbc.io.hdf import HFile
from pycbc import bin_utils

logger = logging.getLogger('pycbc.events.single')


[docs]class LiveSingle(object): def __init__(self, ifo, ranking_threshold=10.0, reduced_chisq_threshold=5, duration_threshold=0, fit_file=None, sngl_ifar_est_dist=None, fixed_ifar=None, maximum_ifar=None, statistic=None, sngl_ranking=None, stat_files=None, statistic_refresh_rate=None, **kwargs): """ Parameters ---------- ifo: str Name of the ifo that is being analyzed newsnr_threshold: float Minimum value for the reweighted SNR of the event under consideration. Which reweighted SNR is defined by sngl_ranking reduced_chisq_threshold: float Maximum value for the reduced chisquared of the event under consideration duration_threshold: float Minimum value for the duration of the template which found the event under consideration fit_file: str or path (optional) the file containing information about the single-detector event significance distribution fits sngl_ifar_est_dist: str Which trigger distribution to use when calculating IFAR of single-detector events fixed_ifar: float (optional) give a fixed IFAR value to any event which passes the threshold criteria statistic: str The name of the statistic to rank events. sngl_ranking: str The single detector ranking to use with the background statistic stat_files: list of strs List of filenames that contain information used to construct various coincident statistics. maximum_ifar: float The largest inverse false alarm rate in years that we would like to calculate. statistic_refresh_rate: float How regularly to run the update_files method on the statistic class (in seconds), default not do do this kwargs: dict Additional options for the statistic to use. See stat.py for more details on statistic options. """ self.ifo = ifo self.fit_file = fit_file self.sngl_ifar_est_dist = sngl_ifar_est_dist self.fixed_ifar = fixed_ifar self.maximum_ifar = maximum_ifar self.time_stat_refreshed = dt.now() self.stat_calculator_lock = threading.Lock() self.statistic_refresh_rate = statistic_refresh_rate stat_class = stat.get_statistic(statistic) self.stat_calculator = stat_class( sngl_ranking, stat_files, ifos=[ifo], **kwargs ) self.thresholds = { "ranking": ranking_threshold, "reduced_chisq": reduced_chisq_threshold, "duration": duration_threshold}
[docs] @staticmethod def insert_args(parser): parser.add_argument('--single-ranking-threshold', nargs='+', type=float, action=MultiDetOptionAction, help='Single ranking threshold for ' 'single-detector events. Can be given ' 'as a single value or as detector-value ' 'pairs, e.g. H1:6 L1:7 V1:6.5') parser.add_argument('--single-reduced-chisq-threshold', nargs='+', type=float, action=MultiDetOptionAction, help='Maximum reduced chi-squared threshold for ' 'single triggers. Calcuated after any PSD ' 'variation reweighting is applied. Can be ' 'given as a single value or as ' 'detector-value pairs, e.g. H1:2 L1:2 V1:3') parser.add_argument('--single-duration-threshold', nargs='+', type=float, action=MultiDetOptionAction, help='Minimum duration threshold for single ' 'triggers. Can be given as a single value ' 'or as detector-value pairs, e.g. H1:6 L1:6 ' 'V1:8') parser.add_argument('--single-fixed-ifar', nargs='+', type=float, action=MultiDetOptionAction, help='A fixed value for IFAR, still uses cuts ' 'defined by command line. Can be given as ' 'a single value or as detector-value pairs, ' 'e.g. H1:0.001 L1:0.001 V1:0.0005') parser.add_argument('--single-maximum-ifar', nargs='+', type=float, action=MultiDetOptionAction, help='A maximum possible value for IFAR for ' 'single-detector events. Can be given as ' 'a single value or as detector-value pairs, ' 'e.g. H1:100 L1:1000 V1:50') parser.add_argument('--single-fit-file', help='File which contains definitons of fit ' 'coefficients and counts for specific ' 'single trigger IFAR fitting.') parser.add_argument('--sngl-ifar-est-dist', nargs='+', action=MultiDetOptionAction, help='Which trigger distribution to use when ' 'calculating IFAR of single triggers. ' 'Can be given as a single value or as ' 'detector-value pairs, e.g. H1:mean ' 'L1:mean V1:conservative')
[docs] @staticmethod def verify_args(args, parser, ifos): sngl_opts = [args.single_reduced_chisq_threshold, args.single_duration_threshold, args.single_ranking_threshold, args.sngl_ifar_est_dist] sngl_opts_str = ("--single-reduced-chisq-threshold, " "--single-duration-threshold, " "--single-ranking-threshold, " "--sngl-ifar-est-dist") if any(sngl_opts) and not all(sngl_opts): parser.error(f"Single detector trigger options ({sngl_opts_str}) " "must either all be given or none.") if args.enable_single_detector_upload \ and not args.enable_gracedb_upload: parser.error("--enable-single-detector-upload requires " "--enable-gracedb-upload to be set.") sngl_optional_opts = [args.single_fixed_ifar, args.single_fit_file, args.single_maximum_ifar] sngl_optional_opts_str = ("--single-fixed-ifar, " "--single-fit-file," "--single-maximum-ifar") if any(sngl_optional_opts) and not all(sngl_opts): parser.error("Optional singles options " f"({sngl_optional_opts_str}) given but not all " f"required options ({sngl_opts_str}) are.") for ifo in ifos: # Check which option(s) are needed for each IFO and if they exist: # Notes for the logic here: # args.sngl_ifar_est_dist.default_set is True if single value has # been set to be the same for all values # bool(args.sngl_ifar_est_dist) is True if option is given if args.sngl_ifar_est_dist and \ not args.sngl_ifar_est_dist.default_set \ and not args.sngl_ifar_est_dist[ifo]: # Option has been given, different for each IFO, # and this one is not present parser.error("All IFOs required in --single-ifar-est-dist " "if IFO-specific options are given.") if args.sngl_ifar_est_dist[ifo] is None: # Default - no singles being used continue if not args.sngl_ifar_est_dist[ifo] == 'fixed': if not args.single_fit_file: # Fixed IFAR option doesnt need the fits file parser.error(f"Single detector trigger fits file must be " "given if --single-ifar-est-dist is not " f"fixed for all ifos (at least {ifo} has " f"option {args.sngl_ifar_est_dist[ifo]}).") if ifo in args.single_fixed_ifar: parser.error(f"Value {args.single_fixed_ifar[ifo]} given " f"for {ifo} in --single-fixed-ifar, but " f"--single-ifar-est-dist for {ifo} " f"is {args.sngl_ifar_est_dist[ifo]}, not " "fixed.") else: # Check that the fixed IFAR value has actually been # given if using this instead of a distribution if not args.single_fixed_ifar[ifo]: parser.error(f"--single-fixed-ifar must be " "given if --single-ifar-est-dist is fixed. " f"This is true for at least {ifo}.") # Return value is a boolean whether we are analysing singles or not # The checks already performed mean that all(sngl_opts) is okay return all(sngl_opts)
[docs] @classmethod def from_cli(cls, args, ifo): # Allow None inputs stat_files = args.statistic_files or [] stat_keywords = args.statistic_keywords or [] # flatten the list of lists of filenames to a single list # (may be empty) stat_files = sum(stat_files, []) kwargs = stat.parse_statistic_keywords_opt(stat_keywords) return cls( ifo, ranking_threshold=args.single_ranking_threshold[ifo], reduced_chisq_threshold=args.single_reduced_chisq_threshold[ifo], duration_threshold=args.single_duration_threshold[ifo], fixed_ifar=args.single_fixed_ifar, maximum_ifar=args.single_maximum_ifar[ifo], fit_file=args.single_fit_file, sngl_ifar_est_dist=args.sngl_ifar_est_dist[ifo], statistic=args.ranking_statistic, sngl_ranking=args.sngl_ranking, stat_files=stat_files, statistic_refresh_rate=args.statistic_refresh_rate, **kwargs )
[docs] def check(self, trigs, data_reader): """ Look for a single detector trigger that passes the thresholds in the current data. """ # Apply cuts to trigs before clustering # Cut on snr so that triggers which could not reach the ranking # threshold do not have ranking calculated if 'psd_var_val' in trigs: # We should apply the PSD variation rescaling, as this can # re-weight the SNR to be above SNR trig_chisq = trigs['chisq'] / trigs['psd_var_val'] trig_snr = trigs['snr'] / (trigs['psd_var_val'] ** 0.5) else: trig_chisq = trigs['chisq'] trig_snr = trigs['snr'] valid_idx = (trigs['template_duration'] > self.thresholds['duration']) & \ (trig_chisq < self.thresholds['reduced_chisq']) & \ (trig_snr > self.thresholds['ranking']) if not np.any(valid_idx): return None cut_trigs = {k: trigs[k][valid_idx] for k in trigs} # Convert back from the pycbc live convention of chisq always # meaning the reduced chisq. trigsc = copy.copy(cut_trigs) trigsc['ifo'] = self.ifo trigsc['chisq'] = cut_trigs['chisq'] * cut_trigs['chisq_dof'] trigsc['chisq_dof'] = (cut_trigs['chisq_dof'] + 2) / 2 # Calculate the ranking reweighted SNR for cutting with self.stat_calculator_lock: single_rank = self.stat_calculator.get_sngl_ranking(trigsc) sngl_idx = single_rank > self.thresholds['ranking'] if not np.any(sngl_idx): return None cutall_trigs = {k: trigsc[k][sngl_idx] for k in trigs} # Calculate the ranking statistic with self.stat_calculator_lock: sngl_stat = self.stat_calculator.single(cutall_trigs) rank = self.stat_calculator.rank_stat_single((self.ifo, sngl_stat)) # 'cluster' by taking the maximal statistic value over the trigger set i = rank.argmax() # calculate the (inverse) false-alarm rate ifar = self.calculate_ifar( rank[i], trigsc['template_duration'][i] ) if ifar is None: return None # fill in a new candidate event candidate = { f'foreground/{self.ifo}/{k}': cutall_trigs[k][i] for k in trigs } candidate['foreground/stat'] = rank[i] candidate['foreground/ifar'] = ifar candidate['HWINJ'] = data_reader.near_hwinj() return candidate
[docs] def calculate_ifar(self, sngl_ranking, duration): logger.info("Calculating IFAR") if self.fixed_ifar and self.ifo in self.fixed_ifar: return self.fixed_ifar[self.ifo] try: with HFile(self.fit_file, 'r') as fit_file: bin_edges = fit_file['bins_edges'][:] live_time = fit_file[self.ifo].attrs['live_time'] thresh = fit_file.attrs['fit_threshold'] dist_grp = fit_file[self.ifo][self.sngl_ifar_est_dist] rates = dist_grp['counts'][:] / live_time coeffs = dist_grp['fit_coeff'][:] except FileNotFoundError: logger.error( 'Single fit file %s not found; ' 'dropping a potential single-detector candidate!', self.fit_file ) return None bins = bin_utils.IrregularBins(bin_edges) dur_bin = bins[duration] rate = rates[dur_bin] coeff = coeffs[dur_bin] if np.isnan(coeff) or np.isnan(rate): logger.warning( "Single trigger fits are not valid - singles " "cannot be assessed for this detector at this time." ) return None rate_louder = rate * fits.cum_fit( 'exponential', [sngl_ranking], coeff, thresh )[0] # apply a trials factor of the number of duration bins rate_louder *= len(rates) return min(conv.sec_to_year(1. / rate_louder), self.maximum_ifar)
[docs] def start_refresh_thread(self): """ Start a thread managing whether the stat_calculator will be updated """ thread = threading.Thread( target=self.refresh_statistic, daemon=True ) logger.info("Starting %s statistic refresh thread", self.ifo) thread.start()
[docs] def refresh_statistic(self): """ Function to refresh the stat_calculator at regular intervals """ while True: # How long since the statistic was last updated? since_stat_refresh = \ (dt.now() - self.time_stat_refreshed).total_seconds() if since_stat_refresh > self.statistic_refresh_rate: self.time_stat_refreshed = dt.now() logger.info( "Checking %s statistic for updated files", self.ifo, ) with self.stat_calculator_lock: self.stat_calculator.check_update_files() # Sleep one second for safety time.sleep(1) # Now use the time it took the check / update the statistic since_stat_refresh = \ (dt.now() - self.time_stat_refreshed).total_seconds() logger.debug( "%s statistic: Waiting %.3fs for next refresh", self.ifo, self.statistic_refresh_rate - since_stat_refresh ) time.sleep( self.statistic_refresh_rate - since_stat_refresh )