# Copyright (C) 2013 Alex Nitz
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This module contains functions for reading, generating, and segmenting strain data.
"""
import copy
import logging
import functools
import numpy
from scipy.signal import kaiserord
import pycbc.types
from pycbc.types import TimeSeries, zeros
from pycbc.types import Array, FrequencySeries
from pycbc.types import MultiDetOptionAppendAction, MultiDetOptionAction
from pycbc.types import MultiDetOptionActionSpecial
from pycbc.types import DictOptionAction, MultiDetDictOptionAction
from pycbc.types import required_opts, required_opts_multi_ifo
from pycbc.types import ensure_one_opt, ensure_one_opt_multi_ifo
from pycbc.types import copy_opts_for_single_ifo, complex_same_precision_as
from pycbc.inject import InjectionSet, SGBurstInjectionSet
from pycbc.filter import resample_to_delta_t, lowpass, highpass, make_frequency_series
from pycbc.filter.zpk import filter_zpk
from pycbc.waveform.spa_tmplt import spa_distance
import pycbc.psd
from pycbc.fft import FFT, IFFT
import pycbc.events
import pycbc.frame
import pycbc.filter
logger = logging.getLogger('pycbc.strain.strain')
def next_power_of_2(n):
"""Return the smallest integer power of 2 larger than the argument.
Parameters
----------
n : int
A positive integer.
Returns
-------
m : int
Smallest integer power of 2 larger than n.
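
    Examples
    --------
    The result is strictly greater than the input, even for exact powers
    of 2:

    >>> next_power_of_2(1000)
    1024
    >>> next_power_of_2(1024)
    2048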
"""
return 1 << n.bit_length()
def detect_loud_glitches(strain, psd_duration=4., psd_stride=2.,
psd_avg_method='median', low_freq_cutoff=30.,
threshold=50., cluster_window=5., corrupt_time=4.,
high_freq_cutoff=None, output_intermediates=False):
"""Automatic identification of loud transients for gating purposes.
This function first estimates the PSD of the input time series using the
FindChirp Welch method. Then it whitens the time series using that
estimate. Finally, it computes the magnitude of the whitened series,
thresholds it and applies the FindChirp clustering over time to the
surviving samples.
Parameters
----------
strain : TimeSeries
Input strain time series to detect glitches over.
psd_duration : {float, 4}
Duration of the segments for PSD estimation in seconds.
psd_stride : {float, 2}
Separation between PSD estimation segments in seconds.
psd_avg_method : {string, 'median'}
Method for averaging PSD estimation segments.
low_freq_cutoff : {float, 30}
Minimum frequency to include in the whitened strain.
threshold : {float, 50}
Minimum magnitude of whitened strain for considering a transient to
be present.
cluster_window : {float, 5}
Length of time window to cluster surviving samples over, in seconds.
corrupt_time : {float, 4}
Amount of time to be discarded at the beginning and end of the input
time series.
    high_freq_cutoff : {float, None}
Maximum frequency to include in the whitened strain. If given, the
input series is downsampled accordingly. If omitted, the Nyquist
frequency is used.
output_intermediates : {bool, False}
Save intermediate time series for debugging.
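
    Returns
    -------
    times : list
        GPS times of the surviving transients, one per cluster.

    Examples
    --------
    A minimal sketch, assuming ``ts`` is a TimeSeries containing several
    minutes of strain data (hypothetical input):

    >>> glitch_times = detect_loud_glitches(ts, threshold=100.)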
"""
if high_freq_cutoff:
strain = resample_to_delta_t(strain, 0.5 / high_freq_cutoff,
method='ldas')
else:
strain = strain.copy()
# taper strain
corrupt_length = int(corrupt_time * strain.sample_rate)
w = numpy.arange(corrupt_length) / float(corrupt_length)
strain[0:corrupt_length] *= pycbc.types.Array(w, dtype=strain.dtype)
strain[(len(strain) - corrupt_length):] *= \
pycbc.types.Array(w[::-1], dtype=strain.dtype)
if output_intermediates:
strain.save_to_wav('strain_conditioned.wav')
# zero-pad strain to a power-of-2 length
strain_pad_length = next_power_of_2(len(strain))
pad_start = int(strain_pad_length / 2 - len(strain) / 2)
pad_end = pad_start + len(strain)
pad_epoch = strain.start_time - pad_start / float(strain.sample_rate)
strain_pad = pycbc.types.TimeSeries(
pycbc.types.zeros(strain_pad_length, dtype=strain.dtype),
delta_t=strain.delta_t, copy=False, epoch=pad_epoch)
strain_pad[pad_start:pad_end] = strain[:]
# estimate the PSD
psd = pycbc.psd.welch(strain[corrupt_length:(len(strain)-corrupt_length)],
seg_len=int(psd_duration * strain.sample_rate),
seg_stride=int(psd_stride * strain.sample_rate),
avg_method=psd_avg_method,
require_exact_data_fit=False)
psd = pycbc.psd.interpolate(psd, 1. / strain_pad.duration)
psd = pycbc.psd.inverse_spectrum_truncation(
psd, int(psd_duration * strain.sample_rate),
low_frequency_cutoff=low_freq_cutoff,
trunc_method='hann')
kmin = int(low_freq_cutoff / psd.delta_f)
psd[0:kmin] = numpy.inf
if high_freq_cutoff:
kmax = int(high_freq_cutoff / psd.delta_f)
psd[kmax:] = numpy.inf
# whiten
strain_tilde = strain_pad.to_frequencyseries()
if high_freq_cutoff:
norm = high_freq_cutoff - low_freq_cutoff
else:
norm = strain.sample_rate / 2. - low_freq_cutoff
strain_tilde *= (psd * norm) ** (-0.5)
strain_pad = strain_tilde.to_timeseries()
if output_intermediates:
strain_pad[pad_start:pad_end].save_to_wav('strain_whitened.wav')
mag = abs(strain_pad[pad_start:pad_end])
if output_intermediates:
mag.save('strain_whitened_mag.npy')
mag = mag.numpy()
# remove strain corrupted by filters at the ends
mag[0:corrupt_length] = 0
mag[-1:-corrupt_length-1:-1] = 0
# find peaks and their times
indices = numpy.where(mag > threshold)[0]
cluster_idx = pycbc.events.findchirp_cluster_over_window(
indices, numpy.array(mag[indices]),
int(cluster_window*strain.sample_rate))
times = [idx * strain.delta_t + strain.start_time \
for idx in indices[cluster_idx]]
return times
def from_cli(opt, dyn_range_fac=1, precision='single',
inj_filter_rejector=None):
"""Parses the CLI options related to strain data reading and conditioning.
Parameters
----------
opt : object
Result of parsing the CLI with OptionParser, or any object with the
required attributes (gps-start-time, gps-end-time, strain-high-pass,
pad-data, sample-rate, (frame-cache or frame-files), channel-name,
fake-strain, fake-strain-seed, fake-strain-from-file, gating_file).
dyn_range_fac : {float, 1}, optional
A large constant to reduce the dynamic range of the strain.
precision : string
Precision of the returned strain ('single' or 'double').
inj_filter_rejector : InjFilterRejector instance; optional, default=None
        If given, send the InjFilterRejector instance to the inject module so
that it can store a reduced representation of injections if
necessary.
Returns
-------
strain : TimeSeries
The time series containing the conditioned strain data.
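
    Examples
    --------
    A sketch of typical usage; the option values below are illustrative
    only:

    >>> import argparse
    >>> parser = argparse.ArgumentParser()
    >>> _ = insert_strain_option_group(parser)
    >>> opt = parser.parse_args(['--gps-start-time', '1126259455',
    ...                          '--gps-end-time', '1126259479',
    ...                          '--sample-rate', '2048',
    ...                          '--strain-high-pass', '15',
    ...                          '--channel-name', 'H1:FAKE-STRAIN',
    ...                          '--fake-strain', 'aLIGOZeroDetHighPower'])
    >>> conditioned = from_cli(opt)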
"""
gating_info = {}
injector = InjectionSet.from_cli(opt)
if opt.frame_cache or opt.frame_files or opt.frame_type or opt.hdf_store:
if opt.frame_cache:
frame_source = opt.frame_cache
if opt.frame_files:
frame_source = opt.frame_files
logger.info("Reading Frames")
if hasattr(opt, 'frame_sieve') and opt.frame_sieve:
sieve = opt.frame_sieve
else:
sieve = None
if opt.frame_type:
strain = pycbc.frame.query_and_read_frame(
opt.frame_type, opt.channel_name,
start_time=opt.gps_start_time-opt.pad_data,
end_time=opt.gps_end_time+opt.pad_data,
sieve=sieve)
elif opt.frame_files or opt.frame_cache:
strain = pycbc.frame.read_frame(
frame_source, opt.channel_name,
start_time=opt.gps_start_time-opt.pad_data,
end_time=opt.gps_end_time+opt.pad_data,
sieve=sieve)
elif opt.hdf_store:
strain = pycbc.frame.read_store(opt.hdf_store, opt.channel_name,
opt.gps_start_time - opt.pad_data,
opt.gps_end_time + opt.pad_data)
elif opt.fake_strain or opt.fake_strain_from_file:
logger.info("Generating Fake Strain")
duration = opt.gps_end_time - opt.gps_start_time
duration += 2 * opt.pad_data
pdf = 1.0 / opt.fake_strain_filter_duration
fake_flow = opt.fake_strain_flow
fake_rate = opt.fake_strain_sample_rate
fake_extra_args = opt.fake_strain_extra_args
plen = round(opt.sample_rate / pdf) // 2 + 1
if opt.fake_strain_from_file:
logger.info("Reading ASD from file")
strain_psd = pycbc.psd.from_txt(opt.fake_strain_from_file,
plen, pdf,
fake_flow,
is_asd_file=True)
elif opt.fake_strain != 'zeroNoise':
logger.info("Making PSD for strain")
strain_psd = pycbc.psd.from_string(opt.fake_strain, plen, pdf,
fake_flow, **fake_extra_args)
if opt.fake_strain == 'zeroNoise':
logger.info("Making zero-noise time series")
strain = TimeSeries(pycbc.types.zeros(duration * fake_rate),
delta_t=1.0 / fake_rate,
epoch=opt.gps_start_time - opt.pad_data)
else:
logger.info("Making colored noise")
from pycbc.noise.reproduceable import colored_noise
strain = colored_noise(strain_psd,
opt.gps_start_time - opt.pad_data,
opt.gps_end_time + opt.pad_data,
seed=opt.fake_strain_seed,
sample_rate=fake_rate,
low_frequency_cutoff=fake_flow,
filter_duration=1.0/pdf)
if not strain.sample_rate_close(fake_rate):
err_msg = "Actual sample rate of generated data does not match "
err_msg += "that expected. Possible causes of this:\n"
err_msg += "The desired duration is not a multiple of delta_t. "
err_msg += "e.g. If using LISA with delta_t = 15 the duration "
err_msg += "must be a multiple of 15 seconds."
raise ValueError(err_msg)
if not opt.channel_name and (opt.injection_file \
or opt.sgburst_injection_file):
raise ValueError('Please provide channel names with the format '
'ifo:channel (e.g. H1:CALIB-STRAIN) to inject '
'simulated signals into fake strain')
if opt.zpk_z and opt.zpk_p and opt.zpk_k:
logger.info("Highpass Filtering")
strain = highpass(strain, frequency=opt.strain_high_pass)
logger.info("Applying zpk filter")
z = numpy.array(opt.zpk_z)
p = numpy.array(opt.zpk_p)
k = float(opt.zpk_k)
strain = filter_zpk(strain.astype(numpy.float64), z, p, k)
if opt.normalize_strain:
logger.info("Dividing strain by constant")
        strain = strain / opt.normalize_strain
if opt.strain_high_pass:
logger.info("Highpass Filtering")
strain = highpass(strain, frequency=opt.strain_high_pass)
if opt.sample_rate:
logger.info("Resampling data")
strain = resample_to_delta_t(strain,
1. / opt.sample_rate,
method='ldas')
if injector is not None:
logger.info("Applying injections")
injections = \
injector.apply(strain, opt.channel_name.split(':')[0],
distance_scale=opt.injection_scale_factor,
injection_sample_rate=opt.injection_sample_rate,
inj_filter_rejector=inj_filter_rejector)
if opt.sgburst_injection_file:
logger.info("Applying sine-Gaussian burst injections")
injector = SGBurstInjectionSet(opt.sgburst_injection_file)
injector.apply(strain, opt.channel_name.split(':')[0],
distance_scale=opt.injection_scale_factor)
if precision == 'single':
logger.info("Converting to float32")
strain = (strain * dyn_range_fac).astype(pycbc.types.float32)
elif precision == "double":
logger.info("Converting to float64")
strain = (strain * dyn_range_fac).astype(pycbc.types.float64)
else:
raise ValueError("Unrecognized precision {}".format(precision))
if opt.gating_file is not None:
logger.info("Gating times contained in gating file")
gate_params = numpy.loadtxt(opt.gating_file)
if len(gate_params.shape) == 1:
gate_params = [gate_params]
for gate_time, gate_window, gate_taper in gate_params:
strain = strain.gate(gate_time, window=gate_window,
method=opt.gating_method,
copy=False,
taper_width=gate_taper)
gating_info['file'] = \
[gp for gp in gate_params \
if (gp[0] + gp[1] + gp[2] >= strain.start_time) \
and (gp[0] - gp[1] - gp[2] <= strain.end_time)]
if opt.autogating_threshold is not None:
gating_info['auto'] = []
for _ in range(opt.autogating_max_iterations):
glitch_times = detect_loud_glitches(
strain, threshold=opt.autogating_threshold,
cluster_window=opt.autogating_cluster,
low_freq_cutoff=opt.strain_high_pass,
corrupt_time=opt.pad_data + opt.autogating_pad)
gate_params = [[gt, opt.autogating_width, opt.autogating_taper]
for gt in glitch_times]
gating_info['auto'] += gate_params
for gate_time, gate_window, gate_taper in gate_params:
strain = strain.gate(gate_time, window=gate_window,
method=opt.gating_method,
copy=False,
taper_width=gate_taper)
if len(glitch_times) > 0:
logger.info('Autogating at %s',
', '.join(['%.3f' % gt
for gt in glitch_times]))
else:
break
if opt.strain_high_pass:
logger.info("Highpass Filtering")
strain = highpass(strain, frequency=opt.strain_high_pass)
if opt.strain_low_pass:
logger.info("Lowpass Filtering")
strain = lowpass(strain, frequency=opt.strain_low_pass)
if hasattr(opt, 'witness_frame_type') and opt.witness_frame_type:
stilde = strain.to_frequencyseries()
from pycbc.io.hdf import HFile
tf_file = HFile(opt.witness_tf_file)
for key in tf_file:
witness = pycbc.frame.query_and_read_frame(opt.witness_frame_type,
str(key),
start_time=strain.start_time,
end_time=strain.end_time)
witness = (witness * dyn_range_fac).astype(strain.dtype)
tf = pycbc.types.load_frequencyseries(opt.witness_tf_file,
group=key)
tf = tf.astype(stilde.dtype)
flen = int(opt.witness_filter_length * strain.sample_rate)
tf = pycbc.psd.interpolate(tf, stilde.delta_f)
tf_time = tf.to_timeseries()
window = Array(numpy.hanning(flen * 2), dtype=strain.dtype)
tf_time[0:flen] *= window[flen:]
tf_time[len(tf_time)-flen:] *= window[0:flen]
tf = tf_time.to_frequencyseries()
kmax = min(len(tf), len(stilde) - 1)
stilde[:kmax] -= tf[:kmax] * witness.to_frequencyseries()[:kmax]
strain = stilde.to_timeseries()
if opt.pad_data:
logger.info("Remove Padding")
start = int(opt.pad_data * strain.sample_rate)
end = int(len(strain) - strain.sample_rate * opt.pad_data)
strain = strain[start:end]
if opt.taper_data:
logger.info("Tapering data")
        # Use auto-gating; a one-sided gate is a taper
pd_taper_window = opt.taper_data
gate_params = [(strain.start_time, 0., pd_taper_window)]
gate_params.append((strain.end_time, 0., pd_taper_window))
gate_data(strain, gate_params)
if injector is not None:
strain.injections = injections
strain.gating_info = gating_info
return strain
def from_cli_single_ifo(opt, ifo, inj_filter_rejector=None, **kwargs):
"""
Get the strain for a single ifo when using the multi-detector CLI
"""
single_det_opt = copy_opts_for_single_ifo(opt, ifo)
return from_cli(single_det_opt,
inj_filter_rejector=inj_filter_rejector, **kwargs)
def from_cli_multi_ifos(opt, ifos, inj_filter_rejector_dict=None, **kwargs):
"""
Get the strain for all ifos when using the multi-detector CLI
"""
strain = {}
if inj_filter_rejector_dict is None:
inj_filter_rejector_dict = {ifo: None for ifo in ifos}
for ifo in ifos:
strain[ifo] = from_cli_single_ifo(opt, ifo,
inj_filter_rejector_dict[ifo], **kwargs)
return strain
def insert_strain_option_group(parser, gps_times=True):
""" Add strain-related options to the optparser object.
Adds the options used to call the pycbc.strain.from_cli function to an
optparser as an OptionGroup. This should be used if you
want to use these options in your code.
Parameters
-----------
parser : object
        ArgumentParser instance.
gps_times : bool, optional
Include ``--gps-start-time`` and ``--gps-end-time`` options. Default
is True.
"""
data_reading_group = parser.add_argument_group("Options for obtaining h(t)",
"These options are used for generating h(t) either by "
"reading from a file or by generating it. This is only "
"needed if the PSD is to be estimated from the data, ie. "
" if the --psd-estimation option is given.")
# Required options
if gps_times:
data_reading_group.add_argument("--gps-start-time",
help="The gps start time of the data "
"(integer seconds)", type=int)
data_reading_group.add_argument("--gps-end-time",
help="The gps end time of the data "
"(integer seconds)", type=int)
data_reading_group.add_argument("--strain-high-pass", type=float,
help="High pass frequency")
data_reading_group.add_argument("--strain-low-pass", type=float,
help="Low pass frequency")
data_reading_group.add_argument("--pad-data", default=8,
help="Extra padding to remove highpass corruption "
"(integer seconds, default 8)", type=int)
data_reading_group.add_argument("--taper-data",
help="Taper ends of data to zero using the supplied length as a "
"window (integer seconds)", type=int, default=0)
data_reading_group.add_argument("--sample-rate", type=float,
help="The sample rate to use for h(t) generation (integer Hz)")
data_reading_group.add_argument("--channel-name", type=str,
help="The channel containing the gravitational strain data")
# Read from cache file
data_reading_group.add_argument("--frame-cache", type=str, nargs="+",
help="Cache file containing the frame locations.")
# Read from frame files
data_reading_group.add_argument("--frame-files",
type=str, nargs="+",
help="list of frame files")
# Read from hdf store file
data_reading_group.add_argument("--hdf-store",
type=str,
help="Store of time series data in hdf format")
# Use datafind to get frame files
data_reading_group.add_argument("--frame-type",
type=str,
metavar="S:TYPE",
help="(optional), replaces frame-files. Use datafind "
"to get the needed frame file(s) of this type "
"from site S.")
# Filter frame files by URL
data_reading_group.add_argument("--frame-sieve",
type=str,
help="(optional), Only use frame files where the "
"URL matches the regular expression given.")
# Generate gaussian noise with given psd
data_reading_group.add_argument("--fake-strain",
help="Name of model PSD for generating fake gaussian noise.",
choices=pycbc.psd.get_psd_model_list() + ['zeroNoise'])
data_reading_group.add_argument("--fake-strain-extra-args",
nargs='+', action=DictOptionAction,
metavar='PARAM:VALUE', default={}, type=float,
help="(optional) Extra arguments passed to "
"the PSD models.")
data_reading_group.add_argument("--fake-strain-seed", type=int, default=0,
help="Seed value for the generation of fake colored"
" gaussian noise")
data_reading_group.add_argument("--fake-strain-from-file",
help="File containing ASD for generating fake noise from it.")
data_reading_group.add_argument("--fake-strain-flow",
default=1.0, type=float,
help="Low frequency cutoff of the fake strain")
data_reading_group.add_argument("--fake-strain-filter-duration",
default=128.0, type=float,
help="Duration in seconds of the fake data coloring filter")
data_reading_group.add_argument("--fake-strain-sample-rate",
default=16384, type=float,
help="Sample rate of the fake data generation")
# Injection options
data_reading_group.add_argument("--injection-file", type=str,
help="(optional) Injection file containing parameters"
" of CBC signals to be added to the strain")
data_reading_group.add_argument("--sgburst-injection-file", type=str,
help="(optional) Injection file containing parameters"
"of sine-Gaussian burst signals to add to the strain")
data_reading_group.add_argument("--injection-scale-factor", type=float,
default=1,
help="Divide injections by this factor "
"before adding to the strain data")
data_reading_group.add_argument("--injection-sample-rate", type=float,
help="Sample rate to use for injections (integer Hz). "
"Typically similar to the strain data sample rate."
"If not provided, the strain sample rate will be "
"used")
data_reading_group.add_argument("--injection-f-ref", type=float,
help="Reference frequency in Hz for creating CBC "
"injections from an XML file")
data_reading_group.add_argument("--injection-f-final", type=float,
help="Override the f_final field of a CBC XML "
"injection file (frequency in Hz)")
# Gating options
data_reading_group.add_argument("--gating-file", type=str,
help="(optional) Text file of gating segments to apply."
" Format of each line is (all values in seconds):"
" gps_time zeros_half_width pad_half_width")
data_reading_group.add_argument('--autogating-threshold', type=float,
metavar='SIGMA',
help='If given, find and gate glitches '
'producing a deviation larger than '
'SIGMA in the whitened strain time '
'series.')
data_reading_group.add_argument('--autogating-max-iterations', type=int,
                                    metavar='COUNT', default=1,
help='If given, iteratively apply '
'autogating')
data_reading_group.add_argument('--autogating-cluster', type=float,
metavar='SECONDS', default=5.,
help='Length of clustering window for '
'detecting glitches for autogating.')
data_reading_group.add_argument('--autogating-width', type=float,
metavar='SECONDS', default=0.25,
help='Half-width of the gating window.')
data_reading_group.add_argument('--autogating-taper', type=float,
metavar='SECONDS', default=0.25,
help='Taper the strain before and after '
'each gating window over a duration '
'of SECONDS.')
data_reading_group.add_argument('--autogating-pad', type=float,
metavar='SECONDS', default=16,
help='Ignore the given length of whitened '
'strain at the ends of a segment, to '
'avoid filters ringing.')
data_reading_group.add_argument('--gating-method', type=str,
default='taper',
help='Choose the method for gating. '
'Default: `taper`',
choices=['hard', 'taper', 'paint'])
# Optional
data_reading_group.add_argument("--normalize-strain", type=float,
help="(optional) Divide frame data by constant.")
data_reading_group.add_argument("--zpk-z", type=float, nargs="+",
help="(optional) Zero-pole-gain (zpk) filter strain. "
"A list of zeros for transfer function")
data_reading_group.add_argument("--zpk-p", type=float, nargs="+",
help="(optional) Zero-pole-gain (zpk) filter strain. "
"A list of poles for transfer function")
data_reading_group.add_argument("--zpk-k", type=float,
help="(optional) Zero-pole-gain (zpk) filter strain. "
"Transfer function gain")
# Options to apply to subtract noise from a witness channel and known
# transfer function.
data_reading_group.add_argument("--witness-frame-type", type=str,
help="(optional), frame type which will be use to query the"
" witness channel data.")
data_reading_group.add_argument("--witness-tf-file", type=str,
help="an hdf file containing the transfer"
" functions and the associated channel names")
data_reading_group.add_argument("--witness-filter-length", type=float,
help="filter length in seconds for the transfer function")
return data_reading_group
# FIXME: This repeats almost all of the options above. Any nice way of reducing
# this?
def insert_strain_option_group_multi_ifo(parser, gps_times=True):
"""
Adds the options used to call the pycbc.strain.from_cli function to an
    argument parser as an argument group. This should be used if you
want to use these options in your code.
Parameters
-----------
parser : object
        ArgumentParser instance.
gps_times : bool, optional
Include ``--gps-start-time`` and ``--gps-end-time`` options. Default
is True.
"""
data_reading_group_multi = parser.add_argument_group("Options for obtaining"
" h(t)",
"These options are used for generating h(t) either by "
"reading from a file or by generating it. This is only "
"needed if the PSD is to be estimated from the data, ie. "
"if the --psd-estimation option is given. This group "
"supports reading from multiple ifos simultaneously.")
# Required options
if gps_times:
data_reading_group_multi.add_argument(
"--gps-start-time", nargs='+', action=MultiDetOptionAction,
metavar='IFO:TIME', type=int,
help="The gps start time of the data (integer seconds)")
data_reading_group_multi.add_argument(
"--gps-end-time", nargs='+', action=MultiDetOptionAction,
metavar='IFO:TIME', type=int,
help="The gps end time of the data (integer seconds)")
data_reading_group_multi.add_argument("--strain-high-pass", nargs='+',
action=MultiDetOptionAction,
type=float, metavar='IFO:FREQUENCY',
help="High pass frequency")
data_reading_group_multi.add_argument("--strain-low-pass", nargs='+',
action=MultiDetOptionAction,
type=float, metavar='IFO:FREQUENCY',
help="Low pass frequency")
data_reading_group_multi.add_argument("--pad-data", nargs='+', default=8,
action=MultiDetOptionAction,
type=int, metavar='IFO:LENGTH',
help="Extra padding to remove highpass corruption "
"(integer seconds, default 8)")
data_reading_group_multi.add_argument("--taper-data", nargs='+',
action=MultiDetOptionAction,
type=int, default=0, metavar='IFO:LENGTH',
help="Taper ends of data to zero using the "
"supplied length as a window (integer seconds)")
data_reading_group_multi.add_argument("--sample-rate", type=float,
nargs='+',
action=MultiDetOptionAction, metavar='IFO:RATE',
help="The sample rate to use for h(t) generation "
" (integer Hz).")
data_reading_group_multi.add_argument("--channel-name", type=str, nargs='+',
action=MultiDetOptionActionSpecial,
metavar='IFO:CHANNEL',
help="The channel containing the gravitational "
"strain data")
# Read from cache file
data_reading_group_multi.add_argument("--frame-cache", type=str, nargs="+",
action=MultiDetOptionAppendAction,
metavar='IFO:FRAME_CACHE',
help="Cache file containing the frame locations.")
# Read from frame files
data_reading_group_multi.add_argument("--frame-files", type=str, nargs="+",
action=MultiDetOptionAppendAction,
metavar='IFO:FRAME_FILES',
help="list of frame files")
# Read from hdf store file
data_reading_group_multi.add_argument("--hdf-store", type=str, nargs='+',
action=MultiDetOptionAction,
metavar='IFO:HDF_STORE_FILE',
help="Store of time series data in hdf format")
# Use datafind to get frame files
data_reading_group_multi.add_argument("--frame-type", type=str, nargs="+",
action=MultiDetOptionActionSpecial,
metavar='IFO:FRAME_TYPE',
help="(optional) Replaces frame-files. "
"Use datafind to get the needed frame "
"file(s) of this type.")
# Filter frame files by URL
data_reading_group_multi.add_argument("--frame-sieve", type=str, nargs="+",
action=MultiDetOptionAction,
metavar='IFO:FRAME_SIEVE',
help="(optional), Only use frame files where the "
"URL matches the regular expression given.")
# Generate gaussian noise with given psd
data_reading_group_multi.add_argument("--fake-strain", type=str, nargs="+",
action=MultiDetOptionAction, metavar='IFO:CHOICE',
help="Name of model PSD for generating fake "
"gaussian noise. Choose from %s or zeroNoise" \
%((', ').join(pycbc.psd.get_lalsim_psd_list()),) )
data_reading_group_multi.add_argument("--fake-strain-extra-args",
nargs='+', action=MultiDetDictOptionAction,
metavar='DETECTOR:PARAM:VALUE', default={},
type=float, help="(optional) Extra arguments "
"passed to the PSD models.")
data_reading_group_multi.add_argument("--fake-strain-seed", type=int,
default=0, nargs="+", action=MultiDetOptionAction,
metavar='IFO:SEED',
help="Seed value for the generation of fake "
"colored gaussian noise")
data_reading_group_multi.add_argument("--fake-strain-from-file", nargs="+",
action=MultiDetOptionAction, metavar='IFO:FILE',
help="File containing ASD for generating fake "
"noise from it.")
data_reading_group_multi.add_argument("--fake-strain-flow",
default=1.0, type=float,
nargs="+", action=MultiDetOptionAction,
help="Low frequency cutoff of the fake strain")
data_reading_group_multi.add_argument("--fake-strain-filter-duration",
default=128.0, type=float,
nargs="+", action=MultiDetOptionAction,
help="Duration in seconds of the fake data coloring filter")
data_reading_group_multi.add_argument("--fake-strain-sample-rate",
default=16384, type=float,
nargs="+", action=MultiDetOptionAction,
help="Sample rate of the fake data generation")
# Injection options
data_reading_group_multi.add_argument("--injection-file", type=str,
nargs="+", action=MultiDetOptionAction,
metavar='IFO:FILE',
help="(optional) Injection file containing parameters"
"of CBC signals to be added to the strain")
data_reading_group_multi.add_argument("--sgburst-injection-file", type=str,
nargs="+", action=MultiDetOptionAction,
metavar='IFO:FILE',
help="(optional) Injection file containing parameters"
"of sine-Gaussian burst signals to add to the strain")
data_reading_group_multi.add_argument("--injection-scale-factor",
type=float, nargs="+", action=MultiDetOptionAction,
metavar="IFO:VAL", default=1.,
help="Divide injections by this factor "
"before adding to the strain data")
data_reading_group_multi.add_argument("--injection-sample-rate",
type=float, nargs="+", action=MultiDetOptionAction,
metavar="IFO:VAL",
help="Sample rate to use for injections (integer Hz). "
"Typically similar to the strain data sample rate."
"If not provided, the strain sample rate will be "
"used")
data_reading_group_multi.add_argument("--injection-f-ref", type=float,
action=MultiDetOptionAction, metavar='IFO:VALUE',
help="Reference frequency in Hz for creating CBC "
"injections from an XML file")
data_reading_group_multi.add_argument('--injection-f-final', type=float,
action=MultiDetOptionAction, metavar='IFO:VALUE',
help="Override the f_final field of a CBC XML "
"injection file (frequency in Hz)")
# Gating options
data_reading_group_multi.add_argument("--gating-file", nargs="+",
action=MultiDetOptionAction,
metavar='IFO:FILE',
help='(optional) Text file of gating segments to apply.'
' Format of each line (units s) :'
' gps_time zeros_half_width pad_half_width')
data_reading_group_multi.add_argument('--autogating-threshold', type=float,
nargs="+", action=MultiDetOptionAction,
metavar='IFO:SIGMA',
help='If given, find and gate glitches producing a '
'deviation larger than SIGMA in the whitened strain'
' time series')
data_reading_group_multi.add_argument('--autogating-max-iterations', type=int,
                                          metavar='COUNT', default=1,
help='If given, iteratively apply '
'autogating')
data_reading_group_multi.add_argument('--autogating-cluster', type=float,
nargs="+", action=MultiDetOptionAction,
metavar='IFO:SECONDS', default=5.,
help='Length of clustering window for '
'detecting glitches for autogating.')
data_reading_group_multi.add_argument('--autogating-width', type=float,
nargs="+", action=MultiDetOptionAction,
metavar='IFO:SECONDS', default=0.25,
help='Half-width of the gating window.')
data_reading_group_multi.add_argument('--autogating-taper', type=float,
nargs="+", action=MultiDetOptionAction,
metavar='IFO:SECONDS', default=0.25,
help='Taper the strain before and after '
'each gating window over a duration '
'of SECONDS.')
data_reading_group_multi.add_argument('--autogating-pad', type=float,
nargs="+", action=MultiDetOptionAction,
metavar='IFO:SECONDS', default=16,
help='Ignore the given length of whitened '
'strain at the ends of a segment, to '
'avoid filters ringing.')
data_reading_group_multi.add_argument('--gating-method', type=str,
nargs='+', action=MultiDetOptionAction,
default='taper',
help='Choose the method for gating. '
'Default: `taper`',
choices=['hard', 'taper', 'paint'])
# Optional
data_reading_group_multi.add_argument("--normalize-strain", type=float,
nargs="+", action=MultiDetOptionAction,
metavar='IFO:VALUE',
help="(optional) Divide frame data by constant.")
data_reading_group_multi.add_argument("--zpk-z", type=float,
nargs="+", action=MultiDetOptionAppendAction,
metavar='IFO:VALUE',
help="(optional) Zero-pole-gain (zpk) filter strain. "
"A list of zeros for transfer function")
data_reading_group_multi.add_argument("--zpk-p", type=float,
nargs="+", action=MultiDetOptionAppendAction,
metavar='IFO:VALUE',
help="(optional) Zero-pole-gain (zpk) filter strain. "
"A list of poles for transfer function")
data_reading_group_multi.add_argument("--zpk-k", type=float,
nargs="+", action=MultiDetOptionAppendAction,
metavar='IFO:VALUE',
help="(optional) Zero-pole-gain (zpk) filter strain. "
"Transfer function gain")
return data_reading_group_multi
ensure_one_opt_groups = []
ensure_one_opt_groups.append(['--frame-cache','--fake-strain',
'--fake-strain-from-file',
'--frame-files', '--frame-type',
'--hdf-store'])
required_opts_list = ['--gps-start-time', '--gps-end-time',
'--pad-data', '--sample-rate',
'--channel-name']
def verify_strain_options(opts, parser):
"""Sanity check provided strain arguments.
Parses the strain data CLI options and verifies that they are consistent
and reasonable.
Parameters
----------
    opts : object
Result of parsing the CLI with OptionParser, or any object with the
required attributes (gps-start-time, gps-end-time, strain-high-pass,
pad-data, sample-rate, frame-cache, channel-name, fake-strain,
fake-strain-seed).
parser : object
        ArgumentParser instance.
"""
for opt_group in ensure_one_opt_groups:
ensure_one_opt(opts, parser, opt_group)
required_opts(opts, parser, required_opts_list)
def verify_strain_options_multi_ifo(opts, parser, ifos):
"""Sanity check provided strain arguments.
Parses the strain data CLI options and verifies that they are consistent
and reasonable.
Parameters
----------
    opts : object
Result of parsing the CLI with OptionParser, or any object with the
required attributes (gps-start-time, gps-end-time, strain-high-pass,
pad-data, sample-rate, frame-cache, channel-name, fake-strain,
fake-strain-seed).
parser : object
        ArgumentParser instance.
ifos : list of strings
        List of ifos for which to verify options.
"""
for ifo in ifos:
for opt_group in ensure_one_opt_groups:
ensure_one_opt_multi_ifo(opts, parser, ifo, opt_group)
required_opts_multi_ifo(opts, parser, ifo, required_opts_list)
def gate_data(data, gate_params):
"""Apply a set of gating windows to a time series.
Each gating window is
defined by a central time, a given duration (centered on the given
time) to zero out, and a given duration of smooth tapering on each side of
the window. The window function used for tapering is a Tukey window.
Parameters
----------
data : TimeSeries
The time series to be gated.
gate_params : list
List of parameters for the gating windows. Each element should be a
list or tuple with 3 elements: the central time of the gating window,
the half-duration of the portion to zero out, and the duration of the
Tukey tapering on each side. All times in seconds. The total duration
of the data affected by one gating window is thus twice the second
parameter plus twice the third parameter.
Returns
-------
data: TimeSeries
The gated time series.
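
    Examples
    --------
    A minimal sketch: zero out 0.5 s of data centred on one GPS time, with
    0.25 s Tukey tapers on each side (``strain`` is a hypothetical input
    TimeSeries):

    >>> gated = gate_data(strain, [(1126259462.4, 0.25, 0.25)])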
"""
def inverted_tukey(M, n_pad):
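        # Build a window that falls smoothly from 1 to 0 over the first
        # n_pad samples, stays at 0 across the middle, and rises back to 1
        # over the last n_pad samples (an inverted Tukey window), so that
        # multiplying by it zeroes the gated region and tapers its edges.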
midlen = M - 2*n_pad
if midlen < 0:
raise ValueError("No zeros left after applying padding.")
padarr = 0.5*(1.+numpy.cos(numpy.pi*numpy.arange(n_pad)/n_pad))
return numpy.concatenate((padarr,numpy.zeros(midlen),padarr[::-1]))
sample_rate = 1./data.delta_t
temp = data.data
for glitch_time, glitch_width, pad_width in gate_params:
t_start = glitch_time - glitch_width - pad_width - data.start_time
t_end = glitch_time + glitch_width + pad_width - data.start_time
if t_start > data.duration or t_end < 0.:
continue # Skip gate segments that don't overlap
win_samples = int(2*sample_rate*(glitch_width+pad_width))
pad_samples = int(sample_rate*pad_width)
window = inverted_tukey(win_samples, pad_samples)
offset = int(t_start * sample_rate)
idx1 = max(0, -offset)
idx2 = min(len(window), len(data)-offset)
temp[idx1+offset:idx2+offset] *= window[idx1:idx2]
return data
class StrainSegments(object):
""" Class for managing manipulation of strain data for the purpose of
matched filtering. This includes methods for segmenting and
conditioning.
"""
def __init__(self, strain, segment_length=None, segment_start_pad=0,
segment_end_pad=0, trigger_start=None, trigger_end=None,
filter_inj_only=False, injection_window=None,
allow_zero_padding=False):
""" Determine how to chop up the strain data into smaller segments
for analysis.
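        For example, a 512 s segment with 64 s of start and end padding has
        an analyzable width of 512 - 128 = 384 s; segments are then
        overlapped so that their analyzable portions tile the interval
        between trigger_start and trigger_end.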
"""
self._fourier_segments = None
self.strain = strain
self.delta_t = strain.delta_t
self.sample_rate = strain.sample_rate
if segment_length:
seg_len = segment_length
else:
seg_len = strain.duration
self.delta_f = 1.0 / seg_len
self.time_len = int(seg_len * self.sample_rate)
self.freq_len = self.time_len // 2 + 1
seg_end_pad = segment_end_pad
seg_start_pad = segment_start_pad
if not trigger_start:
trigger_start = int(strain.start_time) + segment_start_pad
else:
if not allow_zero_padding:
min_start_time = int(strain.start_time) + segment_start_pad
else:
min_start_time = int(strain.start_time)
if trigger_start < min_start_time:
err_msg = "Trigger start time must be within analysable "
err_msg += "window. Asked to start from %d " %(trigger_start)
err_msg += "but can only analyse from %d." %(min_start_time)
raise ValueError(err_msg)
if not trigger_end:
trigger_end = int(strain.end_time) - segment_end_pad
else:
if not allow_zero_padding:
max_end_time = int(strain.end_time) - segment_end_pad
else:
max_end_time = int(strain.end_time)
if trigger_end > max_end_time:
err_msg = "Trigger end time must be within analysable "
err_msg += "window. Asked to end at %d " %(trigger_end)
err_msg += "but can only analyse to %d." %(max_end_time)
raise ValueError(err_msg)
throwaway_size = seg_start_pad + seg_end_pad
seg_width = seg_len - throwaway_size
# The amount of time we can actually analyze given the
# amount of padding that is needed
analyzable = trigger_end - trigger_start
data_start = (trigger_start - segment_start_pad) - \
int(strain.start_time)
data_end = trigger_end + segment_end_pad - int(strain.start_time)
data_dur = data_end - data_start
data_start = data_start * strain.sample_rate
data_end = data_end * strain.sample_rate
        # number of segments we need to analyze this data
num_segs = int(numpy.ceil(float(analyzable) / float(seg_width)))
# The offset we will use between segments
seg_offset = int(numpy.ceil(analyzable / float(num_segs)))
self.segment_slices = []
self.analyze_slices = []
# Determine how to chop up the strain into smaller segments
for nseg in range(num_segs-1):
# boundaries for time slices into the strain
seg_start = int(data_start + (nseg*seg_offset) * strain.sample_rate)
seg_end = int(seg_start + seg_len * strain.sample_rate)
seg_slice = slice(seg_start, seg_end)
self.segment_slices.append(seg_slice)
# boundaries for the analyzable portion of the segment
ana_start = int(seg_start_pad * strain.sample_rate)
ana_end = int(ana_start + seg_offset * strain.sample_rate)
ana_slice = slice(ana_start, ana_end)
self.analyze_slices.append(ana_slice)
# The last segment takes up any integer boundary slop
seg_end = int(data_end)
seg_start = int(seg_end - seg_len * strain.sample_rate)
seg_slice = slice(seg_start, seg_end)
self.segment_slices.append(seg_slice)
remaining = (data_dur - ((num_segs - 1) * seg_offset + seg_start_pad))
ana_start = int((seg_len - remaining) * strain.sample_rate)
ana_end = int((seg_len - seg_end_pad) * strain.sample_rate)
ana_slice = slice(ana_start, ana_end)
self.analyze_slices.append(ana_slice)
self.full_segment_slices = copy.deepcopy(self.segment_slices)
        # Remove segments that are outside trig start and end
segment_slices_red = []
analyze_slices_red = []
trig_start_idx = (trigger_start - int(strain.start_time)) * strain.sample_rate
trig_end_idx = (trigger_end - int(strain.start_time)) * strain.sample_rate
if filter_inj_only and hasattr(strain, 'injections'):
end_times = strain.injections.end_times()
end_times = [time for time in end_times if float(time) < trigger_end and float(time) > trigger_start]
inj_idx = [(float(time) - float(strain.start_time)) * strain.sample_rate for time in end_times]
for seg, ana in zip(self.segment_slices, self.analyze_slices):
start = ana.start
stop = ana.stop
cum_start = start + seg.start
cum_end = stop + seg.start
# adjust first segment
if trig_start_idx > cum_start:
start += (trig_start_idx - cum_start)
# adjust last segment
if trig_end_idx < cum_end:
stop -= (cum_end - trig_end_idx)
if filter_inj_only and hasattr(strain, 'injections'):
analyze_this = False
inj_window = strain.sample_rate * 8
for inj_id in inj_idx:
if inj_id < (cum_end + inj_window) and \
inj_id > (cum_start - inj_window):
analyze_this = True
if not analyze_this:
continue
if start < stop:
segment_slices_red.append(seg)
analyze_slices_red.append(slice(start, stop))
self.segment_slices = segment_slices_red
self.analyze_slices = analyze_slices_red
def fourier_segments(self):
""" Return a list of the FFT'd segments.
Return the list of FrequencySeries. Additional properties are
added that describe the strain segment. The property 'analyze'
is a slice corresponding to the portion of the time domain equivalent
of the segment to analyze for triggers. The value 'cumulative_index'
indexes from the beginning of the original strain series.
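
        Examples
        --------
        A sketch of downstream use, assuming ``segs`` is an existing
        StrainSegments instance:

        >>> for fseg in segs.fourier_segments():
        ...     print(fseg.cumulative_index, fseg.analyze)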
"""
if not self._fourier_segments:
self._fourier_segments = []
for seg_slice, ana in zip(self.segment_slices, self.analyze_slices):
if seg_slice.start >= 0 and seg_slice.stop <= len(self.strain):
freq_seg = make_frequency_series(self.strain[seg_slice])
                # Assume that we cannot have a case where we zero-pad on
                # both sides
elif seg_slice.start < 0:
strain_chunk = self.strain[:seg_slice.stop]
strain_chunk.prepend_zeros(-seg_slice.start)
freq_seg = make_frequency_series(strain_chunk)
elif seg_slice.stop > len(self.strain):
strain_chunk = self.strain[seg_slice.start:]
strain_chunk.append_zeros(seg_slice.stop - len(self.strain))
freq_seg = make_frequency_series(strain_chunk)
freq_seg.analyze = ana
freq_seg.cumulative_index = seg_slice.start + ana.start
freq_seg.seg_slice = seg_slice
self._fourier_segments.append(freq_seg)
return self._fourier_segments
@classmethod
def from_cli(cls, opt, strain):
"""Calculate the segmentation of the strain data for analysis from
the command line options.
"""
return cls(strain, segment_length=opt.segment_length,
segment_start_pad=opt.segment_start_pad,
segment_end_pad=opt.segment_end_pad,
trigger_start=opt.trig_start_time,
trigger_end=opt.trig_end_time,
filter_inj_only=opt.filter_inj_only,
injection_window=opt.injection_window,
allow_zero_padding=opt.allow_zero_padding)
@classmethod
def insert_segment_option_group(cls, parser):
segment_group = parser.add_argument_group(
"Options for segmenting the strain",
"These options are used to determine how to "
"segment the strain into smaller chunks, "
"and for determining the portion of each to "
"analyze for triggers. ")
segment_group.add_argument("--trig-start-time", type=int, default=0,
help="(optional) The gps time to start recording triggers")
segment_group.add_argument("--trig-end-time", type=int, default=0,
help="(optional) The gps time to stop recording triggers")
segment_group.add_argument("--segment-length", type=int,
help="The length of each strain segment in seconds.")
segment_group.add_argument("--segment-start-pad", type=int,
help="The time in seconds to ignore of the "
"beginning of each segment in seconds. ")
segment_group.add_argument("--segment-end-pad", type=int,
help="The time in seconds to ignore at the "
"end of each segment in seconds.")
segment_group.add_argument("--allow-zero-padding", action='store_true',
help="Allow for zero padding of data to "
"analyze requested times, if needed.")
# Injection optimization options
segment_group.add_argument("--filter-inj-only", action='store_true',
help="Analyze only segments that contain an injection.")
segment_group.add_argument("--injection-window", default=None,
type=float, help="""If using --filter-inj-only then
only search for injections within +/- injection
                    window of the injection's end time. This is useful
to speed up a coherent search or a search where we
initially filter at lower sample rate, and then
filter at full rate where needed. NOTE: Reverts to
full analysis if two injections are in the same
segment.""")
@classmethod
def from_cli_single_ifo(cls, opt, strain, ifo):
"""Calculate the segmentation of the strain data for analysis from
the command line options.
"""
return cls(strain, segment_length=opt.segment_length[ifo],
segment_start_pad=opt.segment_start_pad[ifo],
segment_end_pad=opt.segment_end_pad[ifo],
trigger_start=opt.trig_start_time[ifo],
trigger_end=opt.trig_end_time[ifo],
filter_inj_only=opt.filter_inj_only,
allow_zero_padding=opt.allow_zero_padding)
@classmethod
def from_cli_multi_ifos(cls, opt, strain_dict, ifos):
"""Calculate the segmentation of the strain data for analysis from
the command line options.
"""
strain_segments = {}
for ifo in ifos:
strain_segments[ifo] = cls.from_cli_single_ifo(
opt, strain_dict[ifo], ifo)
return strain_segments
@classmethod
def insert_segment_option_group_multi_ifo(cls, parser):
segment_group = parser.add_argument_group(
"Options for segmenting the strain",
"These options are used to determine how to "
"segment the strain into smaller chunks, "
"and for determining the portion of each to "
"analyze for triggers. ")
segment_group.add_argument("--trig-start-time", type=int, default=0,
nargs='+', action=MultiDetOptionAction, metavar='IFO:TIME',
help="(optional) The gps time to start recording triggers")
segment_group.add_argument("--trig-end-time", type=int, default=0,
nargs='+', action=MultiDetOptionAction, metavar='IFO:TIME',
help="(optional) The gps time to stop recording triggers")
segment_group.add_argument("--segment-length", type=int,
nargs='+', action=MultiDetOptionAction,
metavar='IFO:LENGTH',
help="The length of each strain segment in seconds.")
segment_group.add_argument("--segment-start-pad", type=int,
nargs='+', action=MultiDetOptionAction, metavar='IFO:TIME',
help="The time in seconds to ignore of the "
"beginning of each segment in seconds. ")
segment_group.add_argument("--segment-end-pad", type=int,
nargs='+', action=MultiDetOptionAction, metavar='IFO:TIME',
help="The time in seconds to ignore at the "
"end of each segment in seconds.")
segment_group.add_argument("--allow-zero-padding", action='store_true',
help="Allow for zero padding of data to analyze "
"requested times, if needed.")
segment_group.add_argument("--filter-inj-only", action='store_true',
help="Analyze only segments that contain "
"an injection.")
required_opts_list = ['--segment-length',
'--segment-start-pad',
'--segment-end-pad',
]
@classmethod
def verify_segment_options(cls, opt, parser):
required_opts(opt, parser, cls.required_opts_list)
[docs]
@classmethod
def verify_segment_options_multi_ifo(cls, opt, parser, ifos):
for ifo in ifos:
required_opts_multi_ifo(opt, parser, ifo, cls.required_opts_list)
@functools.lru_cache(maxsize=500)
def create_memory_and_engine_for_class_based_fft(
npoints_time,
dtype,
delta_t=1,
ifft=False,
uid=0
):
""" Create memory and engine for class-based FFT/IFFT
Currently only supports R2C FFT / C2R IFFTs, but this could be expanded
if use-cases arise.
Parameters
----------
npoints_time : int
Number of time samples of the real input vector (or real output vector
if doing an IFFT).
dtype : np.dtype
        The dtype for the real input vector (or real output vector if doing an
        IFFT). This should be np.float32 or np.float64 in all cases.
delta_t : float (default: 1)
delta_t of the real vector. If not given this will be set to 1, and we
will assume it is not needed in the returned TimeSeries/FrequencySeries
ifft : boolean (default: False)
By default will use the FFT class, set to true to use IFFT.
uid : int (default: 0)
Provide a unique identifier. This is used to provide a separate set
of memory in the cache, for instance if calling this from different
codes.
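
    Returns
    -------
    invec : TimeSeries or FrequencySeries
        The persistent input vector; fill this with data before calling
        ``fft_class.execute()``.
    outvec : FrequencySeries or TimeSeries
        The persistent output vector, overwritten on each execution.
    fft_class : FFT or IFFT
        The cached engine that transforms ``invec`` into ``outvec``.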
"""
npoints_freq = npoints_time // 2 + 1
delta_f_tmp = 1.0 / (npoints_time * delta_t)
vec = TimeSeries(
zeros(
npoints_time,
dtype=dtype
),
delta_t=delta_t,
copy=False
)
vectilde = FrequencySeries(
zeros(
npoints_freq,
dtype=complex_same_precision_as(vec)
),
delta_f=delta_f_tmp,
copy=False
)
if ifft:
fft_class = IFFT(vectilde, vec)
invec = vectilde
outvec = vec
else:
fft_class = FFT(vec, vectilde)
invec = vec
outvec = vectilde
return invec, outvec, fft_class
def execute_cached_fft(invec_data, normalize_by_rate=True, ifft=False,
copy_output=True, uid=0):
""" Executes a cached FFT
Parameters
-----------
invec_data : Array
Array which will be used as input when fft_class is executed.
    normalize_by_rate : boolean (optional, default: True)
If True, then normalize by delta_t (for an FFT) or delta_f (for an
IFFT).
ifft : boolean (optional, default:False)
If true assume this is an IFFT and multiply by delta_f not delta_t.
Will do nothing if normalize_by_rate is False.
copy_output : boolean (optional, default:True)
If True we will copy the output into a new array. This avoids the issue
that calling this function again might overwrite output. However, if
you know that the output array will not be used before this function
might be called again with the same length, then setting this to False
will provide some increase in efficiency. The uid can also be used to
help ensure that data doesn't get unintentionally overwritten!
uid : int (default: 0)
Provide a unique identifier. This is used to provide a separate set
of memory in the cache, for instance if calling this from different
codes.
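
    Examples
    --------
    A minimal sketch, assuming ``ts`` is a real-valued TimeSeries
    (hypothetical input); the result is the frequency-domain version of
    ``ts``, scaled by its ``delta_t``:

    >>> stilde = execute_cached_fft(ts)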
"""
from pycbc.types import real_same_precision_as
if ifft:
npoints_time = (len(invec_data) - 1) * 2
else:
npoints_time = len(invec_data)
try:
delta_t = invec_data.delta_t
except AttributeError:
if not normalize_by_rate:
# Don't need this
delta_t = 1
else:
raise
dtype = real_same_precision_as(invec_data)
invec, outvec, fft_class = create_memory_and_engine_for_class_based_fft(
npoints_time,
dtype,
delta_t=delta_t,
ifft=ifft,
uid=uid
)
if invec_data is not None:
invec._data[:] = invec_data._data[:]
fft_class.execute()
if normalize_by_rate:
if ifft:
outvec._data *= invec._delta_f
else:
outvec._data *= invec._delta_t
if copy_output:
outvec = outvec.copy()
try:
outvec._epoch = invec_data._epoch
except AttributeError:
pass
return outvec
def execute_cached_ifft(*args, **kwargs):
""" Executes a cached IFFT
Parameters
-----------
invec_data : Array
Array which will be used as input when fft_class is executed.
    normalize_by_rate : boolean (optional, default: True)
If True, then normalize by delta_t (for an FFT) or delta_f (for an
IFFT).
copy_output : boolean (optional, default:True)
If True we will copy the output into a new array. This avoids the issue
that calling this function again might overwrite output. However, if
you know that the output array will not be used before this function
might be called again with the same length, then setting this to False
will provide some increase in efficiency. The uid can also be used to
help ensure that data doesn't get unintentionally overwritten!
uid : int (default: 0)
Provide a unique identifier. This is used to provide a separate set
of memory in the cache, for instance if calling this from different
codes.
"""
return execute_cached_fft(*args, **kwargs, ifft=True)
# If using caching we want output to be unique if called at different places
# (and if called from different modules/functions); these unique IDs achieve
# that. The numbers are not significant, only that they are unique.
STRAINBUFFER_UNIQUE_ID_1 = 236546845
STRAINBUFFER_UNIQUE_ID_2 = 778946541
STRAINBUFFER_UNIQUE_ID_3 = 665849947
class StrainBuffer(pycbc.frame.DataBuffer):
def __init__(self, frame_src, channel_name, start_time,
max_buffer,
sample_rate,
low_frequency_cutoff=20,
highpass_frequency=15.0,
highpass_reduction=200.0,
highpass_bandwidth=5.0,
psd_samples=30,
psd_segment_length=4,
psd_inverse_length=3.5,
trim_padding=0.25,
autogating_threshold=None,
autogating_cluster=None,
autogating_pad=None,
autogating_width=None,
autogating_taper=None,
autogating_duration=None,
autogating_psd_segment_length=None,
autogating_psd_stride=None,
state_channel=None,
data_quality_channel=None,
idq_channel=None,
idq_state_channel=None,
idq_threshold=None,
dyn_range_fac=pycbc.DYN_RANGE_FAC,
psd_abort_difference=None,
psd_recalculate_difference=None,
force_update_cache=True,
increment_update_cache=None,
analyze_flags=None,
data_quality_flags=None,
dq_padding=0):
""" Class to produce overwhitened strain incrementally
Parameters
----------
        frame_src: str or list of strings
            Strings that indicate where to read files from. This can be a
list of frame files, a glob, etc.
channel_name: str
Name of the channel to read from the frame files
start_time:
Time to start reading from.
max_buffer: int
Length of the strain buffer in seconds.
        sample_rate: int
            Rate in Hz to sample the data.
low_frequency_cutoff: {float, 20}, Optional
The low frequency cutoff to use for inverse spectrum truncation
highpass_frequency: {float, 15}, Optional
The frequency to apply a highpass filter at before downsampling.
highpass_reduction: {float, 200}, Optional
The amount of reduction to apply to the low frequencies.
highpass_bandwidth: {float, 5}, Optional
The width of the transition region for the highpass filter.
        psd_samples: {int, 30}, Optional
            The number of samples to use for PSD estimation.
psd_segment_length: {float, 4}, Optional
The number of seconds in each psd sample.
        psd_inverse_length: {float, 3.5}, Optional
            The length in seconds to which the Fourier transform of the
            inverse PSD is truncated.
trim_padding: {float, 0.25}, Optional
            Amount of padding in seconds to allow when truncating the
            overwhitened data stream.
autogating_threshold: float, Optional
Sigma deviation required to cause autogating of data.
If None, no autogating is performed.
autogating_cluster: float, Optional
Seconds to cluster possible gating locations.
autogating_pad: float, Optional
Seconds of corrupted whitened strain to ignore when generating a gate.
autogating_width: float, Optional
Half-duration of the zeroed-out portion of autogates.
autogating_taper: float, Optional
Duration of taper on either side of the gating window in seconds.
autogating_duration: float, Optional
Amount of data in seconds to apply autogating on.
autogating_psd_segment_length: float, Optional
The length in seconds of each segment used to estimate the PSD with Welch's method.
autogating_psd_stride: float, Optional
The overlap in seconds between each segment used to estimate the PSD with Welch's method.
state_channel: {str, None}, Optional
Channel to use for state information about the strain
data_quality_channel: {str, None}, Optional
Channel to use for data quality information about the strain
idq_channel: {str, None}, Optional
Channel to use for idq timeseries
idq_state_channel : {str, None}, Optional
Channel containing information about usability of idq
        idq_threshold : float, Optional
            Threshold below which the iDQ channel triggers a veto
dyn_range_fac: {float, pycbc.DYN_RANGE_FAC}, Optional
Scale factor to apply to strain
psd_abort_difference: {float, None}, Optional
The relative change in the inspiral range from the previous PSD
estimate to trigger the data to be considered invalid.
psd_recalculate_difference: {float, None}, Optional
            The relative change in the inspiral range from the previous PSD
            to trigger a re-estimation of the PSD.
force_update_cache: {boolean, True}, Optional
Re-check the filesystem for frame files on every attempt to
read more data.
analyze_flags: list of strs
The flags that must be on to mark the current data as valid for
*any* use.
data_quality_flags: list of strs
            The flags used to determine whether to keep triggers.
dq_padding: {float, 0}, optional
Extra seconds to consider invalid before/after times with bad DQ.
increment_update_cache: {str, None}, Optional
Pattern to look for frame files in a GPS dependent directory. This
            is an alternative to the forced update of the frame cache, and
            attempts to predict the next frame file name without probing the
filesystem.
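
        Examples
        --------
        A hedged construction sketch; the frame file pattern, channel name
        and times below are illustrative placeholders:

        >>> buf = StrainBuffer('/data/H-H1_llhoft-*.gwf',
        ...                    'H1:GDS-CALIB_STRAIN', start_time=1126259400,
        ...                    max_buffer=512, sample_rate=2048)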
"""
super(StrainBuffer, self).__init__(frame_src, channel_name, start_time,
max_buffer=max_buffer,
force_update_cache=force_update_cache,
increment_update_cache=increment_update_cache)
self.low_frequency_cutoff = low_frequency_cutoff
# Set up status buffers
self.analyze_flags = analyze_flags
self.data_quality_flags = data_quality_flags
self.state = None
self.dq = None
self.idq = None
self.dq_padding = dq_padding
# State channel
if state_channel is not None:
valid_mask = pycbc.frame.flag_names_to_bitmask(self.analyze_flags)
logger.info('State channel %s interpreted as bitmask %s = good',
state_channel, bin(valid_mask))
self.state = pycbc.frame.StatusBuffer(
frame_src,
state_channel, start_time,
max_buffer=max_buffer,
valid_mask=valid_mask,
force_update_cache=force_update_cache,
increment_update_cache=increment_update_cache)
# low latency dq channel
if data_quality_channel is not None:
sb_kwargs = dict(max_buffer=max_buffer,
force_update_cache=force_update_cache,
increment_update_cache=increment_update_cache)
if len(self.data_quality_flags) == 1 \
and self.data_quality_flags[0] == 'veto_nonzero':
sb_kwargs['valid_on_zero'] = True
logger.info('DQ channel %s interpreted as zero = good',
data_quality_channel)
else:
sb_kwargs['valid_mask'] = pycbc.frame.flag_names_to_bitmask(
self.data_quality_flags)
logger.info(
'DQ channel %s interpreted as bitmask %s = good',
data_quality_channel,
bin(sb_kwargs['valid_mask'])
)
self.dq = pycbc.frame.StatusBuffer(frame_src, data_quality_channel,
start_time, **sb_kwargs)
if idq_channel is not None:
if idq_state_channel is None:
raise ValueError(
'Each detector with an iDQ channel requires an iDQ state channel as well')
if idq_threshold is None:
raise ValueError(
'If an iDQ channel is provided, a veto threshold must also be provided')
self.idq = pycbc.frame.iDQBuffer(frame_src,
idq_channel,
idq_state_channel,
idq_threshold,
start_time,
max_buffer=max_buffer,
force_update_cache=force_update_cache,
increment_update_cache=increment_update_cache)
self.highpass_frequency = highpass_frequency
self.highpass_reduction = highpass_reduction
self.highpass_bandwidth = highpass_bandwidth
self.autogating_threshold = autogating_threshold
self.autogating_cluster = autogating_cluster
self.autogating_pad = autogating_pad
self.autogating_width = autogating_width
self.autogating_taper = autogating_taper
self.autogating_duration = autogating_duration
self.autogating_psd_segment_length = autogating_psd_segment_length
self.autogating_psd_stride = autogating_psd_stride
self.gate_params = []
self.sample_rate = sample_rate
self.dyn_range_fac = dyn_range_fac
self.psd_abort_difference = psd_abort_difference
self.psd_recalculate_difference = psd_recalculate_difference
self.psd_segment_length = psd_segment_length
self.psd_samples = psd_samples
self.psd_inverse_length = psd_inverse_length
self.psd = None
self.psds = {}
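# The strain buffer holds the most recent max_buffer seconds of
# conditioned data at the analysis sample rate; advance() rolls it
# left and overwrites the freed samples with new data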
strain_len = int(max_buffer * self.sample_rate)
self.strain = TimeSeries(zeros(strain_len, dtype=numpy.float32),
delta_t=1.0/self.sample_rate,
epoch=start_time-max_buffer)
# Determine the total number of corrupted samples for highpass
# and PSD over whitening
highpass_samples, self.beta = kaiserord(self.highpass_reduction,
self.highpass_bandwidth / self.raw_buffer.sample_rate * 2 * numpy.pi)
self.highpass_samples = int(highpass_samples / 2)
resample_corruption = 10 # If using the ldas method
self.factor = round(1.0 / self.raw_buffer.delta_t / self.sample_rate)
self.corruption = self.highpass_samples // self.factor + resample_corruption
self.psd_corruption = self.psd_inverse_length * self.sample_rate
self.total_corruption = self.corruption + self.psd_corruption
# Determine how much padding is needed after removing the parts
# associated with PSD over whitening and highpass filtering
self.trim_padding = int(trim_padding * self.sample_rate)
if self.trim_padding > self.total_corruption:
self.trim_padding = self.total_corruption
self.psd_duration = (psd_samples - 1) // 2 * psd_segment_length
self.reduced_pad = int(self.total_corruption - self.trim_padding)
self.segments = {}
# time to ignore output of frame (for initial buffering)
self.add_hard_count()
self.taper_immediate_strain = True
@property
def start_time(self):
""" Return the start time of the current valid segment of data """
return self.end_time - self.blocksize
@property
def end_time(self):
""" Return the end time of the current valid segment of data """
return float(self.strain.start_time + (len(self.strain) - self.total_corruption) / self.sample_rate)
def add_hard_count(self):
""" Reset the countdown timer, so that we don't analyze data long enough
to generate a new PSD.
"""
self.wait_duration = int(numpy.ceil(self.total_corruption / self.sample_rate + self.psd_duration))
self.invalidate_psd()
def invalidate_psd(self):
""" Make the current PSD invalid. A new one will be generated when
it is next required """
self.psd = None
self.psds = {}
def recalculate_psd(self):
""" Recalculate the psd
"""
seg_len = int(self.sample_rate * self.psd_segment_length)
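# With 50%-overlapping Welch segments, psd_samples segments of
# seg_len samples span (psd_samples + 1) * seg_len // 2 samples, so
# take exactly that much data from the end of the strain buffer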
e = len(self.strain)
s = e - (self.psd_samples + 1) * seg_len // 2
psd = pycbc.psd.welch(self.strain[s:e], seg_len=seg_len, seg_stride=seg_len//2)
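# Attach the sensitive distance of a canonical 1.4-1.4 solar mass
# inspiral as a single-number summary of the PSD; the comparisons
# below use it to judge how much the estimate has changed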
psd.dist = spa_distance(psd, 1.4, 1.4, self.low_frequency_cutoff) * pycbc.DYN_RANGE_FAC
# If the new psd is similar to the old one, don't replace it
if self.psd and self.psd_recalculate_difference:
if abs(self.psd.dist - psd.dist) / self.psd.dist < self.psd_recalculate_difference:
logger.info("Skipping recalculation of %s PSD, %s-%s",
self.detector, self.psd.dist, psd.dist)
return True
# If the new psd is *really* different from the old one, return an error
if self.psd and self.psd_abort_difference:
if abs(self.psd.dist - psd.dist) / self.psd.dist > self.psd_abort_difference:
logger.info("%s PSD is CRAZY, aborting!!!!, %s-%s",
self.detector, self.psd.dist, psd.dist)
self.psd = psd
self.psds = {}
return False
# The new estimate replaces the current one, so invalidate the interpolated PSDs
self.psd = psd
self.psds = {}
logger.info("Recalculating %s PSD, %s", self.detector, psd.dist)
return True
def check_psd_dist(self, min_dist, max_dist):
"""Check that the horizon distance of a detector is within a required
range. If so, return True, otherwise log a warning and return False.
"""
if self.psd is None:
# ignore check
return True
# Note that the distance can in principle be inf or nan, e.g. if h(t)
# is identically zero. The check must fail in those cases. Be careful
# with how the logic works out when comparing inf's or nan's!
good = self.psd.dist >= min_dist and self.psd.dist <= max_dist
if not good:
logger.info(
"%s PSD dist %s outside acceptable range [%s, %s]",
self.detector,
self.psd.dist,
min_dist,
max_dist
)
return good
def overwhitened_data(self, delta_f):
""" Return overwhitened data
Parameters
----------
delta_f: float
The frequency step at which to generate the overwhitened
frequency-domain data
Returns
-------
htilde: FrequencySeries
Overwhitened strain data
"""
# only compute htilde if we have not already done so for this delta_f
if delta_f not in self.segments:
buffer_length = int(1.0 / delta_f)
e = len(self.strain)
s = int(e - buffer_length * self.sample_rate - self.reduced_pad * 2)
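# The slice covers one buffer_length of data plus reduced_pad
# samples at each end, which are trimmed off again after
# over-whitening (when reduced_pad is nonzero)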
# FFT the contents of self.strain[s:e] into fseries
fseries = execute_cached_fft(self.strain[s:e],
copy_output=False,
uid=STRAINBUFFER_UNIQUE_ID_1)
fseries._epoch = self.strain._epoch + s*self.strain.delta_t
# only calculate a resampled PSD if we do not have one for this delta_f
if delta_f not in self.psds:
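# Two copies of the truncated inverse PSD are kept: psdt at the
# resolution of the full (untrimmed) fseries, used for the division
# below, and one at the requested delta_f, attached to the output
# segment for downstream use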
psdt = pycbc.psd.interpolate(self.psd, fseries.delta_f)
psdt = pycbc.psd.inverse_spectrum_truncation(psdt,
int(self.sample_rate * self.psd_inverse_length),
low_frequency_cutoff=self.low_frequency_cutoff)
psdt._delta_f = fseries.delta_f
psd = pycbc.psd.interpolate(self.psd, delta_f)
psd = pycbc.psd.inverse_spectrum_truncation(psd,
int(self.sample_rate * self.psd_inverse_length),
low_frequency_cutoff=self.low_frequency_cutoff)
psd.psdt = psdt
self.psds[delta_f] = psd
psd = self.psds[delta_f]
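# Dividing the frequency series by the PSD (rather than its square
# root) whitens the data twice, i.e. over-whitens it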
fseries /= psd.psdt
# trim ends of strain
if self.reduced_pad != 0:
# IFFT the contents of fseries into overwhite
overwhite = execute_cached_ifft(fseries,
copy_output=False,
uid=STRAINBUFFER_UNIQUE_ID_2)
overwhite2 = overwhite[self.reduced_pad:len(overwhite)-self.reduced_pad]
taper_window = self.trim_padding / 2.0 / overwhite.sample_rate
gate_params = [(overwhite2.start_time, 0., taper_window),
(overwhite2.end_time, 0., taper_window)]
gate_data(overwhite2, gate_params)
# FFT the contents of overwhite2 into fseries_trimmed
fseries_trimmed = execute_cached_fft(
overwhite2,
copy_output=True,
uid=STRAINBUFFER_UNIQUE_ID_3
)
fseries_trimmed.start_time = fseries.start_time + self.reduced_pad * self.strain.delta_t
else:
fseries_trimmed = fseries
fseries_trimmed.psd = psd
self.segments[delta_f] = fseries_trimmed
stilde = self.segments[delta_f]
return stilde
def near_hwinj(self):
"""Check that the current set of triggers could be influenced by
a hardware injection.
"""
if not self.state:
return False
if not self.state.is_extent_valid(self.start_time, self.blocksize, pycbc.frame.NO_HWINJ):
return True
return False
def null_advance_strain(self, blocksize):
""" Advance and insert zeros
Parameters
----------
blocksize: int
The number of seconds to advance the buffer by, filling with zeros
"""
sample_step = int(blocksize * self.sample_rate)
csize = sample_step + self.corruption * 2
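# csize spans the new block plus the corrupted region at each edge;
# everything beyond the still-valid data is zeroed out below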
self.strain.roll(-sample_step)
# We should roll this off at some point too...
self.strain[len(self.strain) - csize + self.corruption:] = 0
self.strain.start_time += blocksize
# The next time we need strain will need to be tapered
self.taper_immediate_strain = True
def advance(self, blocksize, timeout=10):
"""Advanced buffer blocksize seconds.
Add blocksize seconds more to the buffer, push blocksize seconds
from the beginning.
Parameters
----------
blocksize: int
The number of seconds to attempt to read from the channel
Returns
-------
status: boolean
Returns True if this block is analyzable.
"""
ts = super(StrainBuffer, self).attempt_advance(blocksize, timeout=timeout)
self.blocksize = blocksize
self.gate_params = []
# We have given up so there is no time series
if ts is None:
logger.info("%s frame is late, giving up", self.detector)
self.null_advance_strain(blocksize)
if self.state:
self.state.null_advance(blocksize)
if self.dq:
self.dq.null_advance(blocksize)
if self.idq:
self.idq.null_advance(blocksize)
return False
# We collected some data so we are closer to being able to analyze data
self.wait_duration -= blocksize
# If the data we got was invalid, reset the counter on how much to collect
# This behavior corresponds to how we handle CAT1 vetoes
if self.state and self.state.advance(blocksize) is False:
self.add_hard_count()
self.null_advance_strain(blocksize)
if self.dq:
self.dq.null_advance(blocksize)
if self.idq:
self.idq.null_advance(blocksize)
logger.info("%s time has invalid data, resetting buffer",
self.detector)
return False
# Also advance the dq vector and idq timeseries in lockstep
if self.dq:
self.dq.advance(blocksize)
if self.idq:
self.idq.advance(blocksize)
self.segments = {}
# only condition the raw data that is needed, so we can continuously
# add to the existing result
# Precondition
sample_step = int(blocksize * self.sample_rate)
csize = sample_step + self.corruption * 2
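# Take just enough raw samples for the new block plus the corrupted
# edges (factor converts from the analysis rate to the raw rate)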
start = len(self.raw_buffer) - csize * self.factor
strain = self.raw_buffer[start:]
strain = pycbc.filter.highpass_fir(strain, self.highpass_frequency,
self.highpass_samples,
beta=self.beta)
strain = (strain * self.dyn_range_fac).astype(numpy.float32)
strain = pycbc.filter.resample_to_delta_t(strain,
1.0/self.sample_rate, method='ldas')
# remove corruption at beginning
strain = strain[self.corruption:]
# taper beginning if needed
if self.taper_immediate_strain:
logger.info("Tapering start of %s strain block", self.detector)
strain = gate_data(
strain, [(strain.start_time, 0., self.autogating_taper)])
self.taper_immediate_strain = False
# Stitch into continuous stream
self.strain.roll(-sample_step)
self.strain[len(self.strain) - csize + self.corruption:] = strain[:]
self.strain.start_time += blocksize
# apply gating if needed
if self.autogating_threshold is not None:
autogating_duration_length = self.autogating_duration * self.sample_rate
autogating_start_sample = int(len(self.strain) - autogating_duration_length)
glitch_times = detect_loud_glitches(
self.strain[autogating_start_sample:-self.corruption],
psd_duration=self.autogating_psd_segment_length, psd_stride=self.autogating_psd_stride,
threshold=self.autogating_threshold,
cluster_window=self.autogating_cluster,
low_freq_cutoff=self.highpass_frequency,
corrupt_time=self.autogating_pad)
if len(glitch_times) > 0:
logger.info('Autogating %s at %s', self.detector,
', '.join(['%.3f' % gt for gt in glitch_times]))
self.gate_params = \
[(gt, self.autogating_width, self.autogating_taper)
for gt in glitch_times]
self.strain = gate_data(self.strain, self.gate_params)
if self.psd is None and self.wait_duration <= 0:
self.recalculate_psd()
return self.wait_duration <= 0
@classmethod
def from_cli(cls, ifo, args):
"""Initialize a StrainBuffer object (data reader) for a particular
detector.
"""
state_channel = analyze_flags = None
if args.state_channel and ifo in args.state_channel \
and args.analyze_flags and ifo in args.analyze_flags:
state_channel = ':'.join([ifo, args.state_channel[ifo]])
analyze_flags = args.analyze_flags[ifo].split(',')
dq_channel = dq_flags = None
if args.data_quality_channel and ifo in args.data_quality_channel \
and args.data_quality_flags and ifo in args.data_quality_flags:
dq_channel = ':'.join([ifo, args.data_quality_channel[ifo]])
dq_flags = args.data_quality_flags[ifo].split(',')
idq_channel = None
if args.idq_channel and ifo in args.idq_channel:
idq_channel = ':'.join([ifo, args.idq_channel[ifo]])
idq_state_channel = None
if args.idq_state_channel and ifo in args.idq_state_channel:
idq_state_channel = ':'.join([ifo, args.idq_state_channel[ifo]])
if args.frame_type:
frame_src = pycbc.frame.frame_paths(
args.frame_type[ifo],
args.start_time,
args.end_time,
site=ifo[0]
)
else:
frame_src = [args.frame_src[ifo]]
strain_channel = ':'.join([ifo, args.channel_name[ifo]])
return cls(
frame_src,
strain_channel,
args.start_time,
max_buffer=args.max_length,
state_channel=state_channel,
data_quality_channel=dq_channel,
idq_channel=idq_channel,
idq_state_channel=idq_state_channel,
idq_threshold=args.idq_threshold,
sample_rate=args.sample_rate,
low_frequency_cutoff=args.low_frequency_cutoff,
highpass_frequency=args.highpass_frequency,
highpass_reduction=args.highpass_reduction,
highpass_bandwidth=args.highpass_bandwidth,
psd_samples=args.psd_samples,
trim_padding=args.trim_padding,
psd_segment_length=args.psd_segment_length,
psd_inverse_length=args.psd_inverse_length,
autogating_threshold=args.autogating_threshold,
autogating_cluster=args.autogating_cluster,
autogating_pad=args.autogating_pad,
autogating_width=args.autogating_width,
autogating_taper=args.autogating_taper,
autogating_duration=args.autogating_duration,
autogating_psd_segment_length=args.autogating_psd_segment_length,
autogating_psd_stride=args.autogating_psd_stride,
psd_abort_difference=args.psd_abort_difference,
psd_recalculate_difference=args.psd_recalculate_difference,
force_update_cache=args.force_update_cache,
increment_update_cache=args.increment_update_cache[ifo],
analyze_flags=analyze_flags,
data_quality_flags=dq_flags,
dq_padding=args.data_quality_padding
)
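# A minimal, hypothetical usage sketch of the reader constructed above;
# the detector name, block size and frequency resolution are
# illustrative only, not defaults from this module:
#
#     reader = StrainBuffer.from_cli('H1', args)
#     while True:
#         if reader.advance(8):
#             # data are analyzable; fetch over-whitened strain at a
#             # frequency resolution of 1/256 Hz
#             stilde = reader.overwhitened_data(1.0 / 256)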