# Copyright (C) 2013 Ian Harry
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
This module is responsible for querying a datafind server to determine the
availability of the data that the code is attempting to run on. It also
performs a number of tests and can act on these as described below. Full
documentation for this module can be found here:
https://ldas-jobs.ligo.caltech.edu/~cbc/docs/pycbc/ahope/datafind.html
"""
import os, copy
import logging
import urllib.parse
from ligo import segments
from ligo.lw import utils, table
from gwdatafind import find_urls as find_frame_urls
from pycbc.workflow.core import SegFile, File, FileList, make_analysis_dir
from pycbc.io.ligolw import LIGOLWContentHandler
# NOTE: urllib is weird. For some reason it only allows known schemes and
# will give *wrong* results, rather than failing, if you use something like
# gsiftp. We can add schemes explicitly, as below, but be careful with this!
# (urllib is used indirectly through lal.Cache objects.)
urllib.parse.uses_relative.append('osdf')
urllib.parse.uses_netloc.append('osdf')
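# For example (illustrative only): without the registrations above,
# urllib.parse.urljoin('osdf:///frames/a.gwf', 'b.gwf') would return just
# 'b.gwf' instead of resolving it relative to the base URL.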
logger = logging.getLogger('pycbc.workflow.datafind')
def setup_datafind_workflow(workflow, scienceSegs, outputDir, seg_file=None,
tags=None):
"""
Setup datafind section of the workflow. This section is responsible for
generating, or setting up the workflow to generate, a list of files that
record the location of the frame files needed to perform the analysis.
There could be multiple options here: the datafind jobs could be run at
workflow generation time or could be added to the dag. The subsequent jobs
will know what was done here from the FileList containing the datafind
outputs (and the Dagman nodes if appropriate). For now the only implemented
option is to generate the datafind files at runtime. This module can also
check if the frame files actually exist, check whether the obtained segments
line up with the original ones, and update the science segments to reflect
missing data files.
Parameters
----------
workflow: pycbc.workflow.core.Workflow
The workflow class that stores the jobs that will be run.
scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
This contains the times that the workflow is expected to analyse.
outputDir : path
All output files written by datafind processes will be written to this
directory.
seg_file : SegFile, optional (default=None)
The file returned by get_science_segments containing the science
segments and the associated segment_summary. This will
be used for the segment_summary test and is required if, and only if,
performing that test.
tags : list of string, optional (default=None)
Use this to specify tags. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
Returns
--------
datafindOuts : pycbc.workflow.core.FileList
List of all the datafind output files for use later in the pipeline
(None if the AT_RUNTIME_FAKE_DATA method is used).
sci_avlble_file : SegFile
SegFile containing the analysable time after checks in the datafind
module are applied to the input segment list. For production runs this
is expected to be equal to the input segment list.
scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
This contains the times that the workflow is expected to analyse. If
any of the datafind-check-* options are set to 'update_times' this will
be updated to reflect any instances of missing data.
sci_avlble_name : string
The name with which the analysable time is stored in the
sci_avlble_file.
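Examples
--------
A minimal sketch of calling this function, assuming ``wf`` is an existing
pycbc.workflow.core.Workflow instance and ``sci_segs`` is an ifo-keyed
dictionary of segmentlists (both names are illustrative)::

    df_files, avail_file, sci_segs, avail_name = setup_datafind_workflow(
        wf, sci_segs, 'datafind', seg_file=None, tags=None)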
"""
if tags is None:
tags = []
logger.info("Entering datafind module")
make_analysis_dir(outputDir)
cp = workflow.cp
# Parse for options in ini file
datafind_method = cp.get_opt_tags("workflow-datafind",
"datafind-method", tags)
if cp.has_option_tags("workflow-datafind",
"datafind-check-segment-gaps", tags):
checkSegmentGaps = cp.get_opt_tags("workflow-datafind",
"datafind-check-segment-gaps", tags)
else:
checkSegmentGaps = "no_test"
if cp.has_option_tags("workflow-datafind",
"datafind-check-frames-exist", tags):
checkFramesExist = cp.get_opt_tags("workflow-datafind",
"datafind-check-frames-exist", tags)
else:
checkFramesExist = "no_test"
if cp.has_option_tags("workflow-datafind",
"datafind-check-segment-summary", tags):
checkSegmentSummary = cp.get_opt_tags("workflow-datafind",
"datafind-check-segment-summary", tags)
else:
checkSegmentSummary = "no_test"
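# For reference, a [workflow-datafind] section providing the options read
# above might look like this (values are illustrative only):
#   [workflow-datafind]
#   datafind-method = AT_RUNTIME_SINGLE_FRAMES
#   datafind-check-segment-gaps = update_times
#   datafind-check-frames-exist = update_times
#   datafind-check-segment-summary = no_test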
logger.info("Starting datafind with setup_datafind_runtime_generated")
if datafind_method == "AT_RUNTIME_MULTIPLE_CACHES":
datafindcaches, datafindouts = \
setup_datafind_runtime_cache_multi_calls_perifo(cp, scienceSegs,
outputDir, tags=tags)
elif datafind_method == "AT_RUNTIME_SINGLE_CACHES":
datafindcaches, datafindouts = \
setup_datafind_runtime_cache_single_call_perifo(cp, scienceSegs,
outputDir, tags=tags)
elif datafind_method == "AT_RUNTIME_MULTIPLE_FRAMES":
datafindcaches, datafindouts = \
setup_datafind_runtime_frames_multi_calls_perifo(cp, scienceSegs,
outputDir, tags=tags)
elif datafind_method == "AT_RUNTIME_SINGLE_FRAMES":
datafindcaches, datafindouts = \
setup_datafind_runtime_frames_single_call_perifo(cp, scienceSegs,
outputDir, tags=tags)
elif datafind_method == "AT_RUNTIME_FAKE_DATA":
pass
elif datafind_method == "FROM_PREGENERATED_LCF_FILES":
ifos = scienceSegs.keys()
datafindcaches, datafindouts = \
setup_datafind_from_pregenerated_lcf_files(cp, ifos,
outputDir, tags=tags)
else:
msg = """Entry datafind-method in [workflow-datafind] does not have "
expected value. Valid values are
AT_RUNTIME_MULTIPLE_FRAMES, AT_RUNTIME_SINGLE_FRAMES
AT_RUNTIME_MULTIPLE_CACHES, AT_RUNTIME_SINGLE_CACHES,
FROM_PREGENERATED_LCF_FILES, or AT_RUNTIME_FAKE_DATA.
Consult the documentation for more info."""
raise ValueError(msg)
using_backup_server = False
if datafind_method == "AT_RUNTIME_MULTIPLE_FRAMES" or \
datafind_method == "AT_RUNTIME_SINGLE_FRAMES":
if cp.has_option_tags("workflow-datafind",
"datafind-backup-datafind-server", tags):
using_backup_server = True
backup_server = cp.get_opt_tags("workflow-datafind",
"datafind-backup-datafind-server", tags)
cp_new = copy.deepcopy(cp)
cp_new.set("workflow-datafind",
"datafind-ligo-datafind-server", backup_server)
cp_new.set('datafind', 'urltype', 'gsiftp')
backup_datafindcaches, backup_datafindouts =\
setup_datafind_runtime_frames_single_call_perifo(cp_new,
scienceSegs, outputDir, tags=tags)
backup_datafindouts = datafind_keep_unique_backups(\
backup_datafindouts, datafindouts)
datafindcaches.extend(backup_datafindcaches)
datafindouts.extend(backup_datafindouts)
logger.info("setup_datafind_runtime_generated completed")
# If we don't have frame files covering all times we can update the science
# segments.
if checkSegmentGaps in ['warn','update_times','raise_error']:
logger.info("Checking science segments against datafind output....")
newScienceSegs = get_science_segs_from_datafind_outs(datafindcaches)
logger.info("New segments calculated from data find output.....")
missingData = False
for ifo in scienceSegs.keys():
# If no science segments in input then do nothing
if not scienceSegs[ifo]:
msg = "No science segments are present for ifo %s, " %(ifo)
msg += "the segment metadata indicates there is no analyzable"
msg += " strain data between the selected GPS start and end "
msg += "times."
logger.warning(msg)
continue
if ifo not in newScienceSegs:
msg = "No data frames were found corresponding to the science "
msg += "segments for ifo %s" %(ifo)
logger.error(msg)
missingData = True
if checkSegmentGaps == 'update_times':
scienceSegs[ifo] = segments.segmentlist()
continue
missing = scienceSegs[ifo] - newScienceSegs[ifo]
if abs(missing):
msg = "From ifo %s we are missing frames covering:" %(ifo)
msg += "\n%s" % "\n".join(map(str, missing))
missingData = True
logger.error(msg)
if checkSegmentGaps == 'update_times':
# Remove missing time, so that we can carry on if desired
logger.info("Updating science segments for ifo %s.", ifo)
scienceSegs[ifo] = scienceSegs[ifo] - missing
if checkSegmentGaps == 'raise_error' and missingData:
raise ValueError("Workflow cannot find needed data, exiting.")
logger.info("Done checking, any discrepancies are reported above.")
elif checkSegmentGaps == 'no_test':
pass
else:
errMsg = "checkSegmentGaps kwarg must take a value from 'no_test', "
errMsg += "'warn', 'update_times' or 'raise_error'."
raise ValueError(errMsg)
# Do all of the frame files that were returned actually exist?
if checkFramesExist in ['warn','update_times','raise_error']:
logger.info("Verifying that all frames exist on disk.")
missingFrSegs, missingFrames = \
get_missing_segs_from_frame_file_cache(datafindcaches)
missingFlag = False
for ifo in missingFrames.keys():
# If no data in the input then do nothing
if not scienceSegs[ifo]:
continue
# If using a backup server, does the frame exist remotely?
if using_backup_server:
# WARNING: This will be slow, but hopefully it will not occur
# for too many frames. This could be optimized if
# it becomes necessary.
new_list = []
for frame in missingFrames[ifo]:
for dfout in datafindouts:
dfout_pfns = list(dfout.pfns)
dfout_urls = [a.url for a in dfout_pfns]
if frame.url in dfout_urls:
pfn = dfout_pfns[dfout_urls.index(frame.url)]
dfout.removePFN(pfn)
if len(dfout.pfns) == 0:
new_list.append(frame)
else:
msg = "Frame %s not found locally. "\
%(frame.url,)
msg += "Replacing with remote url(s) %s." \
%(str([a.url for a in dfout.pfns]),)
logger.info(msg)
break
else:
new_list.append(frame)
missingFrames[ifo] = new_list
if missingFrames[ifo]:
msg = "From ifo %s we are missing the following frames:" %(ifo)
msg += "\n" + '\n'.join([a.url for a in missingFrames[ifo]])
missingFlag = True
logger.error(msg)
if checkFramesExist == 'update_times':
# Remove missing times, so that we can carry on if desired
logger.info("Updating science times for ifo %s.", ifo)
scienceSegs[ifo] = scienceSegs[ifo] - missingFrSegs[ifo]
if checkFramesExist == 'raise_error' and missingFlag:
raise ValueError("Workflow cannot find all frames, exiting.")
logger.info("Finished checking frames.")
elif checkFramesExist == 'no_test':
pass
else:
errMsg = "checkFramesExist kwarg must take a value from 'no_test', "
errMsg += "'warn', 'update_times' or 'raise_error'."
raise ValueError(errMsg)
# Check if there are cases where frames exist, but no entry in the segment
# summary table is present.
if checkSegmentSummary in ['warn', 'raise_error']:
logger.info("Checking the segment summary table against frames.")
dfScienceSegs = get_science_segs_from_datafind_outs(datafindcaches)
missingFlag = False
# NOTE: Should this be overrideable in the config file?
sci_seg_name = "SCIENCE"
if seg_file is None:
err_msg = "You must provide the science segments SegFile object "
err_msg += "if using the datafind-check-segment-summary option."
raise ValueError(err_msg)
if seg_file.seg_summ_dict is None:
err_msg = "The provided science segments SegFile object must "
err_msg += "contain a valid segment_summary table if using the "
err_msg += "datafind-check-segment-summary option."
raise ValueError(err_msg)
seg_summary_times = seg_file.seg_summ_dict
for ifo in dfScienceSegs.keys():
curr_seg_summ_times = seg_summary_times[ifo + ":" + sci_seg_name]
missing = (dfScienceSegs[ifo] & seg_file.valid_segments)
missing.coalesce()
missing = missing - curr_seg_summ_times
missing.coalesce()
scienceButNotFrame = scienceSegs[ifo] - dfScienceSegs[ifo]
scienceButNotFrame.coalesce()
missing2 = scienceSegs[ifo] - scienceButNotFrame
missing2.coalesce()
missing2 = missing2 - curr_seg_summ_times
missing2.coalesce()
if abs(missing):
msg = "From ifo %s the following times have frames, " %(ifo)
msg += "but are not covered in the segment summary table."
msg += "\n%s" % "\n".join(map(str, missing))
logger.error(msg)
missingFlag = True
if abs(missing2):
msg = "From ifo %s the following times have frames, " %(ifo)
msg += "are science, and are not covered in the segment "
msg += "summary table."
msg += "\n%s" % "\n".join(map(str, missing2))
logger.error(msg)
missingFlag = True
if checkSegmentSummary == 'raise_error' and missingFlag:
errMsg = "Segment_summary discrepancy detected, exiting."
raise ValueError(errMsg)
elif checkSegmentSummary == 'no_test':
pass
else:
errMsg = "checkSegmentSummary kwarg must take a value from 'no_test', "
errMsg += "'warn', or 'raise_error'."
raise ValueError(errMsg)
# Now need to create the file for SCIENCE_AVAILABLE
sci_avlble_dict = segments.segmentlistdict()
# NOTE: Should this be overrideable in the config file?
sci_avlble_name = "SCIENCE_AVAILABLE"
for ifo in scienceSegs.keys():
sci_avlble_dict[ifo + ':' + sci_avlble_name] = scienceSegs[ifo]
sci_avlble_file = SegFile.from_segment_list_dict('SCIENCE_AVAILABLE',
sci_avlble_dict, ifo_list = scienceSegs.keys(),
valid_segment=workflow.analysis_time,
extension='.xml', tags=tags, directory=outputDir)
logger.info("Leaving datafind module")
if datafind_method == "AT_RUNTIME_FAKE_DATA":
datafindouts = None
else:
datafindouts = FileList(datafindouts)
return datafindouts, sci_avlble_file, scienceSegs, sci_avlble_name
def setup_datafind_runtime_cache_multi_calls_perifo(cp, scienceSegs,
outputDir, tags=None):
"""
This function uses the `gwdatafind` library to obtain the location of all
the frame files that will be needed to cover the analysis of the data
given in scienceSegs. This function will not check if the returned frames
cover the whole time requested, such sanity checks are done in the
pycbc.workflow.setup_datafind_workflow entry function. As opposed to
setup_datafind_runtime_cache_single_call_perifo this will make one call to
the datafind server for every science segment. This function will return a
list of output files that correspond to the cache .lcf files that are
produced, which list the locations of all frame files. This will cause
problems with pegasus, which expects to know about all input files (i.e. the
frame files themselves).
Parameters
-----------
cp : ConfigParser.ConfigParser instance
This contains a representation of the information stored within the
workflow configuration files
scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
This contains the times that the workflow is expected to analyse.
outputDir : path
All output files written by datafind processes will be written to this
directory.
tags : list of strings, optional (default=None)
Use this to specify tags. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
Returns
--------
datafindcaches : list of glue.lal.Cache instances
The glue.lal.Cache representations of the various calls to the datafind
server and the returned frame files.
datafindOuts : pycbc.workflow.core.FileList
List of all the datafind output files for use later in the pipeline.
"""
if tags is None:
tags = []
# Now ready to loop over the input segments
datafindouts = []
datafindcaches = []
logger.info("Querying datafind server for all science segments.")
for ifo, scienceSegsIfo in scienceSegs.items():
observatory = ifo[0].upper()
frameType = cp.get_opt_tags("workflow-datafind",
"datafind-%s-frame-type" % (ifo.lower()), tags)
for seg in scienceSegsIfo:
msg = "Finding data between %d and %d " %(seg[0],seg[1])
msg += "for ifo %s" %(ifo)
logger.info(msg)
# WARNING: For now the workflow will expect times to be in integer seconds
startTime = int(seg[0])
endTime = int(seg[1])
# Sometimes the connection can drop, so retry once if the first attempt fails
try:
cache, cache_file = run_datafind_instance(
cp,
outputDir,
observatory,
frameType,
startTime,
endTime,
ifo,
tags=tags
)
except Exception:
cache, cache_file = run_datafind_instance(
cp,
outputDir,
observatory,
frameType,
startTime,
endTime,
ifo,
tags=tags
)
datafindouts.append(cache_file)
datafindcaches.append(cache)
return datafindcaches, datafindouts
def setup_datafind_runtime_cache_single_call_perifo(cp, scienceSegs, outputDir,
tags=None):
"""
This function uses the `gwdatafind` library to obtain the location of all
the frame files that will be needed to cover the analysis of the data
given in scienceSegs. This function will not check if the returned frames
cover the whole time requested, such sanity checks are done in the
pycbc.workflow.setup_datafind_workflow entry function. As opposed to
setup_datafind_runtime_cache_multi_calls_perifo this will make only one call
to datafind per ifo, spanning the whole time. This function will return a
list of output files that correspond to the cache .lcf files that are
produced, which list the locations of all frame files. This will cause
problems with pegasus, which expects to know about all input files (i.e. the
frame files themselves).
Parameters
-----------
cp : ConfigParser.ConfigParser instance
This contains a representation of the information stored within the
workflow configuration files
scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
This contains the times that the workflow is expected to analyse.
outputDir : path
All output files written by datafind processes will be written to this
directory.
tags : list of strings, optional (default=None)
Use this to specify tags. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
Returns
--------
datafindcaches : list of glue.lal.Cache instances
The glue.lal.Cache representations of the various calls to the datafind
server and the returned frame files.
datafindOuts : pycbc.workflow.core.FileList
List of all the datafind output files for use later in the pipeline.
"""
if tags is None:
tags = []
# We want to ignore gaps as the detectors go up and down; calling this
# way will give gaps. See setup_datafind_runtime_cache_multi_calls_perifo
# for datafind calls that only query for data that will exist.
cp.set("datafind","on_gaps","ignore")
# Now ready to loop over the input segments
datafindouts = []
datafindcaches = []
logger.info("Querying datafind server for all science segments.")
for ifo, scienceSegsIfo in scienceSegs.items():
observatory = ifo[0].upper()
checked_times = segments.segmentlist([])
frame_types = cp.get_opt_tags(
"workflow-datafind",
"datafind-%s-frame-type" % (ifo.lower()), tags
)
# Check if this is one type, or time varying
frame_types = frame_types.replace(' ', '').strip().split(',')
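# For example, a single type could be given as (hypothetical type names):
#   datafind-h1-frame-type = H1_HOFT_C00
# while a time-varying specification could look like:
#   datafind-h1-frame-type = H1_HOFT_C00[1126051217:1129383017],H1_HOFT_C01[1129383017:1137254417]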
for ftype in frame_types:
# Check the times, default to full time initially
# This REQUIRES a coalesced segment list to work
start = int(scienceSegsIfo[0][0])
end = int(scienceSegsIfo[-1][1])
# Then check for limits. We're expecting something like:
# value[start:end], so need to extract value, start and end
if '[' in ftype:
# This gets start and end out
bopt = ftype.split('[')[1].split(']')[0]
newstart, newend = bopt.split(':')
# Then check if the times are within science time
start = max(int(newstart), start)
end = min(int(newend), end)
if end <= start:
continue
# This extracts value
ftype = ftype.split('[')[0]
curr_times = segments.segment(start, end)
# The times here must be distinct. We cannot have two different
# frame files at the same time from the same ifo.
if checked_times.intersects_segment(curr_times):
err_msg = "Different frame types cannot overlap in time."
raise ValueError(err_msg)
checked_times.append(curr_times)
# Ask datafind where the frames are
try:
cache, cache_file = run_datafind_instance(
cp,
outputDir,
observatory,
ftype,
start,
end,
ifo,
tags=tags
)
except Exception:
cache, cache_file = run_datafind_instance(
cp,
outputDir,
observatory,
ftype,
start,
end,
ifo,
tags=tags
)
datafindouts.append(cache_file)
datafindcaches.append(cache)
return datafindcaches, datafindouts
def setup_datafind_runtime_frames_single_call_perifo(cp, scienceSegs,
outputDir, tags=None):
"""
This function uses the `gwdatafind` library to obtain the location of all
the frame files that will be needed to cover the analysis of the data
given in scienceSegs. This function will not check if the returned frames
cover the whole time requested, such sanity checks are done in the
pycbc.workflow.setup_datafind_workflow entry function. As opposed to
setup_datafind_runtime_frames_multi_calls_perifo this will make only one
call to datafind per ifo, spanning the whole time. This function will return a list
of files corresponding to the individual frames returned by the datafind
query. This will allow pegasus to more easily identify all the files used
as input, but may cause problems for codes that need to take frame cache
files as input.
Parameters
-----------
cp : ConfigParser.ConfigParser instance
This contains a representation of the information stored within the
workflow configuration files
scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
This contains the times that the workflow is expected to analyse.
outputDir : path
All output files written by datafind processes will be written to this
directory.
tags : list of strings, optional (default=None)
Use this to specify tags. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
Returns
--------
datafindcaches : list of glue.lal.Cache instances
The glue.lal.Cache representations of the various calls to the datafind
server and the returned frame files.
datafindOuts : pycbc.workflow.core.FileList
List of all the datafind output files for use later in the pipeline.
"""
datafindcaches, _ = \
setup_datafind_runtime_cache_single_call_perifo(cp, scienceSegs,
outputDir, tags=tags)
datafindouts = convert_cachelist_to_filelist(datafindcaches)
return datafindcaches, datafindouts
def setup_datafind_runtime_frames_multi_calls_perifo(cp, scienceSegs,
outputDir, tags=None):
"""
This function uses the `gwdatafind` library to obtain the location of all
the frame files that will be needed to cover the analysis of the data
given in scienceSegs. This function will not check if the returned frames
cover the whole time requested, such sanity checks are done in the
pycbc.workflow.setup_datafind_workflow entry function. As opposed to
setup_datafind_runtime_frames_single_call_perifo this will make one call to
the datafind server for every science segment. This function will return a list
of files corresponding to the individual frames returned by the datafind
query. This will allow pegasus to more easily identify all the files used
as input, but may cause problems for codes that need to take frame cache
files as input.
Parameters
-----------
cp : ConfigParser.ConfigParser instance
This contains a representation of the information stored within the
workflow configuration files
scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
This contains the times that the workflow is expected to analyse.
outputDir : path
All output files written by datafind processes will be written to this
directory.
tags : list of strings, optional (default=None)
Use this to specify tags. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
Returns
--------
datafindcaches : list of glue.lal.Cache instances
The glue.lal.Cache representations of the various calls to the datafind
server and the returned frame files.
datafindOuts : pycbc.workflow.core.FileList
List of all the datafind output files for use later in the pipeline.
"""
datafindcaches, _ = \
setup_datafind_runtime_cache_multi_calls_perifo(cp, scienceSegs,
outputDir, tags=tags)
datafindouts = convert_cachelist_to_filelist(datafindcaches)
return datafindcaches, datafindouts
def setup_datafind_from_pregenerated_lcf_files(cp, ifos, outputDir, tags=None):
"""
This function is used if you want to run with pregenerated lcf frame
cache files.
Parameters
-----------
cp : ConfigParser.ConfigParser instance
This contains a representation of the information stored within the
workflow configuration files
ifos : list of ifo strings
List of ifos to get pregenerated files for.
outputDir : path
All output files written by datafind processes will be written to this
directory. Currently this sub-module writes no output.
tags : list of strings, optional (default=None)
Use this to specify tags. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
Returns
--------
datafindcaches : list of glue.lal.Cache instances
The glue.lal.Cache representations of the various calls to the datafind
server and the returned frame files.
datafindOuts : pycbc.workflow.core.FileList
List of all the datafind output files for use later in the pipeline.
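Examples
--------
The per-ifo cache files are given in the configuration, for instance
(paths are illustrative only)::

    [workflow-datafind]
    datafind-method = FROM_PREGENERATED_LCF_FILES
    datafind-pregenerated-cache-file-h1 = /path/to/H1_frames.lcf
    datafind-pregenerated-cache-file-l1 = /path/to/L1_frames.lcf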
"""
from glue import lal
if tags is None:
tags = []
datafindcaches = []
for ifo in ifos:
search_string = "datafind-pregenerated-cache-file-%s" %(ifo.lower(),)
frame_cache_file_name = cp.get_opt_tags("workflow-datafind",
search_string, tags=tags)
curr_cache = lal.Cache.fromfilenames([frame_cache_file_name],
coltype=lal.LIGOTimeGPS)
curr_cache.ifo = ifo
datafindcaches.append(curr_cache)
datafindouts = convert_cachelist_to_filelist(datafindcaches)
return datafindcaches, datafindouts
def convert_cachelist_to_filelist(datafindcache_list):
"""
Take as input a list of glue.lal.Cache objects and return a pycbc FileList
containing all frames within those caches.
Parameters
-----------
datafindcache_list : list of glue.lal.Cache objects
The list of cache files to convert.
Returns
--------
datafind_filelist : FileList of frame File objects
The list of frame files.
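Examples
--------
A minimal sketch, assuming ``caches`` is the list of glue.lal.Cache
objects returned by one of the setup_datafind_runtime_* functions
(the name is illustrative)::

    frame_files = convert_cachelist_to_filelist(caches)
    # frame_files is a FileList with one File per unique frame URL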
"""
prev_file = None
prev_name = None
this_name = None
datafind_filelist = FileList([])
for cache in datafindcache_list:
# sort the cache into time sequential order
cache.sort()
curr_ifo = cache.ifo
for frame in cache:
# Pegasus doesn't like "localhost" in URLs.
frame.url = frame.url.replace('file://localhost', 'file://')
# It is not clear why "localhost" appears in OSDF URLs; we may need to
# stop using Cache objects here.
frame.url = frame.url.replace('osdf://localhost/', 'osdf:///')
# Create one File() object for each unique frame file that we
# get back in the cache.
if prev_file:
prev_name = os.path.basename(prev_file.cache_entry.url)
this_name = os.path.basename(frame.url)
if (prev_file is None) or (prev_name != this_name):
currFile = File(curr_ifo, frame.description,
frame.segment, file_url=frame.url, use_tmp_subdirs=True)
datafind_filelist.append(currFile)
prev_file = currFile
# Populate the PFNs for the File() we just created
cvmfs_urls = ('file:///cvmfs/', 'osdf://')
if frame.url.startswith(cvmfs_urls):
# Frame is on CVMFS/OSDF, so let all sites read it directly.
currFile.add_pfn(frame.url, site='all')
elif frame.url.startswith('file://'):
# Frame not on CVMFS, so may need transferring.
# Be careful here! If all your frames files are on site
# = local and you try to run on OSG, it will likely
# overwhelm the condor file transfer process!
currFile.add_pfn(frame.url, site='local')
else:
# Frame is at some unknown URL. Pegasus will decide how to deal
# with this, but will likely transfer to local site first, and
# from there transfer to remote sites as needed.
currFile.add_pfn(frame.url, site='notlocal')
return datafind_filelist
def get_science_segs_from_datafind_outs(datafindcaches):
"""
This function will calculate the science segments that are covered by
the list of caches containing the frame files returned by various
calls to the datafind server. This can then be used to check whether this
list covers what it is expected to cover.
Parameters
----------
datafindcaches : list of glue.lal.Cache instances
The list of cache objects returned by the datafind calls.
Returns
--------
newScienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
The times covered by the frames found in datafindcaches.
"""
newScienceSegs = {}
for cache in datafindcaches:
if len(cache) > 0:
groupSegs = segments.segmentlist(e.segment for e in cache).coalesce()
ifo = cache.ifo
if ifo not in newScienceSegs:
newScienceSegs[ifo] = groupSegs
else:
newScienceSegs[ifo].extend(groupSegs)
newScienceSegs[ifo].coalesce()
return newScienceSegs
def get_missing_segs_from_frame_file_cache(datafindcaches):
"""
This function will use os.path.isfile to determine if all the frame files
returned by the local datafind server actually exist on the disk. This can
then be used to update the science times if needed.
Parameters
-----------
datafindcaches : list of glue.lal.Cache instances
The list of cache objects returned by the datafind calls.
Returns
--------
missingFrameSegs : Dict. of ifo keyed ligo.segments.segmentlist instances
The times corresponding to missing frames found in datafindcaches.
missingFrames : Dict. of ifo keyed lal.Cache instances
The list of missing frames.
"""
from glue import lal
missingFrameSegs = {}
missingFrames = {}
for cache in datafindcaches:
if len(cache) > 0:
# Don't bother if these are not file:// urls, assume all urls in
# one cache file must be the same type
if cache[0].scheme != 'file':
warn_msg = "The cache contains %s URLs; " %(cache[0].scheme,)
warn_msg += "not checking whether these frames exist on disk."
logger.warning(warn_msg)
continue
_, currMissingFrames = cache.checkfilesexist(on_missing="warn")
missingSegs = segments.segmentlist(e.segment \
for e in currMissingFrames).coalesce()
ifo = cache.ifo
if ifo not in missingFrameSegs:
missingFrameSegs[ifo] = missingSegs
missingFrames[ifo] = lal.Cache(currMissingFrames)
else:
missingFrameSegs[ifo].extend(missingSegs)
# NOTE: This .coalesce probably isn't needed as the segments
# should be disjoint. If speed becomes an issue maybe remove it?
missingFrameSegs[ifo].coalesce()
missingFrames[ifo].extend(currMissingFrames)
return missingFrameSegs, missingFrames
def get_segment_summary_times(scienceFile, segmentName):
"""
This function will find the times for which the segment_summary is set
for the flag given by segmentName.
Parameters
-----------
scienceFile : SegFile
The segment file that we want to use to determine this.
segmentName : string
The DQ flag to search for times in the segment_summary table.
Returns
---------
summSegList : ligo.segments.segmentlist
The times that are covered in the segment summary table.
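Examples
--------
A minimal sketch, assuming ``sci_file`` is a SegFile whose XML contains a
segment_summary table (the file and flag names are illustrative)::

    summ_segs = get_segment_summary_times(sci_file, 'H1:SCIENCE:1')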
"""
# Parse the segmentName
segmentName = segmentName.split(':')
if len(segmentName) not in [2, 3]:
raise ValueError(f"Invalid channel name {segmentName}.")
ifo = segmentName[0]
channel = segmentName[1]
version = ''
if len(segmentName) == 3:
version = int(segmentName[2])
# Load the filename
xmldoc = utils.load_filename(
scienceFile.cache_entry.path,
compress='auto',
contenthandler=LIGOLWContentHandler
)
# Get the segment_def_id for the segmentName
segmentDefTable = table.Table.get_table(xmldoc, "segment_definer")
for entry in segmentDefTable:
if (entry.ifos == ifo) and (entry.name == channel):
if len(segmentName) == 2 or (entry.version==version):
segDefID = entry.segment_def_id
break
else:
raise ValueError("Cannot find channel %s in segment_definer table."\
%(segmentName))
# Get the segmentlist corresponding to this segmentName in segment_summary
segmentSummTable = table.Table.get_table(xmldoc, "segment_summary")
summSegList = segments.segmentlist([])
for entry in segmentSummTable:
if entry.segment_def_id == segDefID:
segment = segments.segment(entry.start_time, entry.end_time)
summSegList.append(segment)
summSegList.coalesce()
return summSegList
def run_datafind_instance(cp, outputDir, observatory, frameType,
startTime, endTime, ifo, tags=None):
"""
This function will query the datafind server once to find frames between
the specified times for the specified frame type and observatory.
Parameters
----------
cp : ConfigParser instance
Source for any kwargs that should be sent to the datafind module
outputDir : path
Output cache files will be written here. We also write the
commands for reproducing what is done in this function to this
directory.
observatory : string
The observatory to query frames for. Ex. 'H', 'L' or 'V'. NB: not
'H1', 'L1', 'V1' which denote interferometers.
frameType : string
The frame type to query for.
startTime : int
Integer start time to query the datafind server for frames.
endTime : int
Integer end time to query the datafind server for frames.
ifo : string
The interferometer to use for naming output. Ex. 'H1', 'L1', 'V1'.
Maybe this could be merged with the observatory string, but this
could cause issues if running on old 'H2' and 'H1' data.
tags : list of string, optional (default=None)
Use this to specify tags. This can be used if this module is being
called more than once to give call specific configuration (by setting
options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
This is also used to tag the Files returned by the class to uniqueify
the Files and uniqueify the actual filename.
FIXME: Filenames may not be unique with current codes!
Returns
--------
dfCache : glue.lal.Cache instance
The glue.lal.Cache representation of the call to the datafind
server and the returned frame files.
cacheFile : pycbc.workflow.core.File
Cache file listing all of the datafind output files for use later in the pipeline.
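Examples
--------
A minimal sketch, assuming ``cp`` is the workflow ConfigParser and a
datafind server is reachable (the frame type and times are illustrative)::

    cache, cache_file = run_datafind_instance(
        cp, 'datafind', 'H', 'H1_HOFT_C00', 1126051217, 1126052217, 'H1')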
"""
from glue import lal
if tags is None:
tags = []
# Determine if we should override the default datafind server
if cp.has_option_tags("workflow-datafind",
"datafind-ligo-datafind-server", tags):
datafind_server = cp.get_opt_tags(
"workflow-datafind",
"datafind-ligo-datafind-server",
tags
)
else:
datafind_server = None
seg = segments.segment([startTime, endTime])
# Take the datafind kwargs from config (usually urltype=file is
# given).
dfKwargs = {}
# By default ignore missing frames, this case is dealt with outside of here
dfKwargs['on_gaps'] = 'ignore'
if cp.has_section("datafind"):
for item, value in cp.items("datafind"):
dfKwargs[item] = value
for tag in tags:
if cp.has_section('datafind-%s' %(tag)):
for item, value in cp.items("datafind-%s" %(tag)):
dfKwargs[item] = value
# It is useful to print the corresponding command to the logs
# directory to check if this was expected.
log_datafind_command(observatory, frameType, startTime, endTime,
os.path.join(outputDir,'logs'), **dfKwargs)
logger.debug("Asking datafind server for frames.")
dfCache = lal.Cache.from_urls(
find_frame_urls(
observatory,
frameType,
startTime,
endTime,
host=datafind_server,
**dfKwargs
),
)
logger.debug("Frames returned")
# workflow format output file
cache_file = File(ifo, 'DATAFIND', seg, extension='lcf',
directory=outputDir, tags=tags)
cache_file.add_pfn(cache_file.cache_entry.path, site='local')
dfCache.ifo = ifo
# Dump output to file
fP = open(cache_file.storage_path, "w")
# FIXME: CANNOT use dfCache.tofile because it will print 815901601.00000
# as a gps time which is incompatible with the lal cache format
# (and the C codes) which demand an integer.
#dfCache.tofile(fP)
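# Each line written below follows the LAL cache (.lcf) format, for example
# (illustrative entry):
#   H H1_HOFT_C00 1126051217 4096 file:///data/H-H1_HOFT_C00-1126051217-4096.gwf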
for entry in dfCache:
start = str(int(entry.segment[0]))
duration = str(int(abs(entry.segment)))
print("%s %s %s %s %s" \
% (entry.observatory, entry.description, start, duration, entry.url), file=fP)
entry.segment = segments.segment(int(entry.segment[0]), int(entry.segment[1]))
fP.close()
return dfCache, cache_file
def log_datafind_command(observatory, frameType, startTime, endTime,
outputDir, **dfKwargs):
"""
This function will write an equivalent gw_data_find command to disk that
can be used to debug why the internal datafind module is not working.
"""
# FIXME: This does not accurately reproduce the call, as assuming that the
# kwargs map directly onto command-line options is wrong; some things need
# to be converted "properly" to their command line equivalents.
gw_command = ['gw_data_find', '--observatory', observatory,
'--type', frameType,
'--gps-start-time', str(startTime),
'--gps-end-time', str(endTime)]
for name, value in dfKwargs.items():
if name == 'match':
gw_command.append("--match")
gw_command.append(str(value))
elif name == 'urltype':
gw_command.append("--url-type")
gw_command.append(str(value))
elif name == 'on_gaps':
pass
else:
errMsg = "Unknown datafind kwarg given: %s. " %(name)
errMsg+= "This argument is stripped in the logged .sh command."
logger.warning(errMsg)
fileName = "%s-%s-%d-%d.sh" \
%(observatory, frameType, startTime, endTime-startTime)
filePath = os.path.join(outputDir, fileName)
fP = open(filePath, 'w')
fP.write(' '.join(gw_command))
fP.close()
def datafind_keep_unique_backups(backup_outs, orig_outs):
"""This function will take a list of backup datafind files, presumably
obtained by querying a remote datafind server, e.g. CIT, and compares
these against a list of original datafind files, presumably obtained by
querying the local datafind server. Only the datafind files in the backup
list that do not appear in the original list are returned. This allows us
to use only files that are missing from the local cluster.
Parameters
-----------
backup_outs : FileList
List of datafind files from the remote datafind server.
orig_outs : FileList
List of datafind files from the local datafind server.
Returns
--------
FileList
List of datafind files in backup_outs and not in orig_outs.
"""
# NOTE: This function is not optimized and could be made considerably
# quicker if speed becomes an issue. With 4s frame files this might
# be slow, but for >1000s files I don't foresee any issue, so I keep
# this simple.
return_list = FileList([])
# We compare the LFNs to determine uniqueness
# Is there a way to associate two paths with one LFN??
orig_names = [f.name for f in orig_outs]
for file in backup_outs:
if file.name not in orig_names:
return_list.append(file)
else:
index_num = orig_names.index(file.name)
orig_out = orig_outs[index_num]
pfns = list(file.pfns)
# This shouldn't happen, but catch if it does
assert(len(pfns) == 1)
orig_out.add_pfn(pfns[0].url, site='notlocal')
return return_list