# Copyright (C) 2013 Ian Harry
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
This library code contains functions and classes that are used to set up
and add jobs/nodes to a pycbc workflow. For details about pycbc.workflow see:
https://ldas-jobs.ligo.caltech.edu/~cbc/docs/pycbc/ahope.html
"""
import math, os
import lal
from ligo import segments
from pycbc.workflow.core import Executable, File, FileList, Node
def int_gps_time_to_str(t):
"""Takes an integer GPS time, either given as int or lal.LIGOTimeGPS, and
converts it to a string. If a LIGOTimeGPS with nonzero decimal part is
given, raises a ValueError."""
int_t = int(t)
if abs(float(t - int_t)) > 0.:
raise ValueError('Need an integer GPS time, got %s' % str(t))
return str(int_t)
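# Illustrative behaviour of int_gps_time_to_str (a hedged sketch kept as
# comments so nothing runs on import; the GPS value is arbitrary):
#
#   int_gps_time_to_str(1187008882)                      # -> '1187008882'
#   int_gps_time_to_str(lal.LIGOTimeGPS(1187008882, 0))  # -> '1187008882'
#   int_gps_time_to_str(lal.LIGOTimeGPS(1187008882.5))   # raises ValueError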
def select_tmpltbank_class(curr_exe):
""" This function returns a class that is appropriate for setting up
    template bank jobs within the workflow.
Parameters
----------
curr_exe : string
The name of the executable to be used for generating template banks.
Returns
--------
exe_class : Sub-class of pycbc.workflow.core.Executable that holds utility
functions appropriate for the given executable. Instances of the class
('jobs') **must** have methods
* job.create_node()
and
        * job.get_valid_times(ifo)
"""
exe_to_class_map = {
'pycbc_geom_nonspinbank' : PyCBCTmpltbankExecutable,
'pycbc_aligned_stoch_bank': PyCBCTmpltbankExecutable
}
try:
return exe_to_class_map[curr_exe]
except KeyError:
raise NotImplementedError(
"No job class exists for executable %s, exiting" % curr_exe)
def select_matchedfilter_class(curr_exe):
""" This function returns a class that is appropriate for setting up
    matched-filtering jobs within the workflow.
Parameters
----------
curr_exe : string
The name of the matched filter executable to be used.
Returns
--------
exe_class : Sub-class of pycbc.workflow.core.Executable that holds utility
functions appropriate for the given executable. Instances of the class
('jobs') **must** have methods
* job.create_node()
and
        * job.get_valid_times(ifo)
"""
exe_to_class_map = {
'pycbc_inspiral' : PyCBCInspiralExecutable,
'pycbc_inspiral_skymax' : PyCBCInspiralExecutable,
'pycbc_multi_inspiral' : PyCBCMultiInspiralExecutable,
}
try:
return exe_to_class_map[curr_exe]
except KeyError:
# also conceivable to introduce a default class??
raise NotImplementedError(
"No job class exists for executable %s, exiting" % curr_exe)
def select_generic_executable(workflow, exe_tag):
""" Returns a class that is appropriate for setting up jobs to run executables
having specific tags in the workflow config.
Executables should not be "specialized" jobs fitting into one of the
select_XXX_class functions above, i.e. not a matched filter or template
bank job, which require extra setup.
Parameters
----------
workflow : pycbc.workflow.core.Workflow
The Workflow instance.
exe_tag : string
The name of the config section storing options for this executable and
the option giving the executable path in the [executables] section.
Returns
--------
exe_class : Sub-class of pycbc.workflow.core.Executable that holds utility
functions appropriate for the given executable. Instances of the class
('jobs') **must** have a method job.create_node()
"""
exe_path = workflow.cp.get("executables", exe_tag)
exe_name = os.path.basename(exe_path)
exe_to_class_map = {
'ligolw_add' : LigolwAddExecutable,
'lalapps_inspinj' : LalappsInspinjExecutable,
'pycbc_create_injections' : PycbcCreateInjectionsExecutable,
'pycbc_condition_strain' : PycbcConditionStrainExecutable
}
try:
return exe_to_class_map[exe_name]
except KeyError:
# Should we try some sort of default class??
raise NotImplementedError(
"No job class exists for executable %s, exiting" % exe_name)
def sngl_ifo_job_setup(workflow, ifo, out_files, curr_exe_job, science_segs,
datafind_outs, parents=None,
allow_overlap=True):
""" This function sets up a set of single ifo jobs. A basic overview of how this
works is as follows:
* (1) Identify the length of data that each job needs to read in, and what
part of that data the job is valid for.
* START LOOPING OVER SCIENCE SEGMENTS
* (2) Identify how many jobs are needed (if any) to cover the given science
      segment and the time shift between jobs. If no jobs are needed, continue
      to the next segment.
* START LOOPING OVER JOBS
* (3) Identify the time that the given job should produce valid output (ie.
inspiral triggers) over.
* (4) Identify the data range that the job will need to read in to produce
the aforementioned valid output.
* (5) Identify all parents/inputs of the job.
* (6) Add the job to the workflow
* END LOOPING OVER JOBS
* END LOOPING OVER SCIENCE SEGMENTS
Parameters
-----------
workflow: pycbc.workflow.core.Workflow
An instance of the Workflow class that manages the constructed workflow.
ifo : string
The name of the ifo to set up the jobs for
out_files : pycbc.workflow.core.FileList
        The FileList to which the output files of these jobs will be
        appended. It does not need to be empty when supplied.
curr_exe_job : Job
        An instance of the Job class that has a get_valid_times method.
science_segs : ligo.segments.segmentlist
The list of times that the jobs should cover
datafind_outs : pycbc.workflow.core.FileList
The file list containing the datafind files.
parents : pycbc.workflow.core.FileList (optional, kwarg, default=None)
        The FileList containing the output files of the parent jobs (e.g.
        split template banks) for the jobs being set up.
allow_overlap : boolean (optional, kwarg, default = True)
If this is set the times that jobs are valid for will be allowed to
overlap. This may be desired for template banks which may have some
overlap in the times they cover. This may not be desired for inspiral
jobs, where you probably want triggers recorded by jobs to not overlap
at all.
Returns
--------
out_files : pycbc.workflow.core.FileList
A list of the files that will be generated by this step in the
workflow.
"""
########### (1) ############
# Get the times that can be analysed and needed data lengths
data_length, valid_chunk, valid_length = identify_needed_data(curr_exe_job)
exe_tags = curr_exe_job.tags
# Loop over science segments and set up jobs
for curr_seg in science_segs:
########### (2) ############
# Initialize the class that identifies how many jobs are needed and the
# shift between them.
segmenter = JobSegmenter(data_length, valid_chunk, valid_length,
curr_seg, curr_exe_job)
for job_num in range(segmenter.num_jobs):
############## (3) #############
# Figure out over what times this job will be valid for
job_valid_seg = segmenter.get_valid_times_for_job(job_num,
allow_overlap=allow_overlap)
############## (4) #############
# Get the data that this job should read in
job_data_seg = segmenter.get_data_times_for_job(job_num)
############# (5) ############
# Identify parents/inputs to the job
if parents:
# Find the set of files with the best overlap
curr_parent = parents.find_outputs_in_range(ifo, job_valid_seg,
useSplitLists=True)
if not curr_parent:
err_string = ("No parent jobs found overlapping %d to %d."
%(job_valid_seg[0], job_valid_seg[1]))
err_string += "\nThis is a bad error! Contact a developer."
raise ValueError(err_string)
else:
curr_parent = [None]
curr_dfouts = None
if datafind_outs:
curr_dfouts = datafind_outs.find_all_output_in_range(ifo,
job_data_seg, useSplitLists=True)
if not curr_dfouts:
err_str = ("No datafind jobs found overlapping %d to %d."
%(job_data_seg[0],job_data_seg[1]))
err_str += "\nThis shouldn't happen. Contact a developer."
raise ValueError(err_str)
############## (6) #############
# Make node and add to workflow
# Note if I have more than one curr_parent I need to make more than
# one job. If there are no curr_parents it is set to [None] and I
# make a single job. This catches the case of a split template bank
# where I run a number of jobs to cover a single range of time.
for parent in curr_parent:
if len(curr_parent) != 1:
bank_tag = [t for t in parent.tags if 'bank' in t.lower()]
curr_exe_job.update_current_tags(bank_tag + exe_tags)
# We should generate unique names automatically, but it is a
# pain until we can set the output names for all Executables
node = curr_exe_job.create_node(job_data_seg, job_valid_seg,
parent=parent,
df_parents=curr_dfouts)
workflow.add_node(node)
curr_out_files = node.output_files
# FIXME: Here we remove PSD files if they are coming through.
# This should be done in a better way. On to-do list.
curr_out_files = [i for i in curr_out_files if 'PSD_FILE'\
not in i.tags]
out_files += curr_out_files
return out_files
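# A hedged end-to-end sketch of how sngl_ifo_job_setup is typically called
# (all variable names below are hypothetical and would normally come from the
# standard pycbc.workflow setup functions):
#
#   insp_files = FileList([])
#   sngl_ifo_job_setup(workflow, 'H1', insp_files, insp_job,
#                      science_segs['H1'], datafind_files,
#                      parents=bank_files, allow_overlap=False)
#   # insp_files now also contains the output Files of every inspiral node
#   # that was added to the workflow for H1.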
def multi_ifo_coherent_job_setup(workflow, out_files, curr_exe_job,
science_segs, datafind_outs, output_dir,
parents=None, slide_dict=None, tags=None):
"""
Method for setting up coherent inspiral jobs.
"""
if tags is None:
tags = []
data_seg, job_valid_seg = curr_exe_job.get_valid_times()
curr_out_files = FileList([])
if 'IPN' in datafind_outs[-1].description \
and 'bank_veto_bank' in datafind_outs[-2].description:
# FIXME: This looks like a really nasty hack for the GRB code.
# This should be fixed properly to avoid strange behaviour!
ipn_sky_points = datafind_outs[-1]
bank_veto = datafind_outs[-2]
frame_files = datafind_outs[:-2]
else:
ipn_sky_points = None
if 'bank_veto_bank' in datafind_outs[-1].name:
bank_veto = datafind_outs[-1]
frame_files = datafind_outs[:-1]
else:
bank_veto = None
frame_files = datafind_outs
split_bank_counter = 0
if curr_exe_job.injection_file is None:
for split_bank in parents:
tag = list(tags)
tag.append(split_bank.tag_str)
node = curr_exe_job.create_node(data_seg, job_valid_seg,
parent=split_bank, dfParents=frame_files,
bankVetoBank=bank_veto, ipn_file=ipn_sky_points,
slide=slide_dict, tags=tag)
workflow.add_node(node)
split_bank_counter += 1
curr_out_files.extend(node.output_files)
else:
for inj_file in curr_exe_job.injection_file:
for split_bank in parents:
tag = list(tags)
tag.append(inj_file.tag_str)
tag.append(split_bank.tag_str)
node = curr_exe_job.create_node(data_seg, job_valid_seg,
parent=split_bank, inj_file=inj_file, tags=tag,
dfParents=frame_files, bankVetoBank=bank_veto,
ipn_file=ipn_sky_points)
workflow.add_node(node)
split_bank_counter += 1
curr_out_files.extend(node.output_files)
# FIXME: Here we remove PSD files if they are coming
# through. This should be done in a better way. On
# to-do list.
# IWHNOTE: This will not be needed when coh_PTF is retired, but it is
# okay to do this. It just means you can't access these files
# later.
curr_out_files = [i for i in curr_out_files if 'PSD_FILE'\
not in i.tags]
out_files += curr_out_files
return out_files
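# A hedged usage sketch for the coherent (GRB-style) setup; the split template
# banks, datafind files and output directory below are hypothetical:
#
#   grb_files = FileList([])
#   multi_ifo_coherent_job_setup(workflow, grb_files, multi_insp_job,
#                                science_segs, datafind_files, 'GRB_output',
#                                parents=split_banks, tags=['GRB'])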
def identify_needed_data(curr_exe_job):
""" This function will identify the length of data that a specific
executable needs to analyse and what part of that data is valid (ie.
inspiral doesn't analyse the first or last 8s of data it reads in).
Parameters
-----------
curr_exe_job : Job
        An instance of the Job class that has a get_valid_times method.
Returns
--------
    data_lengths : list of float
        The amounts of data (in seconds) that an instance of the job can read
        in, one entry per allowed tile size.
    valid_chunks : list of ligo.segments.segment
        For each data length, the times within that data for which the job's
        output **can** be valid (ie. for inspiral this is
        (72, dataLength - 72), as in a standard setup the inspiral job cannot
        look for triggers in the first or last 72 seconds of data read in).
    valid_lengths : list of float
        The maximum length of data each job can be valid for; this is
        abs(valid_chunk) for each valid chunk.
"""
# Set up the condorJob class for the current executable
data_lengths, valid_chunks = curr_exe_job.get_valid_times()
# Begin by getting analysis start and end, and start and end of time
# that the output file is valid for
valid_lengths = [abs(valid_chunk) for valid_chunk in valid_chunks]
return data_lengths, valid_chunks, valid_lengths
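# For example (hedged, with hypothetical numbers): if the executable reports a
# single allowed tile with get_valid_times() == ([2048], [segment(100, 2000)]),
# then identify_needed_data returns
#
#   ([2048], [segment(100, 2000)], [1900])
#
# i.e. each job reads 2048s of data and produces valid output over a 1900s
# span starting 100s into the data it read.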
class JobSegmenter(object):
""" This class is used when running sngl_ifo_job_setup to determine what times
    should be analysed by each job and what data is needed.
"""
def __init__(self, data_lengths, valid_chunks, valid_lengths, curr_seg,
curr_exe_class):
""" Initialize class. """
self.exe_class = curr_exe_class
self.curr_seg = curr_seg
self.curr_seg_length = float(abs(curr_seg))
self.data_length, self.valid_chunk, self.valid_length = \
self.pick_tile_size(self.curr_seg_length, data_lengths,
valid_chunks, valid_lengths)
self.data_chunk = segments.segment([0, self.data_length])
self.data_loss = self.data_length - abs(self.valid_chunk)
if self.data_loss < 0:
raise ValueError("pycbc.workflow.jobsetup needs fixing! Please contact a developer")
if self.curr_seg_length < self.data_length:
self.num_jobs = 0
return
# How many jobs do we need
self.num_jobs = int( math.ceil( (self.curr_seg_length \
- self.data_loss) / float(self.valid_length) ))
if self.curr_seg_length == self.data_length:
# If the segment length is identical to the data length then I
# will have exactly 1 job!
self.job_time_shift = 0
else:
# What is the incremental shift between jobs
self.job_time_shift = (self.curr_seg_length - self.data_length) / \
float(self.num_jobs - 1)
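    # Worked example (hedged, hypothetical numbers): with a single tile of
    # data_length = 2048 s and valid_chunk = segment(100, 2000)
    # (valid_length = 1900 s, data_loss = 148 s), a 6000 s science segment
    # gives
    #   num_jobs       = ceil((6000 - 148) / 1900) = 4
    #   job_time_shift = (6000 - 2048) / (4 - 1)  ~= 1317.3 s
    # so successive jobs read 2048 s of data starting roughly 1317 s apart.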
def pick_tile_size(self, seg_size, data_lengths, valid_chunks, valid_lengths):
""" Choose job tiles size based on science segment length """
if len(valid_lengths) == 1:
return data_lengths[0], valid_chunks[0], valid_lengths[0]
else:
# Pick the tile size that is closest to 1/3 of the science segment
target_size = seg_size / 3
pick, pick_diff = 0, abs(valid_lengths[0] - target_size)
for i, size in enumerate(valid_lengths):
if abs(size - target_size) < pick_diff:
pick, pick_diff = i, abs(size - target_size)
return data_lengths[pick], valid_chunks[pick], valid_lengths[pick]
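    # Worked example (hedged): for valid_lengths = [500, 1000, 2000] and a
    # 2400 s science segment the target size is 800 s, so the 1000 s tile is
    # chosen (|1000 - 800| = 200 is the smallest difference).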
def get_valid_times_for_job(self, num_job, allow_overlap=True):
""" Get the times for which this job is valid. """
# small factor of 0.0001 to avoid float round offs causing us to
# miss a second at end of segments.
shift_dur = self.curr_seg[0] + int(self.job_time_shift * num_job\
+ 0.0001)
job_valid_seg = self.valid_chunk.shift(shift_dur)
# If we need to recalculate the valid times to avoid overlap
if not allow_overlap:
data_per_job = (self.curr_seg_length - self.data_loss) / \
float(self.num_jobs)
lower_boundary = num_job*data_per_job + \
self.valid_chunk[0] + self.curr_seg[0]
upper_boundary = data_per_job + lower_boundary
# NOTE: Convert to int after calculating both boundaries
# small factor of 0.0001 to avoid float round offs causing us to
# miss a second at end of segments.
lower_boundary = int(lower_boundary)
upper_boundary = int(upper_boundary + 0.0001)
if lower_boundary < job_valid_seg[0] or \
upper_boundary > job_valid_seg[1]:
err_msg = ("Workflow is attempting to generate output "
"from a job at times where it is not valid.")
raise ValueError(err_msg)
job_valid_seg = segments.segment([lower_boundary,
upper_boundary])
return job_valid_seg
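    # Continuing the worked example above (hedged): with curr_seg = [0, 6000],
    # valid_chunk = segment(100, 2000), data_loss = 148 and num_jobs = 4,
    # job number 1 with allow_overlap=False gets
    #   data_per_job   = (6000 - 148) / 4 = 1463
    #   lower_boundary = 1 * 1463 + 100 + 0 = 1563
    #   upper_boundary = 1463 + 1563       = 3026
    # which lies inside the shifted valid chunk (1417, 3317), so the job is
    # declared valid over segment(1563, 3026).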
def get_data_times_for_job(self, num_job):
""" Get the data that this job will read in. """
# small factor of 0.0001 to avoid float round offs causing us to
# miss a second at end of segments.
shift_dur = self.curr_seg[0] + int(self.job_time_shift * num_job\
+ 0.0001)
job_data_seg = self.data_chunk.shift(shift_dur)
# Sanity check that all data is used
if num_job == 0:
if job_data_seg[0] != self.curr_seg[0]:
err= "Job is not using data from the start of the "
err += "science segment. It should be using all data."
raise ValueError(err)
if num_job == (self.num_jobs - 1):
if job_data_seg[1] != self.curr_seg[1]:
err = "Job is not using data from the end of the "
err += "science segment. It should be using all data."
raise ValueError(err)
if hasattr(self.exe_class, 'zero_pad_data_extend'):
job_data_seg = self.exe_class.zero_pad_data_extend(job_data_seg,
self.curr_seg)
return job_data_seg
class PyCBCInspiralExecutable(Executable):
""" The class used to create jobs for pycbc_inspiral Executable. """
current_retention_level = Executable.ALL_TRIGGERS
time_dependent_options = ['--channel-name']
def __init__(self, cp, exe_name, ifo=None, out_dir=None,
injection_file=None, tags=None, reuse_executable=False):
if tags is None:
tags = []
super().__init__(cp, exe_name, ifo, out_dir, tags=tags,
reuse_executable=reuse_executable,
set_submit_subdir=False)
self.cp = cp
self.injection_file = injection_file
self.ext = '.hdf'
self.num_threads = 1
if self.get_opt('processing-scheme') is not None:
stxt = self.get_opt('processing-scheme')
if len(stxt.split(':')) > 1:
self.num_threads = stxt.split(':')[1]
def create_node(self, data_seg, valid_seg, parent=None, df_parents=None,
tags=None):
if tags is None:
tags = []
node = Node(self, valid_seg=valid_seg)
if not self.has_opt('pad-data'):
raise ValueError("The option pad-data is a required option of "
"%s. Please check the ini file." % self.name)
pad_data = int(self.get_opt('pad-data'))
# set remaining options flags
node.add_opt('--gps-start-time',
int_gps_time_to_str(data_seg[0] + pad_data))
node.add_opt('--gps-end-time',
int_gps_time_to_str(data_seg[1] - pad_data))
node.add_opt('--trig-start-time', int_gps_time_to_str(valid_seg[0]))
node.add_opt('--trig-end-time', int_gps_time_to_str(valid_seg[1]))
if self.injection_file is not None:
node.add_input_opt('--injection-file', self.injection_file)
# set the input and output files
fil = node.new_output_file_opt(
valid_seg,
self.ext,
'--output',
tags=tags,
store_file=self.retain_files,
use_tmp_subdirs=True
)
        # For inspiral jobs we overwrite the "relative.submit.dir"
# attribute to avoid too many files in one sub-directory
curr_rel_dir = fil.name.split('/')[0]
node.add_profile('pegasus', 'relative.submit.dir',
self.pegasus_name + '_' + curr_rel_dir)
        # Must ensure this is not a LIGOTimeGPS as JSON won't understand it
data_seg = segments.segment([int(data_seg[0]), int(data_seg[1])])
fil.add_metadata('data_seg', data_seg)
node.add_input_opt('--bank-file', parent)
if df_parents is not None:
node.add_input_list_opt('--frame-files', df_parents)
return node
def get_valid_times(self):
""" Determine possible dimensions of needed input and valid output
"""
if self.cp.has_option('workflow-matchedfilter',
'min-analysis-segments'):
min_analysis_segs = int(self.cp.get('workflow-matchedfilter',
'min-analysis-segments'))
else:
min_analysis_segs = 0
if self.cp.has_option('workflow-matchedfilter',
'max-analysis-segments'):
max_analysis_segs = int(self.cp.get('workflow-matchedfilter',
'max-analysis-segments'))
else:
# Choose ridiculously large default value
max_analysis_segs = 1000
if self.cp.has_option('workflow-matchedfilter', 'min-analysis-length'):
min_analysis_length = int(self.cp.get('workflow-matchedfilter',
'min-analysis-length'))
else:
min_analysis_length = 0
if self.cp.has_option('workflow-matchedfilter', 'max-analysis-length'):
max_analysis_length = int(self.cp.get('workflow-matchedfilter',
'max-analysis-length'))
else:
# Choose a ridiculously large default value
max_analysis_length = 100000
segment_length = int(self.get_opt('segment-length'))
pad_data = 0
if self.has_opt('pad-data'):
pad_data += int(self.get_opt('pad-data'))
# NOTE: Currently the tapered data is ignored as it is short and
# will lie within the segment start/end pad. This means that
# the tapered data *will* be used for PSD estimation (but this
# effect should be small). It will also be in the data segments
# used for SNR generation (when in the middle of a data segment
# where zero-padding is not being used) but the templates should
# not be long enough to use this data assuming segment start/end
# pad take normal values. When using zero-padding this data will
# be used for SNR generation.
#if self.has_opt('taper-data'):
# pad_data += int(self.get_opt( 'taper-data' ))
if self.has_opt('allow-zero-padding'):
self.zero_padding=True
else:
self.zero_padding=False
start_pad = int(self.get_opt( 'segment-start-pad'))
end_pad = int(self.get_opt('segment-end-pad'))
seg_ranges = range(min_analysis_segs, max_analysis_segs + 1)
data_lengths = []
valid_regions = []
for nsegs in seg_ranges:
analysis_length = (segment_length - start_pad - end_pad) * nsegs
if not self.zero_padding:
data_length = analysis_length + pad_data * 2 \
+ start_pad + end_pad
start = pad_data + start_pad
end = data_length - pad_data - end_pad
else:
data_length = analysis_length + pad_data * 2
start = pad_data
end = data_length - pad_data
if data_length > max_analysis_length: continue
if data_length < min_analysis_length: continue
data_lengths += [data_length]
valid_regions += [segments.segment(start, end)]
        # If min_analysis_length is given, ensure that it is also included as
        # an allowed job data length.
if min_analysis_length:
data_length = min_analysis_length
if not self.zero_padding:
start = pad_data + start_pad
end = data_length - pad_data - end_pad
else:
start = pad_data
end = data_length - pad_data
if end > start:
data_lengths += [data_length]
valid_regions += [segments.segment(start, end)]
return data_lengths, valid_regions
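    # Worked example (hedged, hypothetical ini values): with
    # segment-length = 256, segment-start-pad = 64, segment-end-pad = 16,
    # pad-data = 8 and no zero padding, nsegs = 10 gives
    #   analysis_length = (256 - 64 - 16) * 10 = 1760
    #   data_length     = 1760 + 2*8 + 64 + 16 = 1856
    #   valid region    = segment(8 + 64, 1856 - 8 - 16) = segment(72, 1832)
    # i.e. 1856 s of data are read to produce 1760 s of valid triggers.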
def zero_pad_data_extend(self, job_data_seg, curr_seg):
"""When using zero padding, *all* data is analysable, but the setup
functions must include the padding data where it is available so that
we are not zero-padding in the middle of science segments. This
function takes a job_data_seg, that is chosen for a particular node
and extends it with segment-start-pad and segment-end-pad if that
data is available.
"""
if self.zero_padding is False:
return job_data_seg
else:
start_pad = int(self.get_opt( 'segment-start-pad'))
end_pad = int(self.get_opt('segment-end-pad'))
new_data_start = max(curr_seg[0], job_data_seg[0] - start_pad)
new_data_end = min(curr_seg[1], job_data_seg[1] + end_pad)
new_data_seg = segments.segment([new_data_start, new_data_end])
return new_data_seg
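    # Worked example (hedged): with segment-start-pad = 64 and
    # segment-end-pad = 16, a job_data_seg of [1000, 2000] inside a science
    # segment [980, 3000] is extended to
    #   [max(980, 1000 - 64), min(3000, 2000 + 16)] = [980, 2016]
    # so the padding is taken from real data wherever it exists.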
# FIXME: This is probably misnamed, this is really GRBInspiralExecutable.
# There's nothing coherent here, it's just that data segment stuff is
# very different between GRB and all-sky/all-time
class PyCBCMultiInspiralExecutable(Executable):
"""
The class responsible for setting up jobs for the
pycbc_multi_inspiral executable.
"""
current_retention_level = Executable.ALL_TRIGGERS
# bank-veto-bank-file is a file input option for pycbc_multi_inspiral
file_input_options = Executable.file_input_options + \
['--bank-veto-bank-file']
def __init__(self, cp, name, ifo=None, injection_file=None,
gate_files=None, out_dir=None, tags=None):
if tags is None:
tags = []
super().__init__(cp, name, ifo, out_dir=out_dir, tags=tags)
self.injection_file = injection_file
self.data_seg = segments.segment(int(cp.get('workflow', 'start-time')),
int(cp.get('workflow', 'end-time')))
self.num_threads = 1
def create_node(self, data_seg, valid_seg, parent=None, inj_file=None,
dfParents=None, bankVetoBank=None, ipn_file=None,
slide=None, tags=None):
if tags is None:
tags = []
node = Node(self)
if not dfParents:
raise ValueError("%s must be supplied with frame or cache files"
% self.name)
# If doing single IFO search, make sure slides are disabled
if len(self.ifo_list) < 2 and \
(node.get_opt('--do-short-slides') is not None or \
node.get_opt('--short-slide-offset') is not None):
raise ValueError("Cannot run with time slides in a single IFO "
"configuration! Please edit your configuration "
"file accordingly.")
        # Set instruments
node.add_opt("--instruments", " ".join(self.ifo_list))
pad_data = self.get_opt('pad-data')
if pad_data is None:
raise ValueError("The option pad-data is a required option of "
"%s. Please check the ini file." % self.name)
# Feed in bank_veto_bank.xml, if given
if self.cp.has_option('workflow-inspiral', 'bank-veto-bank-file'):
node.add_input_opt('--bank-veto-bank-file', bankVetoBank)
# Set time options
node.add_opt('--gps-start-time', data_seg[0] + int(pad_data))
node.add_opt('--gps-end-time', data_seg[1] - int(pad_data))
node.add_opt('--trig-start-time', valid_seg[0])
node.add_opt('--trig-end-time', valid_seg[1])
node.add_opt('--trigger-time', self.cp.get('workflow', 'trigger-time'))
# Set the input and output files
node.new_output_file_opt(data_seg, '.hdf', '--output',
tags=tags, store_file=self.retain_files)
        node.add_input_opt('--bank-file', parent)
if dfParents is not None:
frame_arg = '--frame-files'
for frame_file in dfParents:
frame_arg += f" {frame_file.ifo}:{frame_file.name}"
node.add_input(frame_file)
node.add_arg(frame_arg)
if ipn_file is not None:
node.add_input_opt('--sky-positions-file', ipn_file)
if inj_file is not None:
if self.get_opt('--do-short-slides') is not None or \
self.get_opt('--short-slide-offset') is not None:
raise ValueError("Cannot run with short slides in an "
"injection job. Please edit your "
"configuration file accordingly.")
node.add_input_opt('--injection-file', inj_file)
if slide is not None:
for ifo in self.ifo_list:
node.add_opt('--%s-slide-segment' % ifo.lower(), slide[ifo])
# Channels
channel_names = {}
for ifo in self.ifo_list:
channel_names[ifo] = self.cp.get_opt_tags(
"workflow", "%s-channel-name" % ifo.lower(), "")
channel_names_str = \
" ".join([val for key, val in channel_names.items()])
node.add_opt("--channel-name", channel_names_str)
return node
def get_valid_times(self):
pad_data = int(self.get_opt('pad-data'))
if self.has_opt("segment-start-pad"):
pad_data = int(self.get_opt("pad-data"))
start_pad = int(self.get_opt("segment-start-pad"))
end_pad = int(self.get_opt("segment-end-pad"))
valid_start = self.data_seg[0] + pad_data + start_pad
valid_end = self.data_seg[1] - pad_data - end_pad
elif self.has_opt('analyse-segment-end'):
safety = 1
deadtime = int(self.get_opt('segment-length')) / 2
spec_len = int(self.get_opt('inverse-spec-length')) / 2
valid_start = (self.data_seg[0] + deadtime - spec_len + pad_data -
safety)
valid_end = self.data_seg[1] - spec_len - pad_data - safety
else:
overlap = int(self.get_opt('segment-length')) / 4
valid_start = self.data_seg[0] + overlap + pad_data
valid_end = self.data_seg[1] - overlap - pad_data
return self.data_seg, segments.segment(valid_start, valid_end)
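    # Worked example (hedged, default branch): with pad-data = 8 and
    # segment-length = 256 the overlap is 64, so for a data_seg of
    # [t0, t1] the output is valid over segment(t0 + 72, t1 - 72).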
class PyCBCTmpltbankExecutable(Executable):
""" The class used to create jobs for pycbc_geom_nonspin_bank Executable and
any other Executables using the same command line option groups.
"""
current_retention_level = Executable.MERGED_TRIGGERS
def __init__(self, cp, exe_name, ifo=None, out_dir=None,
tags=None, write_psd=False, psd_files=None):
if tags is None:
tags = []
super().__init__(cp, exe_name, ifo, out_dir, tags=tags)
self.cp = cp
self.write_psd = write_psd
self.psd_files = psd_files
def create_node(self, data_seg, valid_seg, parent=None, df_parents=None, tags=None):
if tags is None:
tags = []
node = Node(self)
if not df_parents:
raise ValueError("%s must be supplied with data file(s)"
% self.name)
        pad_data = self.get_opt('pad-data')
        if pad_data is None:
            raise ValueError("The option pad-data is a required option of "
                             "%s. Please check the ini file." % self.name)
        pad_data = int(pad_data)
# set the remaining option flags
node.add_opt('--gps-start-time',
int_gps_time_to_str(data_seg[0] + pad_data))
node.add_opt('--gps-end-time',
int_gps_time_to_str(data_seg[1] - pad_data))
# set the input and output files
# Add the PSD file if needed
if self.write_psd:
node.new_output_file_opt(valid_seg, '.txt', '--psd-output',
tags=tags+['PSD_FILE'], store_file=self.retain_files)
node.new_output_file_opt(valid_seg, '.xml.gz', '--output-file',
tags=tags, store_file=self.retain_files)
node.add_input_list_opt('--frame-files', df_parents)
return node
def create_nodata_node(self, valid_seg, tags=None):
""" A simplified version of create_node that creates a node that does
not need to read in data.
Parameters
-----------
valid_seg : ligo.segments.segment
The segment over which to declare the node valid. Usually this
would be the duration of the analysis.
Returns
--------
node : pycbc.workflow.core.Node
The instance corresponding to the created node.
"""
if tags is None:
tags = []
node = Node(self)
# Set the output file
# Add the PSD file if needed
if self.write_psd:
node.new_output_file_opt(valid_seg, '.txt', '--psd-output',
tags=tags+['PSD_FILE'],
store_file=self.retain_files)
node.new_output_file_opt(valid_seg, '.xml.gz', '--output-file',
store_file=self.retain_files)
if self.psd_files is not None:
should_add = False
# If any of the ifos for this job are in the set
# of ifos for which a static psd was provided.
for ifo in self.ifo_list:
for psd_file in self.psd_files:
if ifo in psd_file.ifo_list:
should_add = True
if should_add:
node.add_input_opt('--psd-file', psd_file)
return node
def get_valid_times(self):
pad_data = int(self.get_opt( 'pad-data'))
analysis_length = int(self.cp.get('workflow-tmpltbank',
'analysis-length'))
data_length = analysis_length + pad_data * 2
start = pad_data
end = data_length - pad_data
return [data_length], [segments.segment(start, end)]
class LigolwAddExecutable(Executable):
""" The class used to create nodes for the ligolw_add Executable. """
current_retention_level = Executable.INTERMEDIATE_PRODUCT
def create_node(self, jobSegment, input_files, output=None,
use_tmp_subdirs=True, tags=None):
if tags is None:
tags = []
node = Node(self)
# Very few options to ligolw_add, all input files are given as a long
# argument list. If this becomes unwieldy we could dump all these files
# to a cache file and read that in. ALL INPUT FILES MUST BE LISTED AS
# INPUTS (with .add_input_opt_file) IF THIS IS DONE THOUGH!
for fil in input_files:
node.add_input_arg(fil)
if output:
node.add_output_opt('--output', output)
else:
node.new_output_file_opt(jobSegment, '.xml.gz', '--output',
tags=tags, store_file=self.retain_files,
use_tmp_subdirs=use_tmp_subdirs)
return node
class PycbcSplitInspinjExecutable(Executable):
"""
The class responsible for running the pycbc_split_inspinj executable
"""
current_retention_level = Executable.INTERMEDIATE_PRODUCT
def __init__(self, cp, exe_name, num_splits, ifo=None, out_dir=None):
super().__init__(cp, exe_name, ifo, out_dir, tags=[])
self.num_splits = int(num_splits)
def create_node(self, parent, tags=None):
if tags is None:
tags = []
node = Node(self)
node.add_input_opt('--input-file', parent)
if parent.name.endswith("gz"):
ext = ".xml.gz"
else:
ext = ".xml"
out_files = FileList([])
for i in range(self.num_splits):
curr_tag = 'split%d' % i
curr_tags = parent.tags + [curr_tag]
job_tag = parent.description + "_" + self.name.upper()
out_file = File(parent.ifo_list, job_tag, parent.segment,
extension=ext, directory=self.out_dir,
tags=curr_tags, store_file=self.retain_files)
out_files.append(out_file)
node.add_output_list_opt('--output-files', out_files)
return node
class LalappsInspinjExecutable(Executable):
"""
The class used to create jobs for the lalapps_inspinj Executable.
"""
current_retention_level = Executable.FINAL_RESULT
extension = '.xml'
def create_node(self, segment, exttrig_file=None, tags=None):
if tags is None:
tags = []
node = Node(self)
curr_tags = self.tags + tags
# This allows the desired number of injections to be given explicitly
# in the config file. Used for coh_PTF as segment length is unknown
# before run time.
if self.get_opt('write-compress') is not None:
self.extension = '.xml.gz'
# Check if these injections are using trigger information to choose
# sky positions for the simulated signals
if (self.get_opt('l-distr') == 'exttrig' and exttrig_file is not None \
and 'trigger' in exttrig_file.description):
# Use an XML file containing trigger information
triggered = True
node.add_input_opt('--exttrig-file', exttrig_file)
elif (self.get_opt('l-distr') == 'ipn' and exttrig_file is not None \
and 'IPN' in exttrig_file.description):
# Use an IPN sky points file
triggered = True
node.add_input_opt('--ipn-file', exttrig_file)
elif (self.get_opt('l-distr') != 'exttrig') \
and (self.get_opt('l-distr') != 'ipn' and not \
self.has_opt('ipn-file')):
# Use no trigger information for generating injections
triggered = False
else:
err_msg = "The argument 'l-distr' passed to the "
err_msg += "%s job has the value " % self.tagged_name
err_msg += "'%s' but you have not " % self.get_opt('l-distr')
err_msg += "provided the corresponding ExtTrig or IPN file. "
err_msg += "Please check your configuration files and try again."
raise ValueError(err_msg)
if triggered:
num_injs = int(self.cp.get_opt_tags('workflow-injections',
'num-injs', curr_tags))
inj_tspace = float(segment[1] - segment[0]) / num_injs
node.add_opt('--time-interval', inj_tspace)
node.add_opt('--time-step', inj_tspace)
node.new_output_file_opt(segment, self.extension, '--output',
store_file=self.retain_files)
node.add_opt('--gps-start-time', int_gps_time_to_str(segment[0]))
node.add_opt('--gps-end-time', int_gps_time_to_str(segment[1]))
return node
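    # A hedged configuration sketch for a triggered injection run: the ini
    # file would contain something like
    #
    #   [injections]
    #   l-distr = exttrig
    #
    #   [workflow-injections]
    #   num-injs = 5000
    #
    # where the executable's section name ('injections') and the values are
    # purely illustrative, not part of this module.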
class PycbcSplitBankExecutable(Executable):
""" The class responsible for creating jobs for pycbc_hdf5_splitbank. """
extension = '.hdf'
current_retention_level = Executable.ALL_TRIGGERS
def __init__(self, cp, exe_name, num_banks,
ifo=None, out_dir=None):
super().__init__(cp, exe_name, ifo, out_dir, tags=[])
self.num_banks = int(num_banks)
def create_node(self, bank, tags=None):
"""
        Set up a pycbc.workflow.core.Node to run the splitbank code
Parameters
----------
bank : pycbc.workflow.core.File
The File containing the template bank to be split
Returns
--------
node : pycbc.workflow.core.Node
The node to run the job
"""
if tags is None:
tags = []
node = Node(self)
node.add_input_opt('--bank-file', bank)
# Get the output (taken from inspiral.py)
out_files = FileList([])
for i in range( 0, self.num_banks):
curr_tag = 'bank%d' %(i)
# FIXME: What should the tags actually be? The job.tags values are
# currently ignored.
curr_tags = bank.tags + [curr_tag] + tags
job_tag = bank.description + "_" + self.name.upper()
out_file = File(bank.ifo_list, job_tag, bank.segment,
extension=self.extension, directory=self.out_dir,
tags=curr_tags, store_file=self.retain_files)
out_files.append(out_file)
node.add_output_list_opt('--output-filenames', out_files)
return node
class PycbcSplitBankXmlExecutable(PycbcSplitBankExecutable):
""" Subclass resonsible for creating jobs for pycbc_splitbank. """
extension='.xml.gz'
class PycbcConditionStrainExecutable(Executable):
""" The class responsible for creating jobs for pycbc_condition_strain. """
current_retention_level = Executable.ALL_TRIGGERS
def create_node(self, input_files, tags=None):
if tags is None:
tags = []
node = Node(self)
start_time = self.cp.get("workflow", "start-time")
end_time = self.cp.get("workflow", "end-time")
node.add_opt('--gps-start-time', start_time)
node.add_opt('--gps-end-time', end_time)
node.add_input_list_opt('--frame-files', input_files)
out_file = File(self.ifo, "gated",
segments.segment(int(start_time), int(end_time)),
directory=self.out_dir, store_file=self.retain_files,
extension=input_files[0].name.split('.', 1)[-1],
tags=tags)
node.add_output_opt('--output-strain-file', out_file)
out_gates_file = File(self.ifo, "output_gates",
segments.segment(int(start_time), int(end_time)),
directory=self.out_dir, extension='txt',
store_file=self.retain_files, tags=tags)
node.add_output_opt('--output-gates-file', out_gates_file)
return node, out_file
class PycbcCreateInjectionsExecutable(Executable):
""" The class responsible for creating jobs
for ``pycbc_create_injections``.
"""
current_retention_level = Executable.ALL_TRIGGERS
extension = '.hdf'
def create_node(self, config_files=None, seed=None, tags=None):
""" Set up a CondorDagmanNode class to run ``pycbc_create_injections``.
Parameters
----------
config_files : pycbc.workflow.core.FileList
A ``pycbc.workflow.core.FileList`` for injection configuration
files to be used with ``--config-files`` option.
seed : int
Seed to use for generating injections.
tags : list
A list of tags to include in filenames.
Returns
--------
node : pycbc.workflow.core.Node
The node to run the job.
"""
# default for tags is empty list
tags = [] if tags is None else tags
# get analysis start and end time
start_time = self.cp.get("workflow", "start-time")
end_time = self.cp.get("workflow", "end-time")
analysis_time = segments.segment(int(start_time), int(end_time))
# make node for running executable
node = Node(self)
if config_files is not None:
node.add_input_list_opt("--config-files", config_files)
if seed:
node.add_opt("--seed", seed)
injection_file = node.new_output_file_opt(analysis_time,
self.extension,
"--output-file",
tags=tags)
return node, injection_file
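    # A hedged usage sketch (the job construction, config file and seed below
    # are hypothetical):
    #
    #   inj_exe = PycbcCreateInjectionsExecutable(workflow.cp, 'injections',
    #                                             out_dir='inj_files')
    #   node, inj_file = inj_exe.create_node(
    #       config_files=FileList([inj_config]), seed=1234)
    #   workflow.add_node(node)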
class PycbcInferenceExecutable(Executable):
""" The class responsible for creating jobs for ``pycbc_inference``.
"""
current_retention_level = Executable.ALL_TRIGGERS
def create_node(self, config_file, seed=None, tags=None,
analysis_time=None):
""" Set up a pegasus.Node instance to run ``pycbc_inference``.
Parameters
----------
config_file : pycbc.workflow.core.File
A ``pycbc.workflow.core.File`` for inference configuration file
to be used with ``--config-files`` option.
seed : int
An ``int`` to be used with ``--seed`` option.
tags : list
A list of tags to include in filenames.
Returns
--------
node : pycbc.workflow.core.Node
The node to run the job.
"""
# default for tags is empty list
tags = [] if tags is None else tags
# if analysis time not provided, try to get it from the config file
if analysis_time is None:
start_time = self.cp.get("workflow", "start-time")
end_time = self.cp.get("workflow", "end-time")
analysis_time = segments.segment(int(start_time), int(end_time))
# make node for running executable
node = Node(self)
node.add_input_opt("--config-file", config_file)
if seed is not None:
node.add_opt("--seed", seed)
inference_file = node.new_output_file_opt(analysis_time,
".hdf", "--output-file",
tags=tags)
if self.cp.has_option("pegasus_profile-inference",
"condor|+CheckpointSig"):
err_msg = "This is not yet supported/tested with pegasus 5. "
err_msg += "Please reimplement this (with unittest :-) )."
raise ValueError(err_msg)
#ckpt_file_name = "{}.checkpoint".format(inference_file.name)
#ckpt_file = dax.File(ckpt_file_name)
# DO NOT call pegasus API stuff outside of
# pegasus_workflow.py.
#node._dax_node.uses(ckpt_file, link=dax.Link.OUTPUT,
# register=False, transfer=False)
return node, inference_file
class PycbcHDFSplitInjExecutable(Executable):
""" The class responsible for creating jobs for ``pycbc_hdf_splitinj``.
"""
current_retention_level = Executable.ALL_TRIGGERS
def __init__(self, cp, exe_name, num_splits, ifo=None, out_dir=None):
super().__init__(cp, exe_name, ifo, out_dir, tags=[])
self.num_splits = int(num_splits)
def create_node(self, parent, tags=None):
if tags is None:
tags = []
node = Node(self)
node.add_input_opt('--input-file', parent)
out_files = FileList([])
for i in range(self.num_splits):
curr_tag = 'split%d' % i
curr_tags = parent.tags + [curr_tag]
job_tag = parent.description + "_" + self.name.upper()
out_file = File(parent.ifo_list, job_tag, parent.segment,
extension='.hdf', directory=self.out_dir,
tags=curr_tags, store_file=self.retain_files)
out_files.append(out_file)
node.add_output_list_opt('--output-files', out_files)
return node