Source code for pycbc.workflow.dq

# Copyright (C) 2020 Max Trevor and Derek Davis
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

#
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#

import os
import logging
from ligo import segments
from pycbc.workflow.core import (FileList, Executable, Node,
                                 File, SegFile, make_analysis_dir)
from pycbc.workflow.datafind import setup_datafind_workflow

class PyCBCCalculateDQExecutable(Executable):
    """Executable whose nodes read frame files over a GPS interval.

    Each node is given a list of frame files and a start/end time and
    declares a single ``.hdf`` output file.
    """

    current_retention_level = Executable.ALL_TRIGGERS

    def create_node(self, segment, frames):
        """Create a node for one segment and its frame files.

        Parameters
        ----------
        segment : indexable pair
            ``segment[0]`` and ``segment[1]`` are cast to ``int`` and
            passed as ``--gps-start-time`` and ``--gps-end-time``. The
            segment itself is handed to ``new_output_file_opt`` for the
            output file.
        frames : list of files
            Input files supplied through ``--frame-files``.

        Returns
        -------
        Node
            The configured node, with ``--output-file`` set to a new
            ``.hdf`` output.
        """
        gps_start, gps_end = int(segment[0]), int(segment[1])

        job = Node(self)
        job.add_input_list_opt('--frame-files', frames)
        # Option order is kept: start time first, then end time.
        for opt_name, gps_time in (('--gps-start-time', gps_start),
                                   ('--gps-end-time', gps_end)):
            job.add_opt(opt_name, gps_time)
        job.new_output_file_opt(segment, '.hdf', '--output-file')
        return job
class PyCBCRerankDQExecutable(Executable):
    """Executable whose nodes combine DQ files with a binned rate file.

    Each node receives the DQ type, the detector name, a list of DQ input
    files and one rate file, and declares a single ``.hdf`` output file.
    """

    current_retention_level = Executable.MERGED_TRIGGERS

    def create_node(self, workflow, ifo, dq_type, dq_files,
                    binned_rate_file):
        """Create the rerank node for one detector.

        Parameters
        ----------
        workflow : object with an ``analysis_time`` attribute
            ``workflow.analysis_time`` is used as the validity span of
            the output file.
        ifo : str
            Value passed to ``--ifo``.
        dq_type : str
            Value passed to ``--dq-type``.
        dq_files : list of files
            Input files supplied through ``--input-file``.
        binned_rate_file : file
            Input file supplied through ``--rate-file``.

        Returns
        -------
        Node
            The configured node, with ``--output-file`` set to a new
            ``.hdf`` output.
        """
        rerank = Node(self)
        # Plain (non-file) options first, in the original order.
        for opt_name, opt_value in (('--dq-type', dq_type),
                                    ('--ifo', ifo)):
            rerank.add_opt(opt_name, opt_value)
        rerank.add_input_list_opt('--input-file', dq_files)
        rerank.add_input_opt('--rate-file', binned_rate_file)
        rerank.new_output_file_opt(workflow.analysis_time, '.hdf',
                                   '--output-file')
        return rerank
class PyCBCBinTriggerRatesDQExecutable(Executable):
    """Executable whose nodes take a bank, a trigger file and DQ files.

    Each node receives the detector name, one bank file, one trigger file
    and a list of DQ files, and declares a single ``.hdf`` output file.
    """

    current_retention_level = Executable.MERGED_TRIGGERS

    def create_node(self, workflow, ifo, dq_files, trig_file, bank_file):
        """Create the trigger-rate binning node for one detector.

        Parameters
        ----------
        workflow : object with an ``analysis_time`` attribute
            ``workflow.analysis_time`` is used as the validity span of
            the output file.
        ifo : str
            Value passed to ``--ifo``.
        dq_files : list of files
            Input files supplied through ``--dq-file``.
        trig_file : file
            Input file supplied through ``--trig-file``.
        bank_file : file
            Input file supplied through ``--bank-file``.

        Returns
        -------
        Node
            The configured node, with ``--output-file`` set to a new
            ``.hdf`` output.
        """
        binning = Node(self)
        binning.add_opt('--ifo', ifo)
        # Single-file inputs, bank before triggers as in the original.
        for opt_name, in_file in (('--bank-file', bank_file),
                                  ('--trig-file', trig_file)):
            binning.add_input_opt(opt_name, in_file)
        binning.add_input_list_opt('--dq-file', dq_files)
        binning.new_output_file_opt(workflow.analysis_time, '.hdf',
                                    '--output-file')
        return binning
class PyCBCCalculateDQFlagExecutable(Executable):
    """Executable whose nodes take a named flag and a DQ segment file.

    Each node receives the detector string stored on the executable, a
    flag name, a GPS start/end time and a segment file, and declares a
    single ``.hdf`` output file.
    """

    current_retention_level = Executable.MERGED_TRIGGERS

    def create_node(self, workflow, segment, dq_file, flag):
        """Create a node for one flag over one segment.

        Parameters
        ----------
        workflow : object
            Accepted for signature parity with the sibling executables;
            not read by this method.
        segment : indexable pair
            ``segment[0]`` and ``segment[1]`` are cast to ``int`` and
            passed as ``--gps-start-time`` and ``--gps-end-time``. The
            segment itself is handed to ``new_output_file_opt`` for the
            output file.
        dq_file : file
            Input file supplied through ``--dq-segments``.
        flag : str
            Value passed to ``--flag``.

        Returns
        -------
        Node
            The configured node, with ``--output-file`` set to a new
            ``.hdf`` output.
        """
        job = Node(self)
        gps_start, gps_end = int(segment[0]), int(segment[1])

        # The detector comes from the executable itself (``ifo_string``),
        # not from an argument.
        plain_opts = (
            ('--ifo', self.ifo_string),
            ('--flag', flag),
            ('--gps-start-time', gps_start),
            ('--gps-end-time', gps_end),
        )
        for opt_name, opt_value in plain_opts:
            job.add_opt(opt_name, opt_value)

        job.add_input_opt('--dq-segments', dq_file)
        job.new_output_file_opt(segment, '.hdf', '--output-file')
        return job
def _select_single_insp(insps, ifo):
    """Return the only entry of ``insps`` whose ``ifo`` attribute is ``ifo``.

    Raises ``ValueError`` unless exactly one entry matches. An explicit
    raise is used instead of ``assert`` so the check survives ``python -O``.
    """
    matches = [insp for insp in insps if insp.ifo == ifo]
    if len(matches) != 1:
        raise ValueError(
            "Expected exactly one entry in insps for ifo {}, found {}"
            .format(ifo, len(matches)))
    return matches[0]


def _add_bin_and_rerank_jobs(workflow, ifo, dq_label, dq_files, ifo_insp,
                             bank, output_dir, dq_tags):
    """Add the bin_trigger_rates_dq and rerank_dq jobs for one detector.

    Both jobs are appended to ``workflow``; the output files of the rerank
    node are returned.
    """
    bin_exe = PyCBCBinTriggerRatesDQExecutable(workflow.cp,
                                               'bin_trigger_rates_dq',
                                               ifos=ifo,
                                               out_dir=output_dir,
                                               tags=dq_tags)
    bin_node = bin_exe.create_node(workflow, ifo, dq_files, ifo_insp, bank)
    workflow += bin_node
    binned_rate_file = bin_node.output_file

    rerank_exe = PyCBCRerankDQExecutable(workflow.cp, 'rerank_dq',
                                         ifos=ifo, out_dir=output_dir,
                                         tags=dq_tags)
    rerank_node = rerank_exe.create_node(workflow, ifo, dq_label, dq_files,
                                         binned_rate_file)
    workflow += rerank_node
    return rerank_node.output_files


def setup_dq_reranking(workflow, dq_label, insps, bank,
                       segs, analyzable_file, dq_file,
                       output_dir=None, tags=None):
    """Add the data-quality reranking jobs for ``dq_label`` to a workflow.

    The ``dq-type`` option of the ``workflow-data_quality`` section (tagged
    with ``dq_label``) selects one of two branches:

    * ``'timeseries'``: for every detector listed in the ``ifos`` option, a
      datafind sub-workflow is set up, one ``calculate_dq`` job is added per
      segment returned by it, followed by one ``bin_trigger_rates_dq`` and
      one ``rerank_dq`` job.
    * ``'flag'``: the detector is taken from the first two characters of the
      ``flag-name`` option; one ``calculate_dqflag`` job is added per
      segment in ``segs[ifo]``, followed by one ``bin_trigger_rates_dq`` and
      one ``rerank_dq`` job.

    Parameters
    ----------
    workflow : workflow object
        Jobs are appended with ``+=``; its ``cp`` and ``analysis_time``
        attributes are read.
    dq_label : str
        Label used as configuration tag, as segment name suffix, and passed
        to the rerank job as ``--dq-type``.
    insps : iterable
        Objects with an ``ifo`` attribute; exactly one must match each
        detector processed. The match is passed as the trigger file of the
        binning job.
    bank : file
        Passed as the bank file of the binning job.
    segs : dict
        Maps detector name to the segments to process.
    analyzable_file : object
        Not used by this function; kept so existing callers keep working.
    dq_file : file
        Used only in the ``'flag'`` branch, as ``--dq-segments`` input.
    output_dir : str, optional
        Directory handed to ``make_analysis_dir`` and to every executable.
    tags : list of str, optional
        Extra tags; ``dq_label`` is appended to them.

    Returns
    -------
    FileList
        Output files of every rerank node that was added.

    Raises
    ------
    ValueError
        If ``dq-type`` is neither ``'flag'`` nor ``'timeseries'``, if the
        timeseries branch finds no ``workflow-datafind`` subsection named
        ``dq_label``, or if ``insps`` does not hold exactly one entry for a
        detector.
    """
    make_analysis_dir(output_dir)
    output = FileList()
    dq_tags = tags + [dq_label] if tags else [dq_label]

    dq_type = workflow.cp.get_opt_tags("workflow-data_quality",
                                       'dq-type', [dq_label])

    if dq_type == 'timeseries':
        if dq_label not in workflow.cp.get_subsections('workflow-datafind'):
            raise ValueError(
                "No workflow-datafind section with dq tag. "
                "Tags must be used in workflow-datafind sections "
                "if more than one source of data is used. "
                "Strain data source must be tagged workflow-datafind-hoft. "
                "Consult the documentation for more info.")

        dq_ifos = workflow.cp.get_opt_tags("workflow-data_quality",
                                           'ifos', [dq_label]).split(',')

        # Segments keyed by plain ifo feed datafind; the same segments keyed
        # by "ifo:label" are written to the segment file.
        requested_segs = {ifo: segs[ifo] for ifo in dq_ifos}
        segs_for_file = {ifo + ':' + dq_label: segs[ifo] for ifo in dq_ifos}
        dq_segs_file = SegFile.from_segment_list_dict(
            dq_label, segs_for_file, extension='.xml',
            valid_segment=workflow.analysis_time, directory=output_dir)

        # Only the frame files and the returned segments are needed here.
        datafind_files, _, dq_segs, _ = setup_datafind_workflow(
            workflow, requested_segs, "datafind_dq",
            seg_file=dq_segs_file, tags=dq_tags)

        for ifo in dq_ifos:
            ifo_insp = _select_single_insp(insps, ifo)

            dq_files = FileList()
            for seg in dq_segs[ifo]:
                seg_frames = datafind_files.find_all_output_in_range(ifo, seg)
                raw_exe = PyCBCCalculateDQExecutable(workflow.cp,
                                                     'calculate_dq',
                                                     ifos=ifo,
                                                     out_dir=output_dir,
                                                     tags=dq_tags)
                raw_node = raw_exe.create_node(seg, seg_frames)
                workflow += raw_node
                dq_files += raw_node.output_files

            output += _add_bin_and_rerank_jobs(workflow, ifo, dq_label,
                                               dq_files, ifo_insp, bank,
                                               output_dir, dq_tags)

    elif dq_type == 'flag':
        flag_name = workflow.cp.get_opt_tags("workflow-data_quality",
                                             'flag-name', dq_tags)
        # The detector is the two-character prefix of the flag name.
        ifo = flag_name[:2]
        ifo_insp = _select_single_insp(insps, ifo)

        logging.info("Creating job for flag %s", flag_name)
        dq_files = FileList()
        for seg in segs[ifo]:
            raw_exe = PyCBCCalculateDQFlagExecutable(workflow.cp,
                                                     'calculate_dqflag',
                                                     ifos=ifo,
                                                     out_dir=output_dir,
                                                     tags=dq_tags)
            raw_node = raw_exe.create_node(workflow, seg, dq_file, flag_name)
            workflow += raw_node
            dq_files += raw_node.output_files

        output += _add_bin_and_rerank_jobs(workflow, ifo, dq_label,
                                           dq_files, ifo_insp, bank,
                                           output_dir, dq_tags)

    else:
        raise ValueError(
            "Incorrect DQ type specified. "
            "Only valid DQ types are 'flag' and 'timeseries'. "
            "Consult the documentation for more info.")

    return output