# Copyright (C) 2013, 2017 Ian Harry, Alex Nitz, Duncan Brown
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
This module provides the worker functions and classes that are used when
creating a workflow. For details about the workflow module see here:
https://ldas-jobs.ligo.caltech.edu/~cbc/docs/pycbc/ahope.html
"""
import os, stat, subprocess, logging, math, string, urllib, pickle, copy
import configparser as ConfigParser
from urllib.request import pathname2url
from urllib.parse import urljoin
import numpy, random
from itertools import combinations, groupby, permutations
from operator import attrgetter
import lal
import lal.utils
import Pegasus.api # Try and move this into pegasus_workflow
from glue import lal as gluelal
from ligo import segments
from ligo.lw import lsctables, ligolw
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import segments as ligolw_segments
from pycbc import makedir
from pycbc.io.ligolw import LIGOLWContentHandler, create_process_table
from . import pegasus_workflow
from .configuration import WorkflowConfigParser, resolve_url
from .pegasus_sites import make_catalog
def make_analysis_dir(path):
"""
Make the analysis directory path, any parent directories that don't already
exist, and the 'logs' subdirectory of path.
"""
if path is not None:
makedir(os.path.join(path, 'logs'))
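# Example (illustrative): called from a workflow generation script, the
# following would create a 'datafind' directory and its 'logs' subdirectory
# relative to the current working directory if they do not already exist.
# The directory name here is hypothetical:
#
#     make_analysis_dir('datafind')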
file_input_from_config_dict = {}
class Executable(pegasus_workflow.Executable):
# These are the file retention levels
DO_NOT_KEEP = 0
INTERMEDIATE_PRODUCT = 1
ALL_TRIGGERS = 2
MERGED_TRIGGERS = 3
FINAL_RESULT = 4
    # Set this parameter to indicate that this option is used to specify a
    # file and is *not* handled explicitly in the create_node or __init__
    # methods of the sub-class. Usually that is to say that this option is a
    # file and is normally specified in the ini file, e.g. a PSD file. As
    # files need to be identified as such to Pegasus, this attempts to catch
    # this case.
# These are standard file input arguments used in PyCBC, so we declare
# these as files if given to any PyCBC job.
file_input_options = ['--gating-file', '--frame-files', '--injection-file',
'--statistic-files', '--bank-file', '--config-files',
'--psd-file', '--asd-file',
'--fake-strain-from-file',
'--sgburst-injection-file']
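    # For example (illustrative configuration, using one of the option names
    # declared above), an executable section such as
    #
    #     [inspiral]
    #     injection-file = /path/to/injections.xml
    #
    # will cause the injection file to be registered as a Pegasus input file
    # for every job built from that executable, rather than being passed as
    # a plain string option.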
# Set this parameter to indicate that this option should take different
# values based on the time. E.g. something like
# --option1 value1[0:1000],value2[1000:2000]
# would be replaced with --option1 value1 if the time is within 0,1000 and
    # value2 if in 1000,2000. A failure will be raised if the job time is
    # not fully contained in one of these windows, or if it is fully
    # contained in multiple of these windows. This is resolved when creating
    # the Job from the Executable.
time_dependent_options = []
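    # For example (illustrative; the option name and values are
    # hypothetical), a subclass could declare
    #
    #     time_dependent_options = ['--channel-name']
    #
    # and the configuration file could then supply
    #
    #     channel-name = HOFT_C00[0:1000],HOFT_C01[1000:2000]
    #
    # so that jobs whose valid time is contained in [0, 1000) receive the
    # first value and those contained in [1000, 2000) receive the second.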
# This is the default value. It will give a warning if a class is
# used where the retention level is not set. The file will still be stored
KEEP_BUT_RAISE_WARNING = 5
_warned_classes_list = ['Executable']
    # Sub classes, or instances, should override this. If not overridden the
    # file will be retained, but a warning will be given.
current_retention_level = KEEP_BUT_RAISE_WARNING
def __init__(self, cp, name, ifos=None, out_dir=None, tags=None,
reuse_executable=True, set_submit_subdir=True):
"""
Initialize the Executable class.
Parameters
-----------
cp : ConfigParser object
The ConfigParser object holding the workflow configuration settings
        name : string
            Executable name
        ifos : string or list, optional
            The ifo(s) that the Job is valid for. If the job is
            independently valid for multiple ifos it can be provided as a
            list, e.g. ['H1', 'L1', 'V1']. If the job is only valid for the
            combination of ifos (e.g. ligolw_thinca) then this can be
            supplied as a single string, e.g. "H1L1V1".
out_dir: path, optional
The folder to store output files of this job.
tags : list of strings
A list of strings that is used to identify this job.
"""
if isinstance(ifos, str):
self.ifo_list = [ifos]
else:
self.ifo_list = ifos
if self.ifo_list is not None:
self.ifo_list = sorted(self.ifo_list)
self.ifo_string = ''.join(self.ifo_list)
else:
self.ifo_string = None
self.cp = cp
self.name = name
self.container_cls = None
self.container_type = None
try:
self.installed = cp.getboolean('pegasus_profile-%s' % name,
'pycbc|installed')
except:
self.installed = False
self.update_current_tags(tags)
self.update_output_directory(out_dir=out_dir)
# Determine the level at which output files should be kept
if cp.has_option_tags('pegasus_profile-%s' % name,
'pycbc|retention_level', tags):
# Get the retention_level from the config file
# This method allows us to use the retention levels
# defined above
cfg_ret_level = cp.get_opt_tags('pegasus_profile-%s' % name,
'pycbc|retention_level', tags)
self.current_retention_level = getattr(self, cfg_ret_level)
self.update_current_retention_level(self.current_retention_level)
# Should I reuse this executable?
if reuse_executable:
self.pegasus_name = self.name
else:
self.pegasus_name = self.tagged_name
# Check that the executable actually exists locally or
# looks like a URL, in which case trust Pegasus to be
# able to fetch it.
exe_path = cp.get('executables', name)
self.needs_fetching = False
exe_url = urllib.parse.urlparse(exe_path)
# See if the user specified a list of sites for the executable
# Ordering is:
# 1) Check if a specific site for this Executable is set.
        # 2) Check if primary_site is set globally.
# 3) Use condorpool_symlink as a fallback.
self.exe_pfns = {}
if cp.has_option_tags('pegasus_profile-%s' % name, 'pycbc|site', tags):
exe_site = cp.get_opt_tags('pegasus_profile-%s' % name,
'pycbc|site', tags)
elif cp.has_option('pegasus_profile', 'pycbc|primary_site'):
exe_site = cp.get('pegasus_profile', 'pycbc|primary_site')
else:
exe_site = 'condorpool_symlink'
exe_site = exe_site.strip()
if exe_url.scheme in ['', 'file']:
# NOTE: There could be a case where the exe is available at a
            # remote site, but not on the submit host. This is currently
            # allowed for the OSG site, but versioning will not work as
            # planned if we can't see the executable (could we perhaps run
            # versioning inside singularity??)
# Check that executables at file urls
# on the local site exist
if os.path.isfile(exe_url.path) is False:
raise TypeError("Failed to find %s executable "
"at %s on site %s" % (name, exe_path,
exe_site))
elif exe_url.scheme == 'singularity':
# Will use an executable within a singularity container. Don't
# need to do anything here, as I cannot easily check it exists.
exe_path = exe_url.path
else:
# Could be http, https, etc. so it needs fetching if run now
self.needs_fetching = True
if self.needs_fetching and not self.installed:
err_msg = "Non-file path URLs cannot be used unless the "
err_msg += "executable is a bundled standalone executable. "
err_msg += "If this is the case, then add the "
err_msg += "pycbc.installed=True property."
raise ValueError(err_msg)
if self.installed:
# Is installed, so copy from local site, like other inputs
self.exe_pfns['local'] = exe_path
else:
# We must rely on the executables, and accompanying libraries,
# being directly accessible on the execution site.
# CVMFS is perfect for this! As is singularity.
self.exe_pfns[exe_site] = exe_path
logging.debug(
"Using %s executable at %s on site %s",
name,
exe_url.path,
exe_site
)
# FIXME: This hasn't yet been ported to pegasus5 and won't work.
# Pegasus describes two ways to work with containers, and I need
# to figure out which is most appropriate and use that.
        # Determine if this executable should be run in a container
try:
self.container_type = cp.get('pegasus_profile-%s' % name,
'container|type')
except:
pass
if self.container_type is not None:
# FIXME: Move the actual container setup into pegasus_workflow
self.container_img = cp.get('pegasus_profile-%s' % name,
'container|image')
try:
self.container_site = cp.get('pegasus_profile-%s' % name,
'container|image_site')
except:
self.container_site = 'local'
try:
self.container_mount = cp.get('pegasus_profile-%s' % name,
'container|mount').split(',')
except:
self.container_mount = None
self.container_cls = Pegasus.api.Container("{}-container".format(
name),
self.container_type,
self.container_img,
imagesite=self.container_site,
mount=self.container_mount)
super(Executable, self).__init__(self.pegasus_name,
installed=self.installed,
container=self.container_cls)
else:
super(Executable, self).__init__(self.pegasus_name,
installed=self.installed)
if hasattr(self, "group_jobs"):
self.add_profile('pegasus', 'clusters.size', self.group_jobs)
# This sets up the sub-directory to use in the submit directory
if set_submit_subdir:
self.add_profile('pegasus', 'relative.submit.dir',
self.pegasus_name)
# Set configurations from the config file, these should override all
# other settings
self._set_pegasus_profile_options()
self.execution_site = exe_site
self.executable_url = exe_path
@property
def ifo(self):
"""Return the ifo.
If only one ifo in the ifo list this will be that ifo. Otherwise an
error is raised.
"""
if self.ifo_list and len(self.ifo_list) == 1:
return self.ifo_list[0]
else:
errMsg = "self.ifoList must contain only one ifo to access the "
errMsg += "ifo property. %s." %(str(self.ifo_list),)
raise TypeError(errMsg)
    def add_ini_profile(self, cp, sec):
"""Add profile from configuration file.
Parameters
-----------
cp : ConfigParser object
The ConfigParser object holding the workflow configuration settings
sec : string
The section containing options for this job.
"""
for opt in cp.options(sec):
namespace = opt.split('|')[0]
if namespace == 'pycbc' or namespace == 'container':
continue
value = cp.get(sec, opt).strip()
key = opt.split('|')[1]
self.add_profile(namespace, key, value)
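    # For example (illustrative values), a configuration section such as
    #
    #     [pegasus_profile-inspiral]
    #     condor|request_memory = 2800
    #
    # would add 'request_memory' = '2800' to the 'condor' namespace of the
    # inspiral executable's Pegasus profile. Options in the 'pycbc' and
    # 'container' namespaces are skipped, as above.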
def _add_ini_opts(self, cp, sec, ignore_existing=False):
"""Add job-specific options from configuration file.
Parameters
-----------
cp : ConfigParser object
The ConfigParser object holding the workflow configuration
settings
sec : string
The section containing options for this job.
"""
for opt in cp.options(sec):
if opt in self.all_added_options:
if ignore_existing:
continue
else:
raise ValueError("Option %s has already been added" % opt)
self.all_added_options.add(opt)
value = cp.get(sec, opt).strip()
opt = f'--{opt}'
if opt in self.file_input_options:
# This now expects the option to be a file
# Check if we have a list of files
values = [path for path in value.split(' ') if path]
self.common_raw_options.append(opt)
self.common_raw_options.append(' ')
# Get LFN and PFN
for path in values:
# Here I decide if the path is URL or
# IFO:/path/to/file or IFO:url://path/to/file
                    # That's somewhat tricky as we used ':' as delimiter
split_path = path.split(':', 1)
if len(split_path) == 1:
ifo = None
path = path
else:
# Have I split a URL or not?
if split_path[1].startswith('//'):
# URL
ifo = None
path = path
else:
#IFO:path or IFO:URL
ifo = split_path[0]
path = split_path[1]
                    # If the file exists make sure to use the
                    # full path as a file:// URL
if os.path.isfile(path):
curr_pfn = urljoin('file:',
pathname2url(os.path.abspath(path)))
else:
curr_pfn = path
curr_file = resolve_url_to_file(curr_pfn)
self.common_input_files.append(curr_file)
if ifo:
self.common_raw_options.append(ifo + ':')
self.common_raw_options.append(curr_file.dax_repr)
else:
self.common_raw_options.append(curr_file.dax_repr)
self.common_raw_options.append(' ')
elif opt in self.time_dependent_options:
# There is a possibility of time-dependent, file options.
# For now we will avoid supporting that complication unless
# it is needed. This would require resolving the file first
# in this function, and then dealing with the time-dependent
# stuff later.
self.unresolved_td_options[opt] = value
else:
# This option comes from the config file(s)
self.common_options += [opt, value]
    def add_opt(self, opt, value=None):
"""Add option to job.
Parameters
-----------
opt : string
Name of option (e.g. --output-file-format)
value : string, (default=None)
The value for the option (no value if set to None).
"""
if value is None:
self.common_options += [opt]
else:
self.common_options += [opt, value]
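    # Example (illustrative) of adding options common to all nodes of an
    # executable; the option names here are hypothetical:
    #
    #     exe.add_opt('--verbose')
    #     exe.add_opt('--segment-length', '256')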
    def get_opt(self, opt):
"""Get value of option from configuration file
Parameters
-----------
opt : string
Name of option (e.g. output-file-format)
Returns
--------
value : string
The value for the option. Returns None if option not present.
"""
for sec in self.sections:
try:
key = self.cp.get(sec, opt)
if key:
return key
except ConfigParser.NoOptionError:
pass
return None
    def has_opt(self, opt):
"""Check if option is present in configuration file
Parameters
-----------
opt : string
Name of option (e.g. output-file-format)
"""
for sec in self.sections:
val = self.cp.has_option(sec, opt)
if val:
return val
return False
    def create_node(self, **kwargs):
"""Default node constructor.
This is usually overridden by subclasses of Executable.
"""
return Node(self, **kwargs)
    def update_current_retention_level(self, value):
"""Set a new value for the current retention level.
This updates the value of self.retain_files for an updated value of the
retention level.
Parameters
-----------
value : int
The new value to use for the retention level.
"""
# Determine the level at which output files should be kept
self.current_retention_level = value
try:
global_retention_level = \
self.cp.get_opt_tags("workflow", "file-retention-level",
self.tags+[self.name])
except ConfigParser.Error:
msg="Cannot find file-retention-level in [workflow] section "
msg+="of the configuration file. Setting a default value of "
msg+="retain all files."
            logging.warning(msg)
self.retain_files = True
self.global_retention_threshold = 1
self.cp.set("workflow", "file-retention-level", "all_files")
else:
# FIXME: Are these names suitably descriptive?
retention_choices = {
'all_files' : 1,
'all_triggers' : 2,
'merged_triggers' : 3,
'results' : 4
}
try:
self.global_retention_threshold = \
retention_choices[global_retention_level]
except KeyError:
err_msg = "Cannot recognize the file-retention-level in the "
err_msg += "[workflow] section of the ini file. "
err_msg += "Got : {0}.".format(global_retention_level)
err_msg += "Valid options are: 'all_files', 'all_triggers',"
err_msg += "'merged_triggers' or 'results' "
raise ValueError(err_msg)
if self.current_retention_level == 5:
self.retain_files = True
if type(self).__name__ in Executable._warned_classes_list:
pass
else:
warn_msg = "Attribute current_retention_level has not "
warn_msg += "been set in class {0}. ".format(type(self))
warn_msg += "This value should be set explicitly. "
warn_msg += "All output from this class will be stored."
                logging.warning(warn_msg)
Executable._warned_classes_list.append(type(self).__name__)
elif self.global_retention_threshold > self.current_retention_level:
self.retain_files = False
else:
self.retain_files = True
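    # For example (illustrative): with a configuration containing
    #
    #     [workflow]
    #     file-retention-level = merged_triggers
    #
    # the global threshold becomes 3, so executables whose
    # current_retention_level is MERGED_TRIGGERS (3) or FINAL_RESULT (4)
    # retain their output files, while ALL_TRIGGERS (2) and lower do not.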
    def update_output_directory(self, out_dir=None):
"""Update the default output directory for output files.
Parameters
-----------
out_dir : string (optional, default=None)
If provided use this as the output directory. Else choose this
automatically from the tags.
"""
# Determine the output directory
if out_dir is not None:
self.out_dir = out_dir
elif len(self.tags) == 0:
self.out_dir = self.name
else:
self.out_dir = self.tagged_name
if not os.path.isabs(self.out_dir):
self.out_dir = os.path.join(os.getcwd(), self.out_dir)
def _set_pegasus_profile_options(self):
"""Set the pegasus-profile settings for this Executable.
These are a property of the Executable and not of nodes that it will
        spawn. Therefore it *cannot* be updated without also changing values
        for nodes that might already have been created. For this reason it
        is only called once, in __init__; subsequent calls will fail.
"""
# Executable- and tag-specific profile information
for sec in self.sections:
if self.cp.has_section('pegasus_profile-{0}'.format(sec)):
self.add_ini_profile(self.cp,
'pegasus_profile-{0}'.format(sec))
class Workflow(pegasus_workflow.Workflow):
"""
This class manages a pycbc workflow. It provides convenience
functions for finding input files using time and keywords. It can also
generate cache files from the inputs.
"""
def __init__(self, args, name=None):
"""
Create a pycbc workflow
Parameters
----------
        args : argparse.Namespace
            The parsed command line options used to initialize a CBC
            workflow.
"""
# Parse ini file
self.cp = WorkflowConfigParser.from_cli(args)
self.args = args
if hasattr(args, 'dax_file'):
dax_file = args.dax_file or None
else:
dax_file = None
if hasattr(args, 'dax_file_directory'):
output_dir = args.dax_file_directory or args.output_dir or None
else:
output_dir = args.output_dir or None
super(Workflow, self).__init__(
name=name if name is not None else args.workflow_name,
directory=output_dir,
cache_file=args.cache_file,
dax_file_name=dax_file,
)
# Set global values
start_time = end_time = 0
if self.cp.has_option('workflow', 'start-time'):
start_time = int(self.cp.get("workflow", "start-time"))
if self.cp.has_option('workflow', 'end-time'):
end_time = int(self.cp.get("workflow", "end-time"))
self.analysis_time = segments.segment([start_time, end_time])
# Set the ifos to analyse
ifos = []
if self.cp.has_section('workflow-ifos'):
for ifo in self.cp.options('workflow-ifos'):
ifos.append(ifo.upper())
self.ifos = ifos
self.ifos.sort(key=str.lower)
self.get_ifo_combinations()
self.ifo_string = ''.join(self.ifos)
# Set up input and output file lists for workflow
self._inputs = FileList([])
self._outputs = FileList([])
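    # Typical construction (illustrative), assuming 'parser' is an
    # argparse.ArgumentParser set up with the standard pycbc workflow
    # command-line options; any namespace providing the attributes used
    # above (workflow_name, cache_file, submit_now, plan_now, ...) will do:
    #
    #     args = parser.parse_args()
    #     workflow = Workflow(args, name='my_search')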
# FIXME: Should this be in pegasus_workflow?
@property
def output_map(self):
args = self.args
if hasattr(args, 'output_map') and args.output_map is not None:
return args.output_map
if self.in_workflow is not False:
name = self.name + '.map'
else:
name = 'output.map'
path = os.path.join(self.out_dir, name)
return path
@property
def sites(self):
"""List of all possible exucution sites for jobs in this workflow"""
sites = set()
sites.add('local')
if self.cp.has_option('pegasus_profile', 'pycbc|primary_site'):
site = self.cp.get('pegasus_profile', 'pycbc|primary_site')
else:
# The default if not chosen
site = 'condorpool_symlink'
sites.add(site)
subsections = [sec for sec in self.cp.sections()
if sec.startswith('pegasus_profile-')]
for subsec in subsections:
if self.cp.has_option(subsec, 'pycbc|site'):
site = self.cp.get(subsec, 'pycbc|site')
sites.add(site)
return list(sites)
@property
def staging_site(self):
"""Site to use for staging to/from each site"""
staging_site = {}
for site in self.sites:
if site in ['condorpool_shared']:
staging_site[site] = site
else:
staging_site[site] = 'local'
return staging_site
@property
def staging_site_str(self):
return ','.join(['='.join(x) for x in self.staging_site.items()])
@property
def exec_sites_str(self):
return ','.join(self.sites)
    def execute_node(self, node, verbatim_exe = False):
""" Execute this node immediately on the local machine
"""
node.executed = True
# Check that the PFN is for a file or path
if node.executable.needs_fetching:
try:
# The pfn may have been marked local...
pfn = node.executable.get_pfn()
except:
# or it may have been marked nonlocal. That's
# fine, we'll resolve the URL and make a local
# entry.
pfn = node.executable.get_pfn('nonlocal')
resolved = resolve_url(
pfn,
permissions=stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
)
node.executable.clear_pfns()
node.executable.add_pfn(urljoin('file:', pathname2url(resolved)),
site='local')
cmd_list = node.get_command_line()
# Must execute in output directory.
curr_dir = os.getcwd()
out_dir = node.executable.out_dir
os.chdir(out_dir)
# Make call
make_external_call(cmd_list, out_dir=os.path.join(out_dir, 'logs'),
out_basename=node.executable.name)
# Change back
os.chdir(curr_dir)
for fil in node._outputs:
fil.node = None
fil.add_pfn(urljoin('file:', pathname2url(fil.storage_path)),
site='local')
    def save(self, filename=None, output_map_path=None, root=True):
# FIXME: Too close to pegasus to live here and not in pegasus_workflow
if output_map_path is None:
output_map_path = self.output_map
output_map_file = pegasus_workflow.File(os.path.basename(output_map_path))
output_map_file.add_pfn(output_map_path, site='local')
self.output_map_file = output_map_file
if self.in_workflow:
self._as_job.set_subworkflow_properties(
output_map_file,
staging_site=self.staging_site,
cache_file=self.cache_file
)
self._as_job.add_planner_args(**self._as_job.pycbc_planner_args)
# add transformations to dax
for transform in self._transformations:
self.add_transformation(transform)
for container in self._containers:
self.add_container(container)
# save the configuration file
ini_file = os.path.join(self.out_dir, self.name + '.ini')
# This shouldn't already exist, but just in case
if os.path.isfile(ini_file):
err_msg = "Refusing to overwrite configuration file that "
err_msg += "shouldn't be there: "
err_msg += ini_file
raise ValueError(err_msg)
with open(ini_file, 'w') as fp:
self.cp.write(fp)
# save the sites file
#FIXME change to check also for submit_now if we drop pycbc_submit_dax
# this would prevent sub-workflows from making extra unused sites.yml
if not self.in_workflow:
catalog_path = os.path.join(self.out_dir, 'sites.yml')
make_catalog(self.cp, self.out_dir).write(catalog_path)
# save the dax file
super(Workflow, self).save(filename=filename,
output_map_path=output_map_path,
submit_now=self.args.submit_now,
plan_now=self.args.plan_now,
root=root)
    def save_config(self, fname, output_dir, cp=None):
""" Writes configuration file to disk and returns a pycbc.workflow.File
instance for the configuration file.
Parameters
-----------
fname : string
The filename of the configuration file written to disk.
output_dir : string
The directory where the file is written to disk.
cp : ConfigParser object
The ConfigParser object to write. If None then uses self.cp.
Returns
-------
FileList
The FileList object with the configuration file.
"""
cp = self.cp if cp is None else cp
ini_file_path = os.path.abspath(os.path.join(output_dir, fname))
with open(ini_file_path, "w") as fp:
cp.write(fp)
ini_file = File(self.ifos, "", self.analysis_time,
file_url="file://" + ini_file_path)
# set the physical file name
ini_file.add_pfn(ini_file_path, "local")
# set the storage path to be the same
ini_file.storage_path = ini_file_path
return FileList([ini_file])
    def get_ifo_combinations(self):
"""
Get a list of strings for all possible combinations of IFOs
in the workflow
"""
self.ifo_combinations = []
for n in range(len(self.ifos)):
self.ifo_combinations += [''.join(ifos).lower() for ifos in
combinations(self.ifos, n + 1)]
class Node(pegasus_workflow.Node):
def __init__(self, executable, valid_seg=None):
super(Node, self).__init__(executable.get_transformation())
self.executable = executable
self.executed = False
self.set_category(executable.name)
self.valid_seg = valid_seg
self._options += self.executable.common_options
self._raw_options += self.executable.common_raw_options
for inp in self.executable.common_input_files:
self.add_input(inp)
if len(self.executable.time_dependent_options):
# Resolving these options requires the concept of a valid time.
# To keep backwards compatibility we will allow this to work if
# valid_seg is not supplied and no option actually needs resolving.
# It would be good to get this from the workflow's valid_seg if
            # not overridden. But the Node is not connected to the Workflow
# until the dax starts to be written.
self.resolve_td_options(self.executable.unresolved_td_options)
    def get_command_line(self):
# FIXME: Put in pegasus_workflow??
self._finalize()
arglist = self._dax_node.arguments
tmpargs = []
for a in arglist:
if not isinstance(a, File):
tmpargs += a.split(' ')
else:
tmpargs.append(a)
arglist = tmpargs
arglist = [a for a in arglist if a != '']
arglist = [a.storage_path if isinstance(a, File) else a for a in arglist]
# This allows the pfn to be an http(s) URL, which will be
# downloaded by resolve_url
exe_path = urllib.parse.urlsplit(self.executable.get_pfn()).path
return [exe_path] + arglist
    def new_output_file_opt(self, valid_seg, extension, option_name, tags=None,
store_file=None, use_tmp_subdirs=False):
"""
This function will create a workflow.File object corresponding to the given
information and then add that file as output of this node.
Parameters
-----------
        valid_seg : ligo.segments.segment
            The time span over which the job is valid.
        extension : string
            The extension to be used at the end of the filename.
            E.g. '.xml' or '.sqlite'.
        option_name : string
            The option used to set this file as output of the job. For
            example 'output-name' or 'output-file', whatever is appropriate
            for the current executable.
        tags : list of strings, (optional, default=[])
            These tags will be added to the list of tags already associated
            with the job. They can be used to uniquely identify this output
            file.
        store_file : Boolean, (optional, default=None)
            If True, this file is added to the output mapper and will be
            stored in the specified output location. If False, the file will
            be removed when no longer needed in the workflow. If None, the
            retain_files setting of the executable is used.
"""
if tags is None:
tags = []
# Changing this from set(tags) to enforce order. It might make sense
# for all jobs to have file names with tags in the same order.
all_tags = copy.deepcopy(self.executable.tags)
for tag in tags:
if tag not in all_tags:
all_tags.append(tag)
store_file = store_file if store_file is not None else self.executable.retain_files
fil = File(self.executable.ifo_list, self.executable.name,
valid_seg, extension=extension, store_file=store_file,
directory=self.executable.out_dir, tags=all_tags,
use_tmp_subdirs=use_tmp_subdirs)
self.add_output_opt(option_name, fil)
return fil
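    # Example (illustrative), as typically used inside a create_node
    # implementation of an Executable subclass; the option name is whatever
    # the underlying program expects and is hypothetical here:
    #
    #     node.new_output_file_opt(valid_seg, '.hdf', '--output-file',
    #                              tags=tags)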
    def add_multiifo_output_list_opt(self, opt, outputs):
""" Add an option that determines a list of outputs from multiple
detectors. Files will be supplied as --opt ifo1:input1 ifo2:input2
.....
"""
# NOTE: Here we have to use the raw arguments functionality as the
# file and ifo are not space separated.
self.add_raw_arg(opt)
self.add_raw_arg(' ')
for outfile in outputs:
self.add_raw_arg(outfile.ifo)
self.add_raw_arg(':')
self.add_raw_arg(outfile.name)
self.add_raw_arg(' ')
self.add_output(outfile)
    def new_multiifo_output_list_opt(self, opt, ifos, analysis_time, extension,
tags=None, store_file=None,
use_tmp_subdirs=False):
""" Add an option that determines a list of outputs from multiple
detectors. Files will be supplied as --opt ifo1:input1 ifo2:input2
.....
File names are created internally from the provided extension and
analysis time.
"""
if tags is None:
tags = []
all_tags = copy.deepcopy(self.executable.tags)
for tag in tags:
if tag not in all_tags:
all_tags.append(tag)
output_files = FileList([])
store_file = store_file if store_file is not None \
else self.executable.retain_files
for ifo in ifos:
curr_file = File(ifo, self.executable.name, analysis_time,
extension=extension, store_file=store_file,
directory=self.executable.out_dir, tags=all_tags,
use_tmp_subdirs=use_tmp_subdirs)
output_files.append(curr_file)
self.add_multiifo_output_list_opt(opt, output_files)
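    # Example (illustrative): create one output file per detector and pass
    # them all via a single option; the option name and extension are
    # hypothetical:
    #
    #     node.new_multiifo_output_list_opt('--output-files', workflow.ifos,
    #                                       workflow.analysis_time, '.hdf',
    #                                       tags=tags)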
    def resolve_td_options(self, td_options):
for opt in td_options:
new_opt = resolve_td_option(td_options[opt], self.valid_seg)
self._options += [opt, new_opt]
@property
def output_files(self):
return FileList(self._outputs)
@property
def output_file(self):
"""
If only one output file return it. Otherwise raise an exception.
"""
out_files = self.output_files
if len(out_files) != 1:
err_msg = "output_file property is only valid if there is a single"
err_msg += " output file. Here there are "
err_msg += "%d output files." %(len(out_files))
raise ValueError(err_msg)
return out_files[0]
class File(pegasus_workflow.File):
'''
    This class holds the details of an individual output file.
    The file(s) may be pre-supplied, generated from within the workflow
    command line script, or generated within the workflow itself. The
    important attributes are:
* The ifo that the File is valid for
* The time span that the OutFile is valid for
* A short description of what the file is
* The extension that the file should have
* The url where the file should be located
    An example of instantiating this class:
>> c = File("H1", "INSPIRAL_S6LOWMASS", segments.segment(815901601, 815902001), file_url="file://localhost/home/spxiwh/H1-INSPIRAL_S6LOWMASS-815901601-400.xml.gz" )
another where the file url is generated from the inputs:
>> c = File("H1", "INSPIRAL_S6LOWMASS", segments.segment(815901601, 815902001), directory="/home/spxiwh", extension="xml.gz" )
'''
def __init__(self, ifos, exe_name, segs, file_url=None,
extension=None, directory=None, tags=None,
store_file=True, use_tmp_subdirs=False):
"""
Create a File instance
Parameters
----------
ifos : string or list
The ifo(s) that the File is valid for. If the file is
            independently valid for multiple ifos it can be provided as a
            list, e.g. ['H1', 'L1', 'V1']. If the file is only valid for the
            combination of ifos (e.g. ligolw_thinca output) then this can be
            supplied as a single string, e.g. "H1L1V1".
        exe_name: string
            A short description of the executable, tagging only the program
            that ran this job.
segs : ligo.segments.segment or ligo.segments.segmentlist
The time span that the OutFile is valid for. Note that this is
*not* the same as the data that the job that made the file reads in.
            Lalapps_inspiral jobs do not analyse the first and last 72s of the
data that is read, and are therefore not valid at those times. If
the time is not continuous a segmentlist can be supplied.
file_url : url (optional, default=None)
If this is *not* supplied, extension and directory must be given.
If specified this explicitly points to the url of the file, or the
url where the file will be generated when made in the workflow.
extension : string (optional, default=None)
Either supply this *and* directory *or* supply only file_url.
If given this gives the extension at the end of the file name. The
full file name will be inferred from the other arguments
following the workflow standard.
directory : string (optional, default=None)
Either supply this *and* extension *or* supply only file_url.
            If given this gives the directory in which the file exists, or
            will exist. The file name will be inferred from the other arguments
following the workflow standard.
tags : list of strings (optional, default=None)
This is a list of descriptors describing what this file is. For
e.g. this might be ["BNSINJECTIONS" ,"LOWMASS","CAT_2_VETO"].
These are used in file naming.
"""
self.metadata = {}
# Set the science metadata on the file
if isinstance(ifos, str):
self.ifo_list = [ifos]
else:
self.ifo_list = ifos
self.ifo_string = ''.join(self.ifo_list)
self.description = exe_name
if isinstance(segs, segments.segment):
self.segment_list = segments.segmentlist([segs])
elif isinstance(segs, (segments.segmentlist)):
self.segment_list = segs
else:
err = "segs input must be either ligo.segments.segment or "
err += "segments.segmentlist. Got %s." %(str(type(segs)),)
raise ValueError(err)
if tags is None:
tags = []
if '' in tags:
            logging.warning('DO NOT GIVE EMPTY TAGS (from %s)', exe_name)
tags.remove('')
self.tags = tags
if len(self.tags):
self.tag_str = '_'.join(tags)
tagged_description = '_'.join([self.description] + tags)
else:
tagged_description = self.description
# Follow the capitals-for-naming convention
self.ifo_string = self.ifo_string.upper()
self.tagged_description = tagged_description.upper()
if not file_url:
if not extension:
raise TypeError("a file extension required if a file_url "
"is not provided")
if not directory:
raise TypeError("a directory is required if a file_url is "
"not provided")
filename = self._filename(self.ifo_string, self.tagged_description,
extension, self.segment_list.extent())
path = os.path.join(directory, filename)
if not os.path.isabs(path):
path = os.path.join(os.getcwd(), path)
file_url = urllib.parse.urlunparse(['file', 'localhost', path,
None, None, None])
if use_tmp_subdirs and len(self.segment_list):
pegasus_lfn = str(int(self.segment_list.extent()[0]))[:-4]
pegasus_lfn = pegasus_lfn + '/' + os.path.basename(file_url)
else:
pegasus_lfn = os.path.basename(file_url)
super(File, self).__init__(pegasus_lfn)
if store_file:
self.storage_path = urllib.parse.urlsplit(file_url).path
else:
self.storage_path = None
def __getstate__(self):
""" Allow the workflow.File to be picklable. This disables the usage of
the internal cache entry.
"""
for i, seg in enumerate(self.segment_list):
self.segment_list[i] = segments.segment(float(seg[0]), float(seg[1]))
self.cache_entry = None
safe_dict = copy.copy(self.__dict__)
safe_dict['cache_entry'] = None
return safe_dict
# FIXME: This is a pegasus_workflow thing (don't think it's needed at all!)
# use the pegasus function directly (maybe not).
@property
def ifo(self):
"""
If only one ifo in the ifo_list this will be that ifo. Otherwise an
error is raised.
"""
if len(self.ifo_list) == 1:
return self.ifo_list[0]
else:
err = "self.ifo_list must contain only one ifo to access the "
err += "ifo property. %s." %(str(self.ifo_list),)
raise TypeError(err)
@property
def segment(self):
"""
If only one segment in the segmentlist this will be that segment.
Otherwise an error is raised.
"""
if len(self.segment_list) == 1:
return self.segment_list[0]
else:
err = "self.segment_list must only contain one segment to access"
err += " the segment property. %s." %(str(self.segment_list),)
raise TypeError(err)
@property
def cache_entry(self):
"""
Returns a CacheEntry instance for File.
"""
if self.storage_path is None:
raise ValueError('This file is temporary and so a lal '
'cache entry cannot be made')
file_url = urllib.parse.urlunparse(['file', 'localhost',
self.storage_path, None,
None, None])
cache_entry = lal.utils.CacheEntry(self.ifo_string,
self.tagged_description, self.segment_list.extent(), file_url)
cache_entry.workflow_file = self
return cache_entry
def _filename(self, ifo, description, extension, segment):
"""
Construct the standard output filename. Should only be used internally
of the File class.
"""
if extension.startswith('.'):
extension = extension[1:]
# Follow the frame convention of using integer filenames,
# but stretching to cover partially covered seconds.
start = int(segment[0])
end = int(math.ceil(segment[1]))
duration = str(end-start)
start = str(start)
return "%s-%s-%s-%s.%s" % (ifo, description.upper(), start,
duration, extension)
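    # For example, the arguments ('H1', 'INSPIRAL_S6LOWMASS', 'xml.gz',
    # segment(815901601, 815902001)) produce the name
    # 'H1-INSPIRAL_S6LOWMASS-815901601-400.xml.gz', matching the example in
    # the class docstring.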
    @classmethod
def from_path(cls, path, attrs=None, **kwargs):
"""
Create an output File object from path, with optional attributes.
"""
if attrs is None:
attrs = {}
if attrs and 'ifos' in attrs:
ifos = attrs['ifos']
else:
ifos = ['H1', 'K1', 'L1', 'V1']
if attrs and 'exe_name' in attrs:
exe_name = attrs['exe_name']
else:
exe_name = 'INPUT'
if attrs and 'segs' in attrs:
segs = attrs['segs']
else:
segs = segments.segment([1, 2000000000])
if attrs and 'tags' in attrs:
tags = attrs['tags']
else:
tags = []
curr_file = cls(ifos, exe_name, segs, path, tags=tags, **kwargs)
return curr_file
class FileList(list):
'''
This class holds a list of File objects. It inherits from the
    built-in list class, but also adds a number of extra features. ONLY
pycbc.workflow.File instances should be within a FileList instance.
'''
entry_class = File
    def categorize_by_attr(self, attribute):
'''
Function to categorize a FileList by a File object
attribute (eg. 'segment', 'ifo', 'description').
Parameters
-----------
attribute : string
File object attribute to categorize FileList
Returns
--------
keys : list
A list of values for an attribute
groups : list
A list of FileLists
'''
# need to sort FileList otherwise using groupby without sorting does
# 'AAABBBCCDDAABB' -> ['AAA','BBB','CC','DD','AA','BB']
# and using groupby with sorting does
# 'AAABBBCCDDAABB' -> ['AAAAA','BBBBB','CC','DD']
flist = sorted(self, key=attrgetter(attribute), reverse=True)
# use groupby to create lists
groups = []
keys = []
for k, g in groupby(flist, attrgetter(attribute)):
groups.append(FileList(g))
keys.append(k)
return keys, groups
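    # Example (illustrative): split a FileList into per-ifo sub-lists,
    # assuming each File in the list is valid for a single ifo:
    #
    #     ifos, ifo_lists = file_list.categorize_by_attr('ifo')
    #     for ifo, sub_list in zip(ifos, ifo_lists):
    #         print(ifo, len(sub_list))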
    def find_output(self, ifo, time):
'''Returns one File most appropriate at the given time/time range.
Return one File that covers the given time, or is most
appropriate for the supplied time range.
Parameters
-----------
ifo : string
Name of the ifo (or ifos) that the file should be valid for.
time : int/float/LIGOGPStime or tuple containing two values
            If int/float/LIGOGPStime (or a similar way of specifying one time) is
given, return the File corresponding to the time. This calls
self.find_output_at_time(ifo,time).
If a tuple of two values is given, return the File that is
**most appropriate** for the time range given. This calls
self.find_output_in_range
Returns
--------
pycbc_file : pycbc.workflow.File instance
The File that corresponds to the time or time range
'''
# Determine whether I have a specific time, or a range of times
try:
lenTime = len(time)
except TypeError:
# This is if I have a single time
outFile = self.find_output_at_time(ifo,time)
else:
# This is if I have a range of times
if lenTime == 2:
outFile = self.find_output_in_range(ifo,time[0],time[1])
# This is if I got a list that had more (or less) than 2 entries
if len(time) != 2:
raise TypeError("I do not understand the input variable time")
return outFile
    def find_output_at_time(self, ifo, time):
'''
Return File that covers the given time.
Parameters
-----------
ifo : string
Name of the ifo (or ifos) that the File should correspond to
time : int/float/LIGOGPStime
            Return the Files that cover the supplied time. If no
File covers the time this will return None.
Returns
--------
list of File classes
            The Files that correspond to the time.
'''
# Get list of Files that overlap time, for given ifo
outFiles = [i for i in self if ifo in i.ifo_list and time in i.segment_list]
if len(outFiles) == 0:
# No OutFile at this time
return None
elif len(outFiles) == 1:
# 1 OutFile at this time (good!)
return outFiles
else:
# Multiple output files. Currently this is valid, but we may want
# to demand exclusivity later, or in certain cases. Hence the
# separation.
return outFiles
    def find_outputs_in_range(self, ifo, current_segment, useSplitLists=False):
"""
Return the list of Files that is most appropriate for the supplied
time range. That is, the Files whose coverage time has the
largest overlap with the supplied time range.
Parameters
-----------
ifo : string
Name of the ifo (or ifos) that the File should correspond to
current_segment : ligo.segments.segment
The segment of time that files must intersect.
Returns
--------
FileList class
The list of Files that are most appropriate for the time range
"""
currsegment_list = segments.segmentlist([current_segment])
# Get all files overlapping the window
overlap_files = self.find_all_output_in_range(ifo, current_segment,
useSplitLists=useSplitLists)
# By how much do they overlap?
overlap_windows = [abs(i.segment_list & currsegment_list) for i in overlap_files]
if not overlap_windows:
return []
# Return the File with the biggest overlap
# Note if two File have identical overlap, the first is used
# to define the valid segment
overlap_windows = numpy.array(overlap_windows, dtype = int)
segmentLst = overlap_files[overlap_windows.argmax()].segment_list
# Get all output files with the exact same segment definition
output_files = [f for f in overlap_files if f.segment_list==segmentLst]
return output_files
    def find_output_in_range(self, ifo, start, end):
'''
Return the File that is most appropriate for the supplied
time range. That is, the File whose coverage time has the
largest overlap with the supplied time range. If no Files
overlap the supplied time window, will return None.
Parameters
-----------
ifo : string
Name of the ifo (or ifos) that the File should correspond to
start : int/float/LIGOGPStime
The start of the time range of interest.
end : int/float/LIGOGPStime
The end of the time range of interest
Returns
--------
File class
The File that is most appropriate for the time range
'''
currsegment_list = segments.segmentlist([segments.segment(start, end)])
# First filter Files corresponding to ifo
outFiles = [i for i in self if ifo in i.ifo_list]
if len(outFiles) == 0:
# No OutFiles correspond to that ifo
return None
# Filter OutFiles to those overlapping the given window
currSeg = segments.segment([start,end])
outFiles = [i for i in outFiles \
if i.segment_list.intersects_segment(currSeg)]
if len(outFiles) == 0:
# No OutFile overlap that time period
return None
elif len(outFiles) == 1:
# One OutFile overlaps that period
return outFiles[0]
else:
overlap_windows = [abs(i.segment_list & currsegment_list) \
for i in outFiles]
# Return the File with the biggest overlap
# Note if two File have identical overlap, this will return
# the first File in the list
overlap_windows = numpy.array(overlap_windows, dtype = int)
return outFiles[overlap_windows.argmax()]
    def find_all_output_in_range(self, ifo, currSeg, useSplitLists=False):
"""
Return all files that overlap the specified segment.
"""
if not useSplitLists:
# Slower, but simpler method
outFiles = [i for i in self if ifo in i.ifo_list]
outFiles = [i for i in outFiles
if i.segment_list.intersects_segment(currSeg)]
else:
# Faster, but more complicated
# Basically only check if a subset of files intersects_segment by
# using a presorted list. Sorting only happens once.
if not self._check_split_list_validity():
# FIXME: DO NOT hard code this.
self._temporal_split_list(100)
startIdx = int((currSeg[0] - self._splitListsStart) /
self._splitListsStep)
# Add some small rounding here
endIdx = (currSeg[1] - self._splitListsStart) / self._splitListsStep
endIdx = int(endIdx - 0.000001)
outFiles = []
for idx in range(startIdx, endIdx + 1):
if idx < 0 or idx >= self._splitListsNum:
continue
outFilesTemp = [i for i in self._splitLists[idx]
if ifo in i.ifo_list]
outFiles.extend([i for i in outFilesTemp
if i.segment_list.intersects_segment(currSeg)])
# Remove duplicates
outFiles = list(set(outFiles))
return self.__class__(outFiles)
    def find_output_with_tag(self, tag):
"""
        Find all files that have the given tag in their tags.
"""
# Enforce upper case
tag = tag.upper()
return FileList([i for i in self if tag in i.tags])
    def find_output_without_tag(self, tag):
"""
        Find all files that do not have the given tag in their tags.
"""
# Enforce upper case
tag = tag.upper()
return FileList([i for i in self if tag not in i.tags])
    def find_output_with_ifo(self, ifo):
"""
        Find all files that have the given ifo in their ifo_list.
"""
# Enforce upper case
ifo = ifo.upper()
return FileList([i for i in self if ifo in i.ifo_list])
    def get_times_covered_by_files(self):
"""
        Find the coalesced union of the segments of all files in the
list.
"""
times = segments.segmentlist([])
for entry in self:
times.extend(entry.segment_list)
times.coalesce()
return times
    def convert_to_lal_cache(self):
"""
Return all files in this object as a glue.lal.Cache object
"""
lal_cache = gluelal.Cache([])
for entry in self:
try:
lal_cache.append(entry.cache_entry)
except ValueError:
pass
return lal_cache
def _temporal_split_list(self,numSubLists):
"""
This internal function is used to speed the code up in cases where a
number of operations are being made to determine if files overlap a
specific time. Normally such operations are done on *all* entries with
*every* call. However, if we predetermine which files are at which
times, we can avoid testing *every* file every time.
We therefore create numSubLists distinct and equal length time windows
equally spaced from the first time entry in the list until the last.
A list is made for each window and files are added to lists which they
overlap.
If the list changes it should be captured and these split lists become
invalid. Currently the testing for this is pretty basic
"""
# Assume segment lists are coalesced!
startTime = float( min([i.segment_list[0][0] for i in self]))
endTime = float( max([i.segment_list[-1][-1] for i in self]))
step = (endTime - startTime) / float(numSubLists)
# Set up storage
self._splitLists = []
for idx in range(numSubLists):
self._splitLists.append(FileList([]))
# Sort the files
for currFile in self:
segExtent = currFile.segment_list.extent()
startIdx = (segExtent[0] - startTime) / step
endIdx = (segExtent[1] - startTime) / step
# Add some small rounding here
startIdx = int(startIdx - 0.001)
endIdx = int(endIdx + 0.001)
if startIdx < 0:
startIdx = 0
if endIdx >= numSubLists:
endIdx = numSubLists - 1
for idx in range(startIdx, endIdx + 1):
self._splitLists[idx].append(currFile)
# Set information needed to detect changes and to be used elsewhere
self._splitListsLength = len(self)
self._splitListsNum = numSubLists
self._splitListsStart = startTime
self._splitListsEnd = endTime
self._splitListsStep = step
self._splitListsSet = True
def _check_split_list_validity(self):
"""
See _temporal_split_list above. This function checks if the current
split lists are still valid.
"""
# FIXME: Currently very primitive, but needs to be fast
if not (hasattr(self,"_splitListsSet") and (self._splitListsSet)):
return False
elif len(self) != self._splitListsLength:
return False
else:
return True
    @classmethod
def load(cls, filename):
"""
Load a FileList from a pickle file
"""
        with open(filename, 'rb') as f:
            return pickle.load(f)
    def dump(self, filename):
"""
Output this FileList to a pickle file
"""
        with open(filename, 'wb') as f:
            pickle.dump(self, f)
    def to_file_object(self, name, out_dir):
"""Dump to a pickle file and return an File object reference
Parameters
----------
name : str
An identifier of this file. Needs to be unique.
out_dir : path
path to place this file
Returns
-------
        file : pycbc.workflow.File
"""
make_analysis_dir(out_dir)
file_ref = File('ALL', name, self.get_times_covered_by_files(),
extension='.pkl', directory=out_dir)
self.dump(file_ref.storage_path)
return file_ref
class SegFile(File):
'''
This class inherits from the File class, and is designed to store
workflow output files containing a segment dict. This is identical in
usage to File except for an additional kwarg for holding the
segment dictionary, if it is known at workflow run time.
'''
def __init__(self, ifo_list, description, valid_segment,
segment_dict=None, seg_summ_dict=None, **kwargs):
"""
See File.__init__ for a full set of documentation for how to
call this class. The only thing unique and added to this class is
        the optional segment_dict. NOTE that segment_dict is a
        ligo.segments.segmentlistdict and that, rather than the usual
        dict[ifo] keying, it is keyed by dict[ifo:name].
Parameters
------------
ifo_list : string or list (required)
See File.__init__
description : string (required)
See File.__init__
        valid_segment : ligo.segments.segment or ligo.segments.segmentlist
            See File.__init__
        segment_dict : ligo.segments.segmentlistdict (optional, default=None)
            A ligo.segments.segmentlistdict holding the segment lists to be
            stored in this file, keyed by ifo:name.
Can be added by setting self.segment_dict after initializing an
instance of the class.
"""
super(SegFile, self).__init__(ifo_list, description, valid_segment,
**kwargs)
# To avoid confusion with the segment_list property of the parent class
# we refer to this as valid_segments here
self.valid_segments = self.segment_list
self.segment_dict = segment_dict
self.seg_summ_dict = seg_summ_dict
    @classmethod
def from_segment_list(cls, description, segmentlist, name, ifo,
seg_summ_list=None, **kwargs):
""" Initialize a SegFile object from a segmentlist.
Parameters
------------
description : string (required)
See File.__init__
segmentlist : ligo.segments.segmentslist
The segment list that will be stored in this file.
name : str
The name of the segment lists to be stored in the file.
ifo : str
The ifo of the segment lists to be stored in this file.
seg_summ_list : ligo.segments.segmentslist (OPTIONAL)
Specify the segment_summary segmentlist that goes along with the
segmentlist. Default=None, in this case segment_summary is taken
from the valid_segment of the SegFile class.
"""
seglistdict = segments.segmentlistdict()
seglistdict[ifo + ':' + name] = segmentlist
seg_summ_dict = None
if seg_summ_list is not None:
seg_summ_dict = segments.segmentlistdict()
seg_summ_dict[ifo + ':' + name] = seg_summ_list
return cls.from_segment_list_dict(description, seglistdict,
seg_summ_dict=seg_summ_dict, **kwargs)
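    # Example (illustrative): store a single H1 science segment list in an
    # xml file under out_dir. The variable names and labels used here are
    # hypothetical; the extra kwargs are passed through to File.__init__:
    #
    #     sci_file = SegFile.from_segment_list('SCIENCE_SEGMENTS', sci_segs,
    #                                          'SCIENCE', 'H1',
    #                                          extension='.xml',
    #                                          directory=out_dir)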
    @classmethod
def from_multi_segment_list(cls, description, segmentlists, names, ifos,
seg_summ_lists=None, **kwargs):
""" Initialize a SegFile object from a list of segmentlists.
Parameters
------------
description : string (required)
See File.__init__
segmentlists : List of ligo.segments.segmentslist
List of segment lists that will be stored in this file.
names : List of str
List of names of the segment lists to be stored in the file.
        ifos : list of str
List of ifos of the segment lists to be stored in this file.
seg_summ_lists : ligo.segments.segmentslist (OPTIONAL)
Specify the segment_summary segmentlists that go along with the
segmentlists. Default=None, in this case segment_summary is taken
from the valid_segment of the SegFile class.
"""
seglistdict = segments.segmentlistdict()
for name, ifo, segmentlist in zip(names, ifos, segmentlists):
seglistdict[ifo + ':' + name] = segmentlist
if seg_summ_lists is not None:
seg_summ_dict = segments.segmentlistdict()
for name, ifo, seg_summ_list in zip(names, ifos, seg_summ_lists):
seg_summ_dict[ifo + ':' + name] = seg_summ_list
else:
seg_summ_dict = None
return cls.from_segment_list_dict(description, seglistdict,
seg_summ_dict=seg_summ_dict, **kwargs)
    @classmethod
def from_segment_list_dict(cls, description, segmentlistdict,
ifo_list=None, valid_segment=None,
file_exists=False, seg_summ_dict=None,
**kwargs):
""" Initialize a SegFile object from a segmentlistdict.
Parameters
------------
description : string (required)
See File.__init__
segmentlistdict : ligo.segments.segmentslistdict
See SegFile.__init__
ifo_list : string or list (optional)
See File.__init__, if not given a list of all ifos in the
segmentlistdict object will be used
valid_segment : ligo.segments.segment or ligo.segments.segmentlist
See File.__init__, if not given the extent of all segments in the
segmentlistdict is used.
file_exists : boolean (default = False)
If provided and set to True it is assumed that this file already
exists on disk and so there is no need to write again.
seg_summ_dict : ligo.segments.segmentslistdict
Optional. See SegFile.__init__.
"""
if ifo_list is None:
ifo_set = set([i.split(':')[0] for i in segmentlistdict.keys()])
ifo_list = list(ifo_set)
ifo_list.sort()
if valid_segment is None:
if seg_summ_dict and \
numpy.any([len(v) for _, v in seg_summ_dict.items()]):
# Only come here if seg_summ_dict is supplied and it is
# not empty.
valid_segment = seg_summ_dict.extent_all()
else:
try:
valid_segment = segmentlistdict.extent_all()
except:
# Numpty probably didn't supply a
# ligo.segments.segmentlistdict
segmentlistdict=segments.segmentlistdict(segmentlistdict)
try:
valid_segment = segmentlistdict.extent_all()
except ValueError:
# No segment_summary and segment list is empty
# Setting valid segment now is hard!
warn_msg = "No information with which to set valid "
warn_msg += "segment."
                        logging.warning(warn_msg)
valid_segment = segments.segment([0,1])
instnc = cls(ifo_list, description, valid_segment,
segment_dict=segmentlistdict, seg_summ_dict=seg_summ_dict,
**kwargs)
if not file_exists:
instnc.to_segment_xml()
else:
instnc.add_pfn(urljoin('file:', pathname2url(instnc.storage_path)),
site='local')
return instnc
    @classmethod
def from_segment_xml(cls, xml_file, **kwargs):
"""
        Read ligo.segments.segmentlist objects from the given file containing
        an xml segment table.
Parameters
-----------
        xml_file : str
            Path to the segment xml file.
"""
# load xmldocument and SegmentDefTable and SegmentTables
fp = open(xml_file, 'rb')
xmldoc = ligolw_utils.load_fileobj(
fp, compress='auto', contenthandler=LIGOLWContentHandler)
seg_def_table = lsctables.SegmentDefTable.get_table(xmldoc)
seg_table = lsctables.SegmentTable.get_table(xmldoc)
seg_sum_table = lsctables.SegmentSumTable.get_table(xmldoc)
segs = segments.segmentlistdict()
seg_summ = segments.segmentlistdict()
seg_id = {}
for seg_def in seg_def_table:
# Here we want to encode ifo and segment name
full_channel_name = ':'.join([str(seg_def.ifos),
str(seg_def.name)])
seg_id[int(seg_def.segment_def_id)] = full_channel_name
segs[full_channel_name] = segments.segmentlist()
seg_summ[full_channel_name] = segments.segmentlist()
for seg in seg_table:
seg_obj = segments.segment(
lal.LIGOTimeGPS(seg.start_time, seg.start_time_ns),
lal.LIGOTimeGPS(seg.end_time, seg.end_time_ns))
segs[seg_id[int(seg.segment_def_id)]].append(seg_obj)
for seg in seg_sum_table:
seg_obj = segments.segment(
lal.LIGOTimeGPS(seg.start_time, seg.start_time_ns),
lal.LIGOTimeGPS(seg.end_time, seg.end_time_ns))
seg_summ[seg_id[int(seg.segment_def_id)]].append(seg_obj)
for seg_name in seg_id.values():
segs[seg_name] = segs[seg_name].coalesce()
xmldoc.unlink()
fp.close()
curr_url = urllib.parse.urlunparse(['file', 'localhost', xml_file,
None, None, None])
return cls.from_segment_list_dict('SEGMENTS', segs, file_url=curr_url,
file_exists=True,
seg_summ_dict=seg_summ, **kwargs)
    def remove_short_sci_segs(self, minSegLength):
"""
Function to remove all science segments
shorter than a specific length. Also updates the file on disk to remove
these segments.
Parameters
-----------
minSegLength : int
            Minimum length of science segments. Segments shorter than this will
be removed.
"""
newsegment_list = segments.segmentlist()
for key, seglist in self.segment_dict.items():
newsegment_list = segments.segmentlist()
for seg in seglist:
if abs(seg) > minSegLength:
newsegment_list.append(seg)
newsegment_list.coalesce()
self.segment_dict[key] = newsegment_list
self.to_segment_xml(override_file_if_exists=True)
    def return_union_seglist(self):
return self.segment_dict.union(self.segment_dict.keys())
    def parse_segdict_key(self, key):
"""
Return ifo and name from the segdict key.
"""
splt = key.split(':')
if len(splt) == 2:
return splt[0], splt[1]
else:
err_msg = "Key should be of the format 'ifo:name', got %s." %(key,)
raise ValueError(err_msg)
    def to_segment_xml(self, override_file_if_exists=False):
"""
        Write the segment lists in self.segment_dict to self.storage_path.
"""
# create XML doc and add process table
outdoc = ligolw.Document()
outdoc.appendChild(ligolw.LIGO_LW())
process = create_process_table(outdoc)
for key, seglist in self.segment_dict.items():
ifo, name = self.parse_segdict_key(key)
# Ensure we have LIGOTimeGPS
fsegs = [(lal.LIGOTimeGPS(seg[0]),
lal.LIGOTimeGPS(seg[1])) for seg in seglist]
if self.seg_summ_dict is None:
vsegs = [(lal.LIGOTimeGPS(seg[0]),
lal.LIGOTimeGPS(seg[1])) \
for seg in self.valid_segments]
else:
vsegs = [(lal.LIGOTimeGPS(seg[0]),
lal.LIGOTimeGPS(seg[1])) \
for seg in self.seg_summ_dict[key]]
            # Add using the ligo.lw library to set all segment tables
with ligolw_segments.LigolwSegments(outdoc, process) as x:
x.add(ligolw_segments.LigolwSegmentList(active=fsegs,
instruments=set([ifo]), name=name,
version=1, valid=vsegs))
# write file
url = urljoin('file:', pathname2url(self.storage_path))
if not override_file_if_exists or not self.has_pfn(url, site='local'):
self.add_pfn(url, site='local')
ligolw_utils.write_filename(outdoc, self.storage_path)
def make_external_call(cmdList, out_dir=None, out_basename='external_call',
shell=False, fail_on_error=True):
"""
Use this to make an external call using the python subprocess module.
See the subprocess documentation for more details of how this works.
http://docs.python.org/2/library/subprocess.html
Parameters
-----------
cmdList : list of strings
This list of strings contains the command to be run. See the subprocess
documentation for more details.
out_dir : string
        If given, stdout and stderr will be redirected to
        os.path.join(out_dir, out_basename + ".out") and
        os.path.join(out_dir, out_basename + ".err") respectively.
        If not given, stdout and stderr will not be recorded.
out_basename : string
The value of out_basename used to construct the file names used to
store stderr and stdout. See out_dir for more information.
shell : boolean, default=False
This value will be given as the shell kwarg to the subprocess call.
**WARNING** See the subprocess documentation for details on this
kwarg, including a warning about a serious security exploit. Do not
use this unless you are sure it is necessary **and** safe.
fail_on_error : boolean, default=True
If set to True an exception will be raised if the external command does
not return a code of 0. If set to False such failures will be ignored.
Stderr and Stdout can be stored in either case using the out_dir
and out_basename options.
Returns
--------
exitCode : int
The code returned by the process.
"""
if out_dir:
outBase = os.path.join(out_dir,out_basename)
errFile = outBase + '.err'
errFP = open(errFile, 'w')
outFile = outBase + '.out'
outFP = open(outFile, 'w')
cmdFile = outBase + '.sh'
cmdFP = open(cmdFile, 'w')
cmdFP.write(' '.join(cmdList))
cmdFP.close()
else:
errFile = None
outFile = None
cmdFile = None
errFP = None
outFP = None
msg = "Making external call %s" %(' '.join(cmdList))
logging.info(msg)
errCode = subprocess.call(cmdList, stderr=errFP, stdout=outFP,\
shell=shell)
if errFP:
errFP.close()
if outFP:
outFP.close()
if errCode and fail_on_error:
raise CalledProcessErrorMod(errCode, ' '.join(cmdList),
errFile=errFile, outFile=outFile, cmdFile=cmdFile)
logging.info("Call successful, or error checking disabled.")
[docs]class CalledProcessErrorMod(Exception):
"""
This exception is raised when subprocess.call returns a non-zero exit code
and checking has been requested. This should not be accessed by the user;
it is used only within make_external_call.
"""
def __init__(self, returncode, cmd, errFile=None, outFile=None,
cmdFile=None):
self.returncode = returncode
self.cmd = cmd
self.errFile = errFile
self.outFile = outFile
self.cmdFile = cmdFile
def __str__(self):
msg = "Command '%s' returned non-zero exit status %d.\n" \
%(self.cmd, self.returncode)
if self.errFile:
msg += "Stderr can be found in %s .\n" %(self.errFile)
if self.outFile:
msg += "Stdout can be found in %s .\n" %(self.outFile)
if self.cmdFile:
msg += "The failed command has been printed in %s ." %(self.cmdFile)
return msg
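# Illustrative sketch (not part of the module): catching a failed external
# call. The 'false' command is used only because it always exits non-zero.
#
#   >>> try:
#   ...     make_external_call(['false'], fail_on_error=True)
#   ... except CalledProcessErrorMod as err:
#   ...     print(err.returncode)
#   1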
[docs]def resolve_url_to_file(curr_pfn, attrs=None):
"""
Resolves a PFN into a workflow.File object.
This function will resolve a PFN to a workflow.File object. If a File
object already exists for that PFN, it will be returned, otherwise a new
object is returned. We will implement default site schemes here as needed;
for example cvmfs paths will be added to the osg and nonfsio sites in
addition to local. If the LFN is a duplicate of an existing one, but with a
different PFN, an AssertionError is raised. The attrs keyword-argument can
be used to specify attributes of a file. All files have 4 possible
attributes. A list of ifos, an identifying string - usually used to give
the name of the executable that created the file, a segmentlist over which
the file is valid and tags specifying particular details about those files.
If attrs['ifos'] is set it will be used as the ifos, otherwise this will
default to ['H1', 'K1', 'L1', 'V1']. If attrs['exe_name'] is given this
will replace the "exe_name" sent to File.__init__ otherwise 'INPUT' will
be given. segs will default to [[1,2000000000]] unless overridden with
attrs['segs']. tags will default to an empty list unless overridden
with attrs['tag']. If attrs is None it will be ignored and all defaults
will be used. It is emphasized that these attributes are for the most part
not important with input files. Exceptions include things like input
template banks, where ifos and valid times will be checked in the workflow
and used in the naming of child job output files.
"""
cvmfsstr1 = 'file:///cvmfs/'
cvmfsstr2 = 'file://localhost/cvmfs/'
osdfstr1 = 'osdf:///' # Technically this isn't CVMFS, but same handling!
cvmfsstrs = (cvmfsstr1, cvmfsstr2, osdfstr1)
# Get LFN
urlp = urllib.parse.urlparse(curr_pfn)
curr_lfn = os.path.basename(urlp.path)
# Does this already exist as a File?
if curr_lfn in file_input_from_config_dict.keys():
file_pfn = file_input_from_config_dict[curr_lfn][2]
# If the PFNs are different, but LFNs are the same then fail.
assert(file_pfn == curr_pfn)
curr_file = file_input_from_config_dict[curr_lfn][1]
else:
# Use resolve_url to download file/symlink as appropriate
local_file_path = resolve_url(curr_pfn)
# Create File object with default local path
curr_file = File.from_path(local_file_path, attrs=attrs)
if curr_pfn.startswith(cvmfsstrs):
# Add PFNs for nonlocal sites for special cases (e.g. CVMFS).
# This block could be extended as needed
curr_file.add_pfn(curr_pfn, site='all')
else:
pfn_local = urljoin('file:', pathname2url(local_file_path))
curr_file.add_pfn(pfn_local, 'local')
# Store the file to avoid later duplication
tuple_val = (local_file_path, curr_file, curr_pfn)
file_input_from_config_dict[curr_lfn] = tuple_val
return curr_file
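# Illustrative sketch (not part of the module): turning a PFN into a
# workflow File object. The URL and attribute values below are hypothetical.
#
#   >>> attrs = {'ifos': ['H1', 'L1'], 'exe_name': 'TMPLTBANK'}
#   >>> bank_file = resolve_url_to_file(
#   ...     'file:///home/user/banks/H1L1-BANK.xml.gz', attrs=attrs)
#   >>> # A second call with the same PFN returns the cached File object
#   >>> resolve_url_to_file(
#   ...     'file:///home/user/banks/H1L1-BANK.xml.gz') is bank_file
#   True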
[docs]def configparser_value_to_file(cp, sec, opt, attrs=None):
"""
Fetch a file given its url location via the section
and option in the workflow configuration parser.
Parameters
-----------
cp : ConfigParser object
The ConfigParser object holding the workflow configuration settings
sec : string
The section containing options for this job.
opt : string
Name of option (e.g. --output-file)
attrs : dict, optional
Dictionary of file attributes (ifos, exe_name, segs, tags); see
resolve_url_to_file for details.
Returns
--------
fileobj_from_path : workflow.File object obtained from the path
specified by opt, within sec, in cp.
"""
path = cp.get(sec, opt)
fileobj_from_path = resolve_url_to_file(path, attrs=attrs)
return fileobj_from_path
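# Illustrative sketch (not part of the module): fetching a file option from
# the workflow configuration. The ConfigParser object, section and option
# names below are hypothetical.
#
#   >>> bank_file = configparser_value_to_file(
#   ...     workflow.cp, 'workflow-tmpltbank', 'tmpltbank-pregenerated-bank')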
[docs]def get_full_analysis_chunk(science_segs):
"""
Function to find the first and last time point contained in the science segments
and return a single segment spanning that full time.
Parameters
-----------
science_segs : ifo-keyed dictionary of ligo.segments.segmentlist instances
The list of times that are being analysed in this workflow.
Returns
--------
fullSegment : ligo.segments.segment
The segment spanning the first and last time point contained in science_segs.
"""
extents = [science_segs[ifo].extent() for ifo in science_segs.keys()]
min_extent, max_extent = extents[0]
for lo, hi in extents:
if min_extent > lo:
min_extent = lo
if max_extent < hi:
max_extent = hi
fullSegment = segments.segment(min_extent, max_extent)
return fullSegment
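# Illustrative sketch (not part of the module): computing the span of all
# analysed time. The GPS times below are hypothetical.
#
#   >>> from ligo import segments
#   >>> sci = {'H1': segments.segmentlist([segments.segment(100, 200)]),
#   ...        'L1': segments.segmentlist([segments.segment(150, 300)])}
#   >>> get_full_analysis_chunk(sci)
#   segment(100, 300)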
[docs]def get_random_label():
"""
Get a random label string to use when clustering jobs.
"""
return ''.join(random.choice(string.ascii_uppercase + string.digits) \
for _ in range(15))
[docs]def resolve_td_option(val_str, valid_seg):
"""
Take an option which might be time-dependent and resolve it
Some options might take different values depending on the GPS time. For
example if you want opt_1 to take value_a if the time is between 10 and
100, value_b if between 100 and 250, and value_c if between 250 and 500 you
can supply:
value_a[10:100],value_b[100:250],value_c[250:500].
This function will parse that string (given as val_str) and return the value
whose window fully contains valid_seg. If valid_seg is not fully contained
in one, and only one, of these windows, the code will fail. If given a
simple option like:
value_a
The function will just return value_a.
"""
# Track if we've already found a matching option
output = ''
# Strip any whitespace, and split on comma
curr_vals = val_str.replace(' ', '').strip().split(',')
# Resolving the simple case is trivial and can be done immediately.
if len(curr_vals) == 1 and '[' not in curr_vals[0]:
return curr_vals[0]
# Loop over all possible values
for cval in curr_vals:
start = int(valid_seg[0])
end = int(valid_seg[1])
# Extract limits for each case, and check overlap with valid_seg
if '[' in cval:
bopt = cval.split('[')[1].split(']')[0]
start, end = bopt.split(':')
cval = cval.replace('[' + bopt + ']', '')
curr_seg = segments.segment(int(start), int(end))
# The segments module is a bit weird so we need to check if the two
# overlap using the following code. If valid_seg is fully within
# curr_seg this will be true.
if curr_seg.intersects(valid_seg) and \
(curr_seg & valid_seg == valid_seg):
if output:
err_msg = "Time-dependent options must be disjoint."
raise ValueError(err_msg)
output = cval
if not output:
err_msg = "Could not resolve option {}".format(val_str)
raise ValueError(err_msg)
return output
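# Illustrative sketch (not part of the module): resolving a time-dependent
# option string against a job's valid segment. The values and GPS times
# below are hypothetical.
#
#   >>> from ligo import segments
#   >>> opt = 'value_a[10:100],value_b[100:250],value_c[250:500]'
#   >>> resolve_td_option(opt, segments.segment(120, 200))
#   'value_b'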
[docs]def add_workflow_settings_cli(parser, include_subdax_opts=False):
"""Adds workflow options to an argument parser.
Parameters
----------
parser : argparse.ArgumentParser
Argument parser to add the options to.
include_subdax_opts : bool, optional
If True, will add output-map and dax-file-directory options
to the parser. These can be used for workflows that are
generated as a subdax of another workflow. Default is False.
"""
wfgrp = parser.add_argument_group("Options for setting workflow files")
wfgrp.add_argument("--workflow-name", required=True,
help="Name of the workflow.")
wfgrp.add_argument("--tags", nargs="+", default=[],
help="Append the given tags to file names.")
wfgrp.add_argument("--output-dir", default=None,
help="Path to directory where the workflow will be "
"written. Default is to use "
"{workflow-name}_output.")
wfgrp.add_argument("--cache-file", default=None,
help="Path to input file containing list of files to "
"be reused (the 'input_map' file)")
wfgrp.add_argument("--plan-now", default=False, action='store_true',
help="If given, workflow will immediately be planned "
"on completion of workflow generation but not "
"submitted to the condor pool. A start script "
"will be created to submit to condor.")
wfgrp.add_argument("--submit-now", default=False, action='store_true',
help="If given, workflow will immediately be submitted "
"on completion of workflow generation")
wfgrp.add_argument("--dax-file", default=None,
help="Path to DAX file. Default is to write to the "
"output directory with name "
"{workflow-name}.dax.")
if include_subdax_opts:
wfgrp.add_argument("--output-map", default=None,
help="Path to an output map file.")
wfgrp.add_argument("--dax-file-directory", default=None,
help="Put dax files (including output map, "
"sites.yml etc. in this directory. The use "
"case for this is when running a sub-workflow "
"under pegasus the outputs need to be copied "
"back to the appropriate directory, and "
"using this as --dax-file-directory . allows "
"that to be done.")