""" utilities for assigning FAR to single detector triggers
"""
import logging
import copy
import threading
import time
import numpy as np
from pycbc.events import trigger_fits as fits, stat
from pycbc.types import MultiDetOptionAction
from pycbc import conversions as conv
from pycbc.io.hdf import HFile
from pycbc import bin_utils
logger = logging.getLogger('pycbc.events.single')
[docs]
class LiveSingle(object):
def __init__(self, ifo,
ranking_threshold=10.0,
reduced_chisq_threshold=5,
duration_threshold=0,
fit_file=None,
sngl_ifar_est_dist=None,
fixed_ifar=None,
maximum_ifar=None,
statistic=None,
sngl_ranking=None,
stat_files=None,
statistic_refresh_rate=None,
**kwargs):
"""
Parameters
----------
ifo: str
Name of the ifo that is being analyzed
newsnr_threshold: float
Minimum value for the reweighted SNR of the event under
consideration. Which reweighted SNR is defined by sngl_ranking
reduced_chisq_threshold: float
Maximum value for the reduced chisquared of the event under
consideration
duration_threshold: float
Minimum value for the duration of the template which found the
event under consideration
fit_file: str or path
(optional) the file containing information about the
single-detector event significance distribution fits
sngl_ifar_est_dist: str
Which trigger distribution to use when calculating IFAR of
single-detector events
fixed_ifar: float
(optional) give a fixed IFAR value to any event which passes the
threshold criteria
statistic: str
The name of the statistic to rank events.
sngl_ranking: str
The single detector ranking to use with the background statistic
stat_files: list of strs
List of filenames that contain information used to construct
various coincident statistics.
maximum_ifar: float
The largest inverse false alarm rate in years that we would like to
calculate.
statistic_refresh_rate: float
How regularly to run the update_files method on the statistic
class (in seconds), default not do do this
kwargs: dict
Additional options for the statistic to use. See stat.py
for more details on statistic options.
"""
self.ifo = ifo
self.fit_file = fit_file
self.sngl_ifar_est_dist = sngl_ifar_est_dist
self.fixed_ifar = fixed_ifar
self.maximum_ifar = maximum_ifar
self.time_stat_refreshed = time.time()
self.stat_calculator_lock = threading.Lock()
self.statistic_refresh_rate = statistic_refresh_rate
stat_class = stat.get_statistic(statistic)
self.stat_calculator = stat_class(
sngl_ranking,
stat_files,
ifos=[ifo],
**kwargs
)
self.thresholds = {
"ranking": ranking_threshold,
"reduced_chisq": reduced_chisq_threshold,
"duration": duration_threshold}
[docs]
@staticmethod
def insert_args(parser):
parser.add_argument('--single-ranking-threshold', nargs='+',
type=float, action=MultiDetOptionAction,
help='Single ranking threshold for '
'single-detector events. Can be given '
'as a single value or as detector-value '
'pairs, e.g. H1:6 L1:7 V1:6.5')
parser.add_argument('--single-reduced-chisq-threshold', nargs='+',
type=float, action=MultiDetOptionAction,
help='Maximum reduced chi-squared threshold for '
'single triggers. Calcuated after any PSD '
'variation reweighting is applied. Can be '
'given as a single value or as '
'detector-value pairs, e.g. H1:2 L1:2 V1:3')
parser.add_argument('--single-duration-threshold', nargs='+',
type=float, action=MultiDetOptionAction,
help='Minimum duration threshold for single '
'triggers. Can be given as a single value '
'or as detector-value pairs, e.g. H1:6 L1:6 '
'V1:8')
parser.add_argument('--single-fixed-ifar', nargs='+',
type=float, action=MultiDetOptionAction,
help='A fixed value for IFAR, still uses cuts '
'defined by command line. Can be given as '
'a single value or as detector-value pairs, '
'e.g. H1:0.001 L1:0.001 V1:0.0005')
parser.add_argument('--single-maximum-ifar', nargs='+',
type=float, action=MultiDetOptionAction,
help='A maximum possible value for IFAR for '
'single-detector events. Can be given as '
'a single value or as detector-value pairs, '
'e.g. H1:100 L1:1000 V1:50')
parser.add_argument('--single-fit-file',
help='File which contains definitons of fit '
'coefficients and counts for specific '
'single trigger IFAR fitting.')
parser.add_argument('--sngl-ifar-est-dist', nargs='+',
action=MultiDetOptionAction,
help='Which trigger distribution to use when '
'calculating IFAR of single triggers. '
'Can be given as a single value or as '
'detector-value pairs, e.g. H1:mean '
'L1:mean V1:conservative')
[docs]
@staticmethod
def verify_args(args, parser, ifos):
sngl_opts = [args.single_reduced_chisq_threshold,
args.single_duration_threshold,
args.single_ranking_threshold,
args.sngl_ifar_est_dist]
sngl_opts_str = ("--single-reduced-chisq-threshold, "
"--single-duration-threshold, "
"--single-ranking-threshold, "
"--sngl-ifar-est-dist")
if any(sngl_opts) and not all(sngl_opts):
parser.error(f"Single detector trigger options ({sngl_opts_str}) "
"must either all be given or none.")
if args.enable_single_detector_upload \
and not args.enable_gracedb_upload:
parser.error("--enable-single-detector-upload requires "
"--enable-gracedb-upload to be set.")
sngl_optional_opts = [args.single_fixed_ifar,
args.single_fit_file,
args.single_maximum_ifar]
sngl_optional_opts_str = ("--single-fixed-ifar, "
"--single-fit-file,"
"--single-maximum-ifar")
if any(sngl_optional_opts) and not all(sngl_opts):
parser.error("Optional singles options "
f"({sngl_optional_opts_str}) given but not all "
f"required options ({sngl_opts_str}) are.")
for ifo in ifos:
# Check which option(s) are needed for each IFO and if they exist:
# Notes for the logic here:
# args.sngl_ifar_est_dist.default_set is True if single value has
# been set to be the same for all values
# bool(args.sngl_ifar_est_dist) is True if option is given
if args.sngl_ifar_est_dist and \
not args.sngl_ifar_est_dist.default_set \
and not args.sngl_ifar_est_dist[ifo]:
# Option has been given, different for each IFO,
# and this one is not present
parser.error("All IFOs required in --single-ifar-est-dist "
"if IFO-specific options are given.")
if args.sngl_ifar_est_dist[ifo] is None:
# Default - no singles being used
continue
if not args.sngl_ifar_est_dist[ifo] == 'fixed':
if not args.single_fit_file:
# Fixed IFAR option doesnt need the fits file
parser.error(f"Single detector trigger fits file must be "
"given if --single-ifar-est-dist is not "
f"fixed for all ifos (at least {ifo} has "
f"option {args.sngl_ifar_est_dist[ifo]}).")
if ifo in args.single_fixed_ifar:
parser.error(f"Value {args.single_fixed_ifar[ifo]} given "
f"for {ifo} in --single-fixed-ifar, but "
f"--single-ifar-est-dist for {ifo} "
f"is {args.sngl_ifar_est_dist[ifo]}, not "
"fixed.")
else:
# Check that the fixed IFAR value has actually been
# given if using this instead of a distribution
if not args.single_fixed_ifar[ifo]:
parser.error(f"--single-fixed-ifar must be "
"given if --single-ifar-est-dist is fixed. "
f"This is true for at least {ifo}.")
# Return value is a boolean whether we are analysing singles or not
# The checks already performed mean that all(sngl_opts) is okay
return all(sngl_opts)
[docs]
@classmethod
def from_cli(cls, args, ifo):
# Allow None inputs
stat_files = args.statistic_files or []
stat_keywords = args.statistic_keywords or []
# flatten the list of lists of filenames to a single list
# (may be empty)
stat_files = sum(stat_files, [])
kwargs = stat.parse_statistic_keywords_opt(stat_keywords)
return cls(
ifo, ranking_threshold=args.single_ranking_threshold[ifo],
reduced_chisq_threshold=args.single_reduced_chisq_threshold[ifo],
duration_threshold=args.single_duration_threshold[ifo],
fixed_ifar=args.single_fixed_ifar,
maximum_ifar=args.single_maximum_ifar[ifo],
fit_file=args.single_fit_file,
sngl_ifar_est_dist=args.sngl_ifar_est_dist[ifo],
statistic=args.ranking_statistic,
sngl_ranking=args.sngl_ranking,
stat_files=stat_files,
statistic_refresh_rate=args.statistic_refresh_rate,
**kwargs
)
[docs]
def check(self, trigs, data_reader):
""" Look for a single detector trigger that passes the thresholds in
the current data.
"""
# Apply cuts to trigs before clustering
# Cut on snr so that triggers which could not reach the ranking
# threshold do not have ranking calculated
if 'psd_var_val' in trigs:
# We should apply the PSD variation rescaling, as this can
# re-weight the SNR to be above SNR
trig_chisq = trigs['chisq'] / trigs['psd_var_val']
trig_snr = trigs['snr'] / (trigs['psd_var_val'] ** 0.5)
else:
trig_chisq = trigs['chisq']
trig_snr = trigs['snr']
valid_idx = (trigs['template_duration'] >
self.thresholds['duration']) & \
(trig_chisq <
self.thresholds['reduced_chisq']) & \
(trig_snr >
self.thresholds['ranking'])
if not np.any(valid_idx):
return None
cut_trigs = {k: trigs[k][valid_idx] for k in trigs}
# Convert back from the pycbc live convention of chisq always
# meaning the reduced chisq.
trigsc = copy.copy(cut_trigs)
trigsc['ifo'] = self.ifo
trigsc['chisq'] = cut_trigs['chisq'] * cut_trigs['chisq_dof']
trigsc['chisq_dof'] = (cut_trigs['chisq_dof'] + 2) / 2
# Calculate the ranking reweighted SNR for cutting
with self.stat_calculator_lock:
single_rank = self.stat_calculator.get_sngl_ranking(trigsc)
sngl_idx = single_rank > self.thresholds['ranking']
if not np.any(sngl_idx):
return None
cutall_trigs = {k: trigsc[k][sngl_idx]
for k in trigs}
# Calculate the ranking statistic
with self.stat_calculator_lock:
sngl_stat = self.stat_calculator.single(cutall_trigs)
rank = self.stat_calculator.rank_stat_single((self.ifo, sngl_stat))
# 'cluster' by taking the maximal statistic value over the trigger set
i = rank.argmax()
# calculate the (inverse) false-alarm rate
ifar = self.calculate_ifar(
rank[i],
trigsc['template_duration'][i]
)
if ifar is None:
return None
# fill in a new candidate event
candidate = {
f'foreground/{self.ifo}/{k}': cutall_trigs[k][i] for k in trigs
}
candidate['foreground/stat'] = rank[i]
candidate['foreground/ifar'] = ifar
candidate['HWINJ'] = data_reader.near_hwinj()
return candidate
[docs]
def calculate_ifar(self, sngl_ranking, duration):
logger.info("Calculating IFAR")
if self.fixed_ifar and self.ifo in self.fixed_ifar:
return self.fixed_ifar[self.ifo]
try:
with HFile(self.fit_file, 'r') as fit_file:
bin_edges = fit_file['bins_edges'][:]
live_time = fit_file[self.ifo].attrs['live_time']
thresh = fit_file.attrs['fit_threshold']
dist_grp = fit_file[self.ifo][self.sngl_ifar_est_dist]
rates = dist_grp['counts'][:] / live_time
coeffs = dist_grp['fit_coeff'][:]
except FileNotFoundError:
logger.error(
'Single fit file %s not found; '
'dropping a potential single-detector candidate!',
self.fit_file
)
return None
bins = bin_utils.IrregularBins(bin_edges)
dur_bin = bins[duration]
rate = rates[dur_bin]
coeff = coeffs[dur_bin]
if np.isnan(coeff) or np.isnan(rate):
logger.warning(
"Single trigger fits are not valid - singles "
"cannot be assessed for this detector at this time."
)
return None
rate_louder = rate * fits.cum_fit(
'exponential',
[sngl_ranking],
coeff,
thresh
)[0]
# apply a trials factor of the number of duration bins
rate_louder *= len(rates)
return min(conv.sec_to_year(1. / rate_louder), self.maximum_ifar)
[docs]
def start_refresh_thread(self):
"""
Start a thread managing whether the stat_calculator will be updated
"""
if self.statistic_refresh_rate is None:
logger.info("Statistic refresh disabled for %s", self.ifo)
return
thread = threading.Thread(
target=self.refresh_statistic,
daemon=True,
name="Stat refresh " + self.ifo
)
logger.info("Starting %s statistic refresh thread", self.ifo)
thread.start()
[docs]
def refresh_statistic(self):
"""
Function to refresh the stat_calculator at regular intervals
"""
while True:
# How long since the statistic was last updated?
since_stat_refresh = time.time() - self.time_stat_refreshed
if since_stat_refresh > self.statistic_refresh_rate:
self.time_stat_refreshed = time.time()
logger.info(
"Checking %s statistic for updated files", self.ifo
)
with self.stat_calculator_lock:
self.stat_calculator.check_update_files()
# Sleep one second for safety
time.sleep(1)
# Now use the time it took the check / update the statistic
since_stat_refresh = time.time() - self.time_stat_refreshed
logger.debug(
"%s statistic: Waiting %.3fs for next refresh",
self.ifo,
self.statistic_refresh_rate - since_stat_refresh
)
time.sleep(self.statistic_refresh_rate - since_stat_refresh)