# pycbc/inference/models/data_utils.py

# Copyright (C) 2018  Collin Capano
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""Utilities for loading data for models.
"""

import logging
from argparse import ArgumentParser
from time import sleep
import numpy
try:
    from mpi4py import MPI
except ImportError:
    MPI = None

from pycbc.types import MultiDetOptionAction
from pycbc.psd import (insert_psd_option_group_multi_ifo,
                       from_cli_multi_ifos as psd_from_cli_multi_ifos,
                       verify_psd_options_multi_ifo)
from pycbc import strain
from pycbc.strain import (gates_from_cli, psd_gates_from_cli,
                          apply_gates_to_td, apply_gates_to_fd,
                          verify_strain_options_multi_ifo)
from pycbc import dq


def strain_from_cli_multi_ifos(*args, **kwargs):
    """Wrapper around strain.from_cli_multi_ifos that tries a few times
    before quitting.

    When running in a parallel environment, multiple concurrent queries to
    the segment database can cause time out errors. If that happens, this
    will sleep for a few seconds, then try again a few times before giving
    up.
    """
    count = 0
    while count < 3:
        try:
            return strain.from_cli_multi_ifos(*args, **kwargs)
        except RuntimeError as e:
            exception = e
            count += 1
            sleep(10)
    # if we get to here, we've tried 3 times and still got an error, so exit
    raise exception
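
# Illustrative usage sketch (not part of the original module): this wrapper
# takes the same arguments as pycbc.strain.from_cli_multi_ifos, i.e., a
# parsed options namespace and a list of detector names. The ``opts`` here
# would come from, e.g., data_opts_from_config (defined later in this
# module):
#
#     strain_dict = strain_from_cli_multi_ifos(opts, ['H1', 'L1'],
#                                              precision="double")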


#
# =============================================================================
#
#                   Utilities for gravitational-wave data
#
# =============================================================================
#

class NoValidDataError(Exception):
    """This should be raised if a continuous segment of valid data could
    not be found.
    """
    pass

def create_data_parser():
    """Creates an argument parser for loading GW data."""
    parser = ArgumentParser()
    # add data options
    parser.add_argument("--instruments", type=str, nargs="+", required=True,
                        help="Instruments to analyze, e.g., H1 L1.")
    parser.add_argument("--trigger-time", type=float, default=0.,
                        help="Reference GPS time (at geocenter) from which "
                             "the (analysis|psd)-(start|end)-time options "
                             "are measured. The integer seconds will be "
                             "used. Default is 0; i.e., if not provided, "
                             "the analysis and psd times should be in GPS "
                             "seconds.")
    parser.add_argument("--analysis-start-time", type=int, required=True,
                        nargs='+', action=MultiDetOptionAction,
                        metavar='IFO:TIME',
                        help="The start time to use for the analysis, "
                             "measured with respect to the trigger-time. "
                             "If psd-inverse-length is provided, the given "
                             "start time will be padded by half that length "
                             "to account for wrap-around effects.")
    parser.add_argument("--analysis-end-time", type=int, required=True,
                        nargs='+', action=MultiDetOptionAction,
                        metavar='IFO:TIME',
                        help="The end time to use for the analysis, "
                             "measured with respect to the trigger-time. "
                             "If psd-inverse-length is provided, the given "
                             "end time will be padded by half that length "
                             "to account for wrap-around effects.")
    parser.add_argument("--psd-start-time", type=int, default=None,
                        nargs='+', action=MultiDetOptionAction,
                        metavar='IFO:TIME',
                        help="Start time to use for PSD estimation, "
                             "measured with respect to the trigger-time.")
    parser.add_argument("--psd-end-time", type=int, default=None,
                        nargs='+', action=MultiDetOptionAction,
                        metavar='IFO:TIME',
                        help="End time to use for PSD estimation, measured "
                             "with respect to the trigger-time.")
    parser.add_argument("--data-conditioning-low-freq", type=float,
                        nargs="+", action=MultiDetOptionAction,
                        metavar='IFO:FLOW', dest="low_frequency_cutoff",
                        help="Low frequency cutoff of the data. Needed for "
                             "PSD estimation and when creating fake strain. "
                             "If not provided, will use the model's "
                             "low-frequency-cutoff.")
    insert_psd_option_group_multi_ifo(parser)
    strain.insert_strain_option_group_multi_ifo(parser, gps_times=False)
    strain.add_gate_option_group(parser)
    # add arguments for dq
    dqgroup = parser.add_argument_group("Options for querying data quality "
                                        "(DQ)")
    dqgroup.add_argument('--dq-segment-name', default='DATA',
                         help='The status flag to query for data quality. '
                              'Default is "DATA".')
    dqgroup.add_argument('--dq-source', choices=['any', 'GWOSC', 'dqsegdb'],
                         default='any',
                         help='Where to look for DQ information. If "any" '
                              '(the default) will first try GWOSC, then '
                              'dqsegdb.')
    dqgroup.add_argument('--dq-server', default='https://segments.ligo.org',
                         help='The server to use for dqsegdb.')
    dqgroup.add_argument('--veto-definer', default=None,
                         help='Path to a veto definer file that defines '
                              'groups of flags, which themselves define a '
                              'set of DQ segments.')
    return parser
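
# Illustrative usage sketch (not part of the original module): parsing a
# minimal command line. The GPS time is hypothetical, and depending on the
# strain/PSD option groups added above, further options (e.g. --sample-rate
# and a channel or fake-strain setting) are typically required as well:
#
#     parser = create_data_parser()
#     opts = parser.parse_args(
#         "--instruments H1 L1 --trigger-time 1126259462 "
#         "--analysis-start-time -6 --analysis-end-time 2".split())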

def check_validtimes(detector, gps_start, gps_end, shift_to_valid=False,
                     max_shift=None, segment_name='DATA', **kwargs):
    r"""Checks DQ server to see if the given times are in a valid segment.

    If the ``shift_to_valid`` flag is provided, the times will be shifted
    left or right to try to find a continuous valid block nearby. The
    shifting starts by shifting the times left by 1 second. If that does
    not work, it shifts the times right by one second. This continues,
    increasing the shift time by 1 second, until a valid block is found,
    or until the shift size exceeds ``max_shift``.

    If the given times are not in a continuous valid segment, or a valid
    block cannot be found nearby, a ``NoValidDataError`` is raised.

    Parameters
    ----------
    detector : str
        The name of the detector to query; e.g., 'H1'.
    gps_start : int
        The GPS start time of the segment to query.
    gps_end : int
        The GPS end time of the segment to query.
    shift_to_valid : bool, optional
        If True, will try to shift the gps start and end times to the
        nearest continuous valid segment of data. Default is False.
    max_shift : int, optional
        The maximum number of seconds to try to shift left or right to
        find a valid segment. Default is ``gps_end - gps_start``.
    segment_name : str, optional
        The status flag to query; passed to
        :py:func:`pycbc.dq.query_flag`. Default is "DATA".
    \**kwargs :
        All other keyword arguments are passed to
        :py:func:`pycbc.dq.query_flag`.

    Returns
    -------
    use_start : int
        The start time to use. If ``shift_to_valid`` is True, this may
        differ from the given GPS start time.
    use_end : int
        The end time to use. If ``shift_to_valid`` is True, this may
        differ from the given GPS end time.
    """
    # expand the times checked in case we need to shift
    if max_shift is None:
        max_shift = int(gps_end - gps_start)
    check_start = gps_start - max_shift
    check_end = gps_end + max_shift
    # if we're running in an mpi environment and we're not the parent
    # process, we'll wait before querying the segment database. This will
    # result in getting the segments from the cache, so as not to overload
    # the database
    if MPI is not None and (MPI.COMM_WORLD.Get_size() > 1 and
                            MPI.COMM_WORLD.Get_rank() != 0):
        # we'll wait for 2 minutes
        sleep(120)
    validsegs = dq.query_flag(detector, segment_name, check_start,
                              check_end, cache=True, **kwargs)
    use_start = gps_start
    use_end = gps_end
    # shift if necessary
    if shift_to_valid:
        shiftsize = 1
        while (use_start, use_end) not in validsegs and \
                shiftsize < max_shift:
            # try shifting left
            use_start = gps_start - shiftsize
            use_end = gps_end - shiftsize
            if (use_start, use_end) not in validsegs:
                # try shifting right
                use_start = gps_start + shiftsize
                use_end = gps_end + shiftsize
            shiftsize += 1
    # check that we have a valid range
    if (use_start, use_end) not in validsegs:
        raise NoValidDataError("Could not find a continuous valid segment "
                               "in detector {}".format(detector))
    return use_start, use_end
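
# Illustrative usage sketch (not part of the original module): checking a
# 16 s window in H1 around a hypothetical GPS time, allowing the window to
# be shifted by up to 32 s to find valid data (requires network access to
# a DQ source):
#
#     start, end = check_validtimes('H1', 1126259454, 1126259470,
#                                   shift_to_valid=True, max_shift=32)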

def detectors_with_valid_data(detectors, gps_start_times, gps_end_times,
                              pad_data=None,
                              err_on_missing_detectors=False, **kwargs):
    r"""Determines which detectors have valid data.

    Parameters
    ----------
    detectors : list of str
        Names of the detectors to check.
    gps_start_times : dict
        Dictionary of detector name -> start time listing the GPS start
        times of the segment to check for each detector.
    gps_end_times : dict
        Dictionary of detector name -> end time listing the GPS end times
        of the segment to check for each detector.
    pad_data : dict, optional
        Dictionary of detector name -> pad time to add to the
        beginning/end of the GPS start/end times before checking. A pad
        time for every detector in ``detectors`` must be given. Default
        (None) is to apply no pad to the times.
    err_on_missing_detectors : bool, optional
        If True, a ``NoValidDataError`` will be raised if any detector
        does not have continuous valid data in its requested segment.
        Otherwise, the detector will not be included in the returned list
        of detectors with valid data. Default is False.
    \**kwargs :
        All other keyword arguments are passed to ``check_validtimes``.

    Returns
    -------
    dict :
        A dictionary of detector name -> valid times giving the detectors
        with valid data and their segments. If ``shift_to_valid`` was
        passed to ``check_validtimes`` this may not be the same as the
        input segments. If no valid times could be found for a detector
        (and ``err_on_missing_detectors`` is False), it will not be
        included in the returned dictionary.
    """
    if pad_data is None:
        pad_data = {det: 0 for det in detectors}
    dets_with_data = {}
    for det in detectors:
        logging.info("Checking that %s has valid data in the requested "
                     "segment", det)
        try:
            pad = pad_data[det]
            start, end = check_validtimes(det, gps_start_times[det]-pad,
                                          gps_end_times[det]+pad,
                                          **kwargs)
            dets_with_data[det] = (start+pad, end-pad)
        except NoValidDataError as e:
            if err_on_missing_detectors:
                raise e
            logging.warning("WARNING: Detector %s will not be used in "
                            "the analysis, as it does not have "
                            "continuous valid data that spans the "
                            "segment [%d, %d).", det,
                            gps_start_times[det]-pad,
                            gps_end_times[det]+pad)
    return dets_with_data
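
# Illustrative usage sketch (not part of the original module); the GPS
# times are hypothetical. Detectors without continuous valid data are
# simply dropped from the result, since err_on_missing_detectors is False
# by default:
#
#     segs = detectors_with_valid_data(
#         ['H1', 'L1'],
#         {'H1': 1126259454, 'L1': 1126259454},
#         {'H1': 1126259470, 'L1': 1126259470},
#         pad_data={'H1': 8, 'L1': 8})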

def check_for_nans(strain_dict):
    """Checks if any data in a dictionary of strains has NaNs.

    If any NaNs are found, a ``ValueError`` is raised.

    Parameters
    ----------
    strain_dict : dict
        Dictionary of detectors ->
        :py:class:`pycbc.types.timeseries.TimeSeries`.
    """
    for det, ts in strain_dict.items():
        if numpy.isnan(ts.numpy()).any():
            raise ValueError("NaN found in strain from {}".format(det))
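
# Illustrative sketch (not part of the original module): a time series
# containing a NaN triggers the ValueError:
#
#     from pycbc.types import TimeSeries
#     ts = TimeSeries(numpy.array([0., numpy.nan, 0.]), delta_t=1.)
#     check_for_nans({'H1': ts})  # raises ValueError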

def data_opts_from_config(cp, section, filter_flow):
    """Loads data options from a section in a config file.

    Parameters
    ----------
    cp : WorkflowConfigParser
        Config file to read.
    section : str
        The section to read. All options in the section will be loaded
        as if they were command-line arguments.
    filter_flow : dict
        Dictionary of detectors -> inner product low frequency cutoffs.
        If a ``data-conditioning-low-freq`` cutoff wasn't provided for
        any of the detectors, these values will be used. Otherwise, the
        data-conditioning-low-freq must be less than the inner product
        cutoffs. If any are not, a ``ValueError`` is raised.

    Returns
    -------
    opts : parsed argparse.ArgumentParser
        An argument parser namespace that was constructed as if the
        options were specified on the command line.
    """
    # convert the section options into command-line options
    optstr = cp.section_to_cli(section)
    # create a fake parser to parse them
    parser = create_data_parser()
    # parse the options
    opts = parser.parse_args(optstr.split())
    # figure out the times to use
    logging.info("Determining analysis times to use")
    opts.trigger_time = int(opts.trigger_time)
    gps_start = opts.analysis_start_time.copy()
    gps_end = opts.analysis_end_time.copy()
    for det in opts.instruments:
        gps_start[det] += opts.trigger_time
        gps_end[det] += opts.trigger_time
        if opts.psd_inverse_length[det] is not None:
            pad = int(numpy.ceil(opts.psd_inverse_length[det] / 2))
            logging.info("Padding %s analysis start and end times by %d "
                         "(= psd-inverse-length/2) seconds to "
                         "account for PSD wrap around effects.", det, pad)
        else:
            pad = 0
        gps_start[det] -= pad
        gps_end[det] += pad
        if opts.psd_start_time[det] is not None:
            opts.psd_start_time[det] += opts.trigger_time
        if opts.psd_end_time[det] is not None:
            opts.psd_end_time[det] += opts.trigger_time
    opts.gps_start_time = gps_start
    opts.gps_end_time = gps_end
    # check the frequencies
    low_freq_cutoff = filter_flow.copy()
    if opts.low_frequency_cutoff:
        # add in any missing detectors
        low_freq_cutoff.update(
            {det: opts.low_frequency_cutoff[det]
             for det in opts.instruments
             if opts.low_frequency_cutoff[det] is not None})
        # make sure the data conditioning low frequency cutoff is less
        # than the matched filter cutoff
        if any(low_freq_cutoff[det] > filter_flow[det]
               for det in filter_flow):
            raise ValueError("data conditioning low frequency cutoff must "
                             "be less than the filter low frequency "
                             "cutoff")
    opts.low_frequency_cutoff = low_freq_cutoff
    # verify options are sane
    verify_psd_options_multi_ifo(opts, parser, opts.instruments)
    verify_strain_options_multi_ifo(opts, parser, opts.instruments)
    return opts
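
# Illustrative usage sketch (not part of the original module): loading
# options from a hypothetical [data] section of a config file named
# inference.ini, with a 20 Hz inner-product cutoff for each detector:
#
#     from pycbc.workflow import WorkflowConfigParser
#     cp = WorkflowConfigParser(['inference.ini'])
#     opts = data_opts_from_config(cp, 'data', {'H1': 20., 'L1': 20.})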

def data_from_cli(opts, check_for_valid_times=False,
                  shift_psd_times_to_valid=False,
                  err_on_missing_detectors=False):
    """Loads the data needed for a model from the given command-line
    options.

    Gates specified on the command line are also applied.

    Parameters
    ----------
    opts : ArgumentParser parsed args
        Argument options parsed from a command line string (the sort of
        thing returned by ``parser.parse_args``).
    check_for_valid_times : bool, optional
        Check that valid data exists in the requested gps times. Default
        is False.
    shift_psd_times_to_valid : bool, optional
        If estimating the PSD from data, shift the PSD times to a valid
        segment if needed. Default is False.
    err_on_missing_detectors : bool, optional
        Raise a NoValidDataError if any detector does not have valid
        data. Otherwise, a warning is printed, and that detector is
        skipped.

    Returns
    -------
    strain_dict : dict
        Dictionary of detectors -> time series strain.
    psd_strain_dict : dict or None
        If ``opts.psd_(start|end)_time`` were set, a dictionary of
        detectors -> time series data to use for PSD estimation.
        Otherwise, ``None``.
    """
    # get gates to apply
    gates = gates_from_cli(opts)
    psd_gates = psd_gates_from_cli(opts)
    # get strain time series
    instruments = opts.instruments
    # validate times
    if check_for_valid_times:
        dets_with_data = detectors_with_valid_data(
            instruments, opts.gps_start_time, opts.gps_end_time,
            pad_data=opts.pad_data,
            err_on_missing_detectors=err_on_missing_detectors,
            shift_to_valid=False,
            segment_name=opts.dq_segment_name,
            source=opts.dq_source,
            server=opts.dq_server,
            veto_definer=opts.veto_definer)
        # reset instruments to only be those with valid data
        instruments = list(dets_with_data.keys())
    strain_dict = strain_from_cli_multi_ifos(opts, instruments,
                                             precision="double")
    # apply gates if not waiting to overwhiten
    if not opts.gate_overwhitened:
        strain_dict = apply_gates_to_td(strain_dict, gates)
    # check that there aren't nans in the data
    check_for_nans(strain_dict)
    # get strain time series to use for PSD estimation; if the user has
    # not given the PSD time options, then use the same data as the
    # analysis
    if opts.psd_start_time and opts.psd_end_time:
        logging.info("Will generate a different time series for PSD "
                     "estimation")
        if check_for_valid_times:
            psd_times = detectors_with_valid_data(
                instruments, opts.psd_start_time, opts.psd_end_time,
                pad_data=opts.pad_data,
                err_on_missing_detectors=err_on_missing_detectors,
                shift_to_valid=shift_psd_times_to_valid,
                segment_name=opts.dq_segment_name,
                source=opts.dq_source,
                server=opts.dq_server,
                veto_definer=opts.veto_definer)
            # remove detectors from the strain dict that did not have
            # valid times for PSD estimation
            for det in set(strain_dict.keys())-set(psd_times.keys()):
                _ = strain_dict.pop(det)
            # reset instruments to only be those with valid data
            instruments = list(psd_times.keys())
        else:
            psd_times = {det: (opts.psd_start_time[det],
                               opts.psd_end_time[det])
                         for det in instruments}
        psd_strain_dict = {}
        for det, (psd_start, psd_end) in psd_times.items():
            opts.gps_start_time = psd_start
            opts.gps_end_time = psd_end
            psd_strain_dict.update(
                strain_from_cli_multi_ifos(opts, [det],
                                           precision="double"))
        # apply any gates
        logging.info("Applying gates to PSD data")
        psd_strain_dict = apply_gates_to_td(psd_strain_dict, psd_gates)
        # check that there aren't nans in the psd data
        check_for_nans(psd_strain_dict)
    elif opts.psd_start_time or opts.psd_end_time:
        raise ValueError("Must give psd-start-time and psd-end-time")
    else:
        psd_strain_dict = None
    # check that we have data left to analyze
    if instruments == []:
        raise NoValidDataError("No valid data could be found in any of "
                               "the requested instruments.")
    return strain_dict, psd_strain_dict
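
# Illustrative usage sketch (not part of the original module): the typical
# call sequence, continuing from the data_opts_from_config example above:
#
#     strain_dict, psd_strain_dict = data_from_cli(
#         opts, check_for_valid_times=True, shift_psd_times_to_valid=True)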

def fd_data_from_strain_dict(opts, strain_dict, psd_strain_dict=None):
    """Converts a dictionary of time series to the frequency domain, and
    gets the PSDs.

    Parameters
    ----------
    opts : ArgumentParser parsed args
        Argument options parsed from a command line string (the sort of
        thing returned by ``parser.parse_args``).
    strain_dict : dict
        Dictionary of detectors -> time series data.
    psd_strain_dict : dict, optional
        Dictionary of detectors -> time series data to use for PSD
        estimation. If not provided, will use the ``strain_dict``. This
        is ignored if ``opts.psd_estimation`` is not set. See
        :py:func:`pycbc.psd.psd_from_cli_multi_ifos` for details.

    Returns
    -------
    stilde_dict : dict
        Dictionary of detectors -> frequency series data.
    psd_dict : dict
        Dictionary of detectors -> frequency-domain PSDs.
    """
    # FFT the strain, saving each detector's FFT length and delta_f to
    # dicts for the PSD estimation below
    stilde_dict = {}
    length_dict = {}
    delta_f_dict = {}
    for det, tsdata in strain_dict.items():
        stilde_dict[det] = tsdata.to_frequencyseries()
        length_dict[det] = len(stilde_dict[det])
        delta_f_dict[det] = stilde_dict[det].delta_f
    if psd_strain_dict is None:
        psd_strain_dict = strain_dict
    # get the PSD as a frequency series
    psd_dict = psd_from_cli_multi_ifos(
        opts, length_dict, delta_f_dict, opts.low_frequency_cutoff,
        list(psd_strain_dict.keys()), strain_dict=psd_strain_dict,
        precision="double")
    return stilde_dict, psd_dict
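
# Illustrative usage sketch (not part of the original module), continuing
# from the data_from_cli example above:
#
#     stilde_dict, psd_dict = fd_data_from_strain_dict(
#         opts, strain_dict, psd_strain_dict=psd_strain_dict)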

def gate_overwhitened_data(stilde_dict, psd_dict, gates):
    """Applies gates to overwhitened data.

    Parameters
    ----------
    stilde_dict : dict
        Dictionary of detectors -> frequency series data to apply the
        gates to.
    psd_dict : dict
        Dictionary of detectors -> PSD to use for overwhitening.
    gates : dict
        Dictionary of detectors -> gates.

    Returns
    -------
    dict :
        Dictionary of detectors -> frequency series data with the gates
        applied after overwhitening. The returned data is not
        overwhitened.
    """
    logging.info("Applying gates to overwhitened data")
    # overwhiten the data
    out = {}
    for det in gates:
        out[det] = stilde_dict[det] / psd_dict[det]
    # now apply the gates
    out = apply_gates_to_fd(out, gates)
    # now unwhiten
    for det in gates:
        out[det] *= psd_dict[det]
    return out
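
# Illustrative usage sketch (not part of the original module). For each
# detector the data are divided by the PSD (overwhitened), gated, then
# multiplied by the PSD again, so the result is gated but not
# overwhitened. Assuming a gate is a (central time, half duration, taper
# duration) tuple, as produced by gates_from_cli, with a hypothetical GPS
# time:
#
#     gates = {'H1': [(1126259462.0, 1.0, 0.5)]}
#     gated = gate_overwhitened_data(stilde_dict, psd_dict, gates)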