Source code for pycbc.workflow.grb_utils

# Copyright (C) 2015  Andrew Williamson, Francesco Pannarale
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# =============================================================================
#                                   Preamble
# =============================================================================

This library code contains functions and classes that are used in the
generation of pygrb workflows. For details about pycbc.workflow see here:

import os
import logging
import numpy as np
from scipy.stats import rayleigh
from gwdatafind.utils import filename_metadata

from pycbc import makedir
from pycbc.workflow.core import \
    File, FileList, resolve_url_to_file,\
    Executable, Node
from pycbc.workflow.jobsetup import select_generic_executable
from pycbc.workflow.pegasus_workflow import SubWorkflow
from pycbc.workflow.plotting import PlotExecutable

logger = logging.getLogger('pycbc.workflow.grb_utils')

def _select_grb_pp_class(wflow, curr_exe):
    This function returns the class for PyGRB post-processing scripts.

    curr_exe : string
        The name of the executable

    exe_class : Sub-class of pycbc.workflow.core.Executable that holds utility
        functions appropriate for the given executable.  Instances of the class
        ('jobs') **must** have methods
        * job.create_node()
        * job.get_valid_times(ifo, )
    exe_path = wflow.cp.get('executables', curr_exe)
    exe_name = os.path.basename(exe_path)
    exe_to_class_map = {
        'pycbc_grb_trig_combiner': PycbcGrbTrigCombinerExecutable,
        'pycbc_grb_trig_cluster': PycbcGrbTrigClusterExecutable,
        'pycbc_grb_inj_finder': PycbcGrbInjFinderExecutable
    if exe_name not in exe_to_class_map:
        raise ValueError(f"No job class exists for executable {curr_exe}")

    return exe_to_class_map[exe_name]

[docs] def set_grb_start_end(cp, start, end): """ Function to update analysis boundaries as workflow is generated Parameters ---------- cp : pycbc.workflow.configuration.WorkflowConfigParser object The parsed configuration options of a pycbc.workflow.core.Workflow. start : int The start of the workflow analysis time. end : int The end of the workflow analysis time. Returns -------- cp : pycbc.workflow.configuration.WorkflowConfigParser object The modified WorkflowConfigParser object. """ cp.set("workflow", "start-time", str(start)) cp.set("workflow", "end-time", str(end)) return cp
[docs] def make_gating_node(workflow, datafind_files, outdir=None, tags=None): ''' Generate jobs for autogating the data for PyGRB runs. Parameters ---------- workflow: pycbc.workflow.core.Workflow An instanced class that manages the constructed workflow. datafind_files : pycbc.workflow.core.FileList A FileList containing the frame files to be gated. outdir : string Path of the output directory tags : list of strings If given these tags are used to uniquely name and identify output files that would be produced in multiple calls to this function. Returns -------- condition_strain_nodes : list List containing the pycbc.workflow.core.Node objects representing the autogating jobs. condition_strain_outs : pycbc.workflow.core.FileList FileList containing the pycbc.workflow.core.File objects representing the gated frame files. ''' cp = workflow.cp if tags is None: tags = [] condition_strain_class = select_generic_executable(workflow, "condition_strain") condition_strain_nodes = [] condition_strain_outs = FileList([]) for ifo in workflow.ifos: input_files = FileList([datafind_file for datafind_file in datafind_files if datafind_file.ifo == ifo]) condition_strain_jobs = condition_strain_class(cp, "condition_strain", ifos=ifo, out_dir=outdir, tags=tags) condition_strain_node, condition_strain_out = \ condition_strain_jobs.create_node(input_files, tags=tags) condition_strain_nodes.append(condition_strain_node) condition_strain_outs.extend(FileList([condition_strain_out])) return condition_strain_nodes, condition_strain_outs
[docs] def fermi_core_tail_model( sky_err, rad, core_frac=0.98, core_sigma=3.6, tail_sigma=29.6): """Fermi systematic error model following, with default values valid before 11 September 2019. Parameters ---------- core_frac : float Fraction of the systematic uncertainty contained within the core component. core_sigma : float Size of the GBM systematic core component. tail_sigma : float Size of the GBM systematic tail component. Returns _______ tuple Tuple containing the core and tail probability distributions as a function of radius. """ scaledsq = sky_err**2 / -2 / np.log(0.32) return ( frac * (1 - np.exp(-0.5 * (rad / np.sqrt(scaledsq + sigma**2))**2)) for frac, sigma in zip([core_frac, 1 - core_frac], [core_sigma, tail_sigma]))
[docs] def get_sky_grid_scale( sky_error=0.0, containment=0.9, upscale=False, fermi_sys=False, precision=1e-3, **kwargs): """ Calculate the angular radius corresponding to a desired localization uncertainty level. This is used to generate the search grid and involves scaling up the standard 1-sigma value provided to the workflow, assuming a normal probability profile. Fermi systematic errors can be included, following, with default values valid before 11 September 2019. The default probability coverage is 90%. Parameters ---------- sky_error : float The reported statistical 1-sigma sky error of the trigger. containment : float The desired localization probability to be covered by the sky grid. upscale : bool, optional Whether to apply rescale to convert from 1 sigma -> containment for non-Fermi triggers. Default = True as Swift reports 90% radius directly. fermi_sys : bool, optional Whether to apply Fermi-GBM systematics via ``fermi_core_tail_model``. Default = False. precision : float, optional Precision (in degrees) for calculating the error radius via Fermi-GBM model. **kwargs Additional keyword arguments passed to `fermi_core_tail_model`. Returns _______ float Sky error radius in degrees. """ if fermi_sys: lims = (0.5, 4) radii = np.linspace( lims[0] * sky_error, lims[1] * sky_error, int((lims[1] - lims[0]) * sky_error / precision) + 1) core, tail = fermi_core_tail_model(sky_error, radii, **kwargs) out = radii[(abs(core + tail - containment)).argmin()] else: # Use Rayleigh distribution to go from 1 sigma containment to # containment given by function variable. Interval method returns # bounds of equal probability about the median, but we want 1-sided # bound, hence use (2 * containment - 1) out = sky_error if upscale: out *= rayleigh.interval(2 * containment - 1)[-1] return out
[docs] def make_skygrid_node(workflow, out_dir, tags=None): """ Adds a job to the workflow to produce the PyGRB search skygrid.""" tags = [] if tags is None else tags # Initialize job node grb_name = workflow.cp.get('workflow', 'trigger-name') extra_tags = ['GRB'+grb_name] node = Executable(workflow.cp, 'make_sky_grid', ifos=workflow.ifos, out_dir=out_dir, tags=tags+extra_tags).create_node() node.add_opt('--instruments', ' '.join(workflow.ifos)) node.new_output_file_opt(workflow.analysis_time, '.h5', '--output', tags=extra_tags, store_file=True) # Add job node to the workflow workflow += node return node.output_files
[docs] def generate_tc_prior(wflow, tc_path, buffer_seg): """ Generate the configuration file for the prior on the coalescence time of injections, ensuring that these times fall in the analysis time and avoid the onsource and its buffer. Parameters ---------- tc_path : str Path where the configuration file for the prior needs to be written. buffer_seg : segmentlist Start and end times of the buffer segment encapsulating the onsource. """ # Write the tc-prior configuration file if it does not exist if os.path.exists(tc_path): raise ValueError("Refusing to overwrite %s." % tc_path) tc_file = open(tc_path, "w") tc_file.write("[prior-tc]\n") tc_file.write("name = uniform\n") tc_file.write("min-tc = %s\n" % wflow.analysis_time[0]) tc_file.write("max-tc = %s\n\n" % wflow.analysis_time[1]) tc_file.write("[constraint-tc]\n") tc_file.write("name = custom\n") tc_file.write("constraint_arg = (tc < %s) | (tc > %s)\n" % (buffer_seg[0], buffer_seg[1])) tc_file.close() # Add the tc-prior configuration file url to wflow.cp if necessary tc_file_path = "file://"+tc_path for inj_sec in wflow.cp.get_subsections("injections"): config_urls = wflow.cp.get("workflow-injections", inj_sec+"-config-files") config_urls = [url.strip() for url in config_urls.split(",")] if tc_file_path not in config_urls: config_urls += [tc_file_path] config_urls = ', '.join([str(item) for item in config_urls]) wflow.cp.set("workflow-injections", inj_sec+"-config-files", config_urls)
[docs] def setup_pygrb_pp_workflow(wf, pp_dir, seg_dir, segment, bank_file, insp_files, inj_files, inj_insp_files, inj_tags): """ Generate post-processing section of PyGRB offline workflow Parameters ---------- wf : The workflow object pp_dir : The directory where the post-processing files will be stored seg_dir : The directory where the segment files are stored segment : The segment to be analyzed bank_file : The full template bank file insp_files : The list of inspiral files inj_files : The list of injection files inj_insp_files : The list of inspiral files for injections inj_tags : The list of injection tags Returns ------- trig_files : FileList The list of combined trigger files [ALL_TIMES, ONSOURCE, OFFSOURCE, OFFTRIAL_1, ..., OFFTRIAL_N] FileList (N can be set by the user and is 6 by default) clustered_files : FileList CLUSTERED FileList, same order as trig_files Contains triggers after clustering inj_find_files : FileList FOUNDMISSED FileList covering all injection sets """ # Begin setting up trig combiner job(s) # Select executable class and initialize exe_class = _select_grb_pp_class(wf, "trig_combiner") job_instance = exe_class(wf.cp, "trig_combiner") # Create node for coherent no injections jobs node, trig_files = job_instance.create_node(wf.ifo_string, seg_dir, segment, insp_files, pp_dir, bank_file) wf.add_node(node) # Trig clustering for each trig file exe_class = _select_grb_pp_class(wf, "trig_cluster") job_instance = exe_class(wf.cp, "trig_cluster") clustered_files = FileList([]) for trig_file in trig_files: # Create and add nodes node, out_file = job_instance.create_node(trig_file, pp_dir) wf.add_node(node) clustered_files.append(out_file) # Find injections from triggers exe_class = _select_grb_pp_class(wf, "inj_finder") job_instance = exe_class(wf.cp, "inj_finder") inj_find_files = FileList([]) for inj_tag in inj_tags: tag_inj_files = FileList([f for f in inj_files if inj_tag in f.tags]) # The here stems from the injection group information # being stored in the second tag. This could be improved # depending on the final implementation of injections tag_insp_files = FileList([f for f in inj_insp_files if inj_tag in f.tags[1]]) node, inj_find_file = job_instance.create_node( tag_inj_files, tag_insp_files, bank_file, pp_dir) wf.add_node(node) inj_find_files.append(inj_find_file) return trig_files, clustered_files, inj_find_files
[docs] class PycbcGrbTrigCombinerExecutable(Executable): """ The class responsible for creating jobs for ''pycbc_grb_trig_combiner''. """ current_retention_level = Executable.ALL_TRIGGERS def __init__(self, cp, name): super().__init__(cp=cp, name=name) self.trigger_name = cp.get('workflow', 'trigger-name') self.trig_start_time = cp.get('workflow', 'start-time') self.num_trials = int(cp.get('trig_combiner', 'num-trials'))
[docs] def create_node(self, ifo_tag, seg_dir, segment, insp_files, out_dir, bank_file, tags=None): node = Node(self) node.add_opt('--verbose') node.add_opt("--ifo-tag", ifo_tag) node.add_opt("--grb-name", self.trigger_name) node.add_opt("--trig-start-time", self.trig_start_time) node.add_opt("--segment-dir", seg_dir) node.add_input_list_opt("--input-files", insp_files) node.add_opt("--user-tag", "PYGRB") node.add_input_opt("--bank-file", bank_file) # Prepare output file tag user_tag = f"PYGRB_GRB{self.trigger_name}" if tags: user_tag += "_{}".format(tags) # Add on/off source and off trial outputs output_files = FileList([]) outfile_types = ['ALL_TIMES', 'ONSOURCE', 'OFFSOURCE'] for i in range(self.num_trials): outfile_types.append("OFFTRIAL_{}".format(i+1)) for out_type in outfile_types: out_name = "{}-{}_{}-{}-{}.h5".format( ifo_tag, user_tag, out_type, segment[0], segment[1]-segment[0]) out_file = File(ifo_tag, 'trig_combiner', segment, file_url=os.path.join(out_dir, out_name)) node.add_output(out_file) output_files.append(out_file) return node, output_files
[docs] class PycbcGrbTrigClusterExecutable(Executable): """ The class responsible for creating jobs for ''pycbc_grb_trig_cluster''. """ current_retention_level = Executable.ALL_TRIGGERS def __init__(self, cp, name): super().__init__(cp=cp, name=name)
[docs] def create_node(self, in_file, out_dir): node = Node(self) node.add_input_opt("--trig-file", in_file) # Determine output file name ifotag, filetag, segment = filename_metadata( start, end = segment out_name = "{}-{}_CLUSTERED-{}-{}.h5".format(ifotag, filetag, start, end-start) out_file = File(ifotag, 'trig_cluster', segment, file_url=os.path.join(out_dir, out_name)) node.add_output(out_file) return node, out_file
[docs] class PycbcGrbInjFinderExecutable(Executable): """The class responsible for creating jobs for ``pycbc_grb_inj_finder`` """ current_retention_level = Executable.ALL_TRIGGERS def __init__(self, cp, exe_name): super().__init__(cp=cp, name=exe_name)
[docs] def create_node(self, inj_files, inj_insp_files, bank_file, out_dir, tags=None): if tags is None: tags = [] node = Node(self) node.add_input_list_opt('--input-files', inj_insp_files) node.add_input_list_opt('--inj-files', inj_files) node.add_input_opt('--bank-file', bank_file) ifo_tag, desc, segment = filename_metadata(inj_files[0].name) desc = '_'.join(desc.split('_')[:-1]) out_name = "{}-{}_FOUNDMISSED-{}-{}.h5".format( ifo_tag, desc, segment[0], abs(segment)) out_file = File(ifo_tag, 'inj_finder', segment, os.path.join(out_dir, out_name), tags=tags) node.add_output(out_file) return node, out_file
[docs] def build_segment_filelist(seg_dir): """Construct a FileList instance containing all segments txt files""" # Needs to be in this order for consistency with _read_seg_files file_names = ["bufferSeg.txt", "offSourceSeg.txt", "onSourceSeg.txt"] seg_files = [os.path.join(seg_dir, fn) for fn in file_names] seg_files = [resolve_url_to_file(sf) for sf in seg_files] seg_files = FileList(seg_files) return seg_files
[docs] def make_pygrb_plot(workflow, exec_name, out_dir, ifo=None, inj_file=None, trig_file=None, onsource_file=None, bank_file=None, seg_files=None, veto_file=None, tags=None, **kwargs): """Adds a node for a plot of PyGRB results to the workflow""" tags = [] if tags is None else tags # Initialize job node with its tags grb_name = workflow.cp.get('workflow', 'trigger-name') extra_tags = ['GRB'+grb_name] # TODO: why is inj_set repeated twice in output files? # if inj_set is not None: # extra_tags.append(inj_set) if ifo: extra_tags.append(ifo) node = PlotExecutable(workflow.cp, exec_name, ifos=workflow.ifos, out_dir=out_dir, tags=tags+extra_tags).create_node() if trig_file: node.add_input_opt('--trig-file', trig_file) # Pass the veto and segment files and options if seg_files: node.add_input_list_opt('--seg-files', seg_files) if veto_file: node.add_input_opt('--veto-file', veto_file) # Option to show the onsource trial if this is a plot of all data if exec_name == 'pygrb_plot_snr_timeseries' and 'alltimes' in tags: node.add_opt('--onsource') if exec_name in ['pygrb_plot_injs_results', 'pygrb_plot_snr_timeseries']: trig_time = workflow.cp.get('workflow', 'trigger-time') node.add_opt('--trigger-time', trig_time) # Pass the injection file as an input File instance if inj_file is not None and exec_name not in \ ['pygrb_plot_skygrid', 'pygrb_plot_stats_distribution']: node.add_input_opt('--found-missed-file', inj_file) # IFO option if ifo: node.add_opt('--ifo', ifo) # Output files and final input file (passed as a File instance) if exec_name == 'pygrb_efficiency': # In this case tags[0] is the offtrial number node.add_input_opt('--bank-file', bank_file) node.add_opt('--trial-name', tags[0]) node.add_opt('--injection-set-name', tags[1]) # Output the sensitivity plot if kwargs['plot_bkgd']: node.new_output_file_opt(workflow.analysis_time, '.png', '--background-output-file', tags=extra_tags+['max_background']) # Output the exclusion distance plot and table else: node.add_input_opt('--onsource-file', onsource_file) node.new_output_file_opt(workflow.analysis_time, '.png', '--onsource-output-file', tags=['onsource']+extra_tags) node.new_output_file_opt(workflow.analysis_time, '.json', '--exclusion-dist-output-file', tags=extra_tags) else: node.new_output_file_opt(workflow.analysis_time, '.png', '--output-file', tags=extra_tags) if exec_name in ['pygrb_plot_coh_ifosnr', 'pygrb_plot_null_stats'] \ and 'zoomin' in tags: node.add_opt('--zoom-in') # Quantity to be displayed on the y-axis of the plot if exec_name in ['pygrb_plot_chisq_veto', 'pygrb_plot_null_stats', 'pygrb_plot_snr_timeseries']: node.add_opt('--y-variable', tags[0]) # Quantity to be displayed on the x-axis of the plot elif exec_name == 'pygrb_plot_stats_distribution': node.add_opt('--x-variable', tags[0]) elif exec_name == 'pygrb_plot_injs_results': # Variables to plot on x and y axes node.add_opt('--y-variable', tags[0]) node.add_opt('--x-variable', tags[1]) # Flag to plot found over missed or missed over found if tags[2] == 'missed-on-top': node.add_opt('--'+tags[2]) # Enable log axes subsection = '_'.join(tags[0:2]) for log_flag in ['x-log', 'y-log']: if workflow.cp.has_option_tags(exec_name, log_flag, tags=[subsection]): node.add_opt('--'+log_flag) # Add job node to workflow workflow += node return node, node.output_files
[docs] def make_pygrb_info_table(workflow, exec_name, out_dir, in_files=None, tags=None): """ Setup a job to create an html snippet with the GRB trigger information or exlusion distances information. """ # Organize tags tags = [] if tags is None else tags grb_name = workflow.cp.get('workflow', 'trigger-name') extra_tags = ['GRB'+grb_name] # Initialize job node node = PlotExecutable(workflow.cp, exec_name, ifos=workflow.ifos, out_dir=out_dir, tags=tags+extra_tags).create_node() # Options if exec_name == 'pygrb_grb_info_table': node.add_opt('--ifos', ' '.join(workflow.ifos)) elif exec_name == 'pygrb_exclusion_dist_table': node.add_input_opt('--input-files', in_files) # Output node.new_output_file_opt(workflow.analysis_time, '.html', '--output-file', tags=extra_tags) # Add job node to workflow workflow += node return node, node.output_files
[docs] def make_pygrb_injs_tables(workflow, out_dir, bank_file, off_file, seg_files, inj_file=None, on_file=None, veto_file=None, tags=None): """ Adds a job to make quiet-found and missed-found injection tables, or loudest trigger(s) table.""" tags = [] if tags is None else tags # Executable exec_name = 'pygrb_page_tables' # Initialize job node grb_name = workflow.cp.get('workflow', 'trigger-name') extra_tags = ['GRB'+grb_name] node = PlotExecutable(workflow.cp, exec_name, ifos=workflow.ifos, out_dir=out_dir, tags=tags+extra_tags).create_node() # Pass the bank-file node.add_input_opt('--bank-file', bank_file) # Offsource input file (or equivalently trigger file for injections) offsource_file = off_file node.add_input_opt('--offsource-file', offsource_file) # Pass the veto and segment files (as File instances) if veto_file: node.add_input_opt('--veto-file', veto_file) node.add_input_list_opt('--seg-files', seg_files) # Handle input/output for injections if inj_file: # Found-missed injection file (passed as File instance) node.add_input_opt('--found-missed-file', inj_file) # Missed-found and quiet-found injections html output files for mf_or_qf in ['missed-found', 'quiet-found']: mf_or_qf_tags = [mf_or_qf.upper().replace('-', '_')] node.new_output_file_opt(workflow.analysis_time, '.html', '--'+mf_or_qf+'-injs-output-file', tags=extra_tags+mf_or_qf_tags) # Quiet-found injections h5 output file node.new_output_file_opt(workflow.analysis_time, '.h5', '--quiet-found-injs-h5-output-file', tags=extra_tags+['QUIET_FOUND']) # Handle input/output for onsource/offsource else: src_type = 'offsource-trigs' if on_file: src_type = 'onsource-trig' # Pass onsource input File instance node.add_input_opt('--onsource-file', on_file) # Loudest offsource/onsource triggers html and h5 output files src_type_tags = [src_type.upper().replace('-', '_')] node.new_output_file_opt(workflow.analysis_time, '.html', '--loudest-'+src_type+'-output-file', tags=extra_tags+src_type_tags) node.new_output_file_opt(workflow.analysis_time, '.h5', '--loudest-'+src_type+'-h5-output-file', tags=extra_tags+src_type_tags) # Add job node to the workflow workflow += node return node, node.output_files
# Based on setup_single_det_minifollowups
[docs] def setup_pygrb_minifollowups(workflow, followups_file, trigger_file, dax_output, out_dir, seg_files=None, veto_file=None, tags=None): """ Create plots that followup the the loudest PyGRB triggers or missed injections from an HDF file. Parameters ---------- workflow: pycbc.workflow.Workflow The core workflow instance we are populating followups_file: pycbc.workflow.File The File class holding the triggers/injections to follow up trigger_file: pycbc.workflow.File The File class holding the triggers dax_output: The directory that will contain the dax file out_dir: path The directory to store minifollowups result plots and files seg_files: {pycbc.workflow.FileList, optional} The list of segments Files veto_file: {pycbc.workflow.File, optional} The veto definer file tags: {None, optional} Tags to add to the minifollowups executables """'Entering minifollowups module') if not workflow.cp.has_section('workflow-minifollowups'): msg = 'There is no [workflow-minifollowups] section in ' msg += 'the configuration file''Leaving minifollowups') return tags = [] if tags is None else tags makedir(dax_output) # Turn the config file into a File instance config_path = os.path.abspath(dax_output + '/' + '_'.join(tags) + '_minifollowup.ini') workflow.cp.write(open(config_path, 'w')) config_file = resolve_url_to_file(config_path) # wikifile = curr_ifo + '_'.join(tags) + 'loudest_table.txt' wikifile = '_'.join(tags) + 'loudest_table.txt' # Create the node exe = Executable(workflow.cp, 'pygrb_minifollowups', ifos=workflow.ifos, out_dir=dax_output, tags=tags) node = exe.create_node() node.add_input_opt('--trig-file', trigger_file) # Grab and pass all necessary files as File instances if seg_files: node.add_input_list_opt('--seg-files', seg_files) if veto_file: node.add_input_opt('--veto-file', veto_file) node.add_input_opt('--config-files', config_file) node.add_input_opt('--followups-file', followups_file) node.add_opt('--wiki-file', wikifile) if tags: node.add_list_opt('--tags', tags) node.new_output_file_opt(workflow.analysis_time, '.dax', '--dax-file') node.new_output_file_opt(workflow.analysis_time, '', '--output-map') name = node.output_files[0].name assert name.endswith('.dax') map_file = node.output_files[1] assert'.map') node.add_opt('--workflow-name', name) node.add_opt('--output-dir', out_dir) node.add_opt('--dax-file-directory', '.') workflow += node # Execute this in a sub-workflow fil = node.output_files[0] job = SubWorkflow(, is_planned=False) job.set_subworkflow_properties(map_file, staging_site=workflow.staging_site, cache_file=workflow.cache_file) job.add_into_workflow(workflow)'Leaving minifollowups module')
[docs] def setup_pygrb_results_workflow(workflow, res_dir, trig_files, inj_files, bank_file, seg_dir, veto_file=None,tags=None, explicit_dependencies=None): """Create subworkflow to produce plots, tables, and results webpage for a PyGRB analysis. Parameters ---------- workflow: pycbc.workflow.Workflow The core workflow instance we are populating res_dir: The post-processing directory where results (plots, etc.) will be stored trig_files: FileList of trigger files inj_files: FileList of injection results bank_file: The template bank File object seg_dir: The directory path with the segments files veto_file: {None, optional} The veto File object tags: {None, optional} Tags to add to the executables explicit_dependencies: {None, optional} nodes that must precede this """ tags = [] if tags is None else tags dax_output = res_dir+'/webpage_daxes' # _workflow.makedir(dax_output) makedir(dax_output) # Create the node exe = Executable(workflow.cp, 'pygrb_results_workflow', ifos=workflow.ifo_string, out_dir=dax_output, tags=tags) node = exe.create_node() # Grab and pass all necessary files node.add_input_list_opt('--trig-files', trig_files) # node.add_input_opt('--config-files', config_file) node.add_input_list_opt('--inj-files', inj_files) node.add_input_opt('--bank-file', bank_file) node.add_opt('--segment-dir', seg_dir) if veto_file: node.add_input_opt('--veto-file', veto_file) if tags: node.add_list_opt('--tags', tags) node.new_output_file_opt(workflow.analysis_time, '.dax', '--dax-file', tags=tags) node.new_output_file_opt(workflow.analysis_time, '.map', '--output-map', tags=tags) # + ['MAP'], use_tmp_subdirs=True) name = node.output_files[0].name assert name.endswith('.dax') map_file = node.output_files[1] assert'.map') node.add_opt('--workflow-name', name) # This is the output dir for the products of this node, namely dax and map node.add_opt('--output-dir', res_dir) node.add_opt('--dax-file-directory', '.') # Turn the config file into a File instance config_path = os.path.abspath(dax_output + '/' + '_'.join(tags) + 'webpage.ini') workflow.cp.write(open(config_path, 'w')) config_file = resolve_url_to_file(config_path) node.add_input_opt('--config-files', config_file) # Track additional ini file produced by pycbc_pygrb_results_workflow out_file = File(workflow.ifos, 'pygrb_results_workflow', workflow.analysis_time, file_url=os.path.join(dax_output, name+'.ini')) node.add_output(out_file) # Add node to the workflow workflow += node if explicit_dependencies is not None: for dep in explicit_dependencies: workflow.add_explicit_dependancy(dep, node) # Execute this in a sub-workflow job = SubWorkflow(name, is_planned=False) # , _id='results') job.set_subworkflow_properties(map_file, staging_site=workflow.staging_site, cache_file=workflow.cache_file) job.add_into_workflow(workflow) return node.output_files