Source code for pycbc.workflow.datafind

# Copyright (C) 2013  Ian Harry
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

#
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#
"""
This module is responsible for querying a datafind server to determine the
availability of the data that the code is attempting to run on. It also
performs a number of tests and can act on these as described below. Full
documentation for this module can be found here:
https://ldas-jobs.ligo.caltech.edu/~cbc/docs/pycbc/ahope/datafind.html
"""

import os, copy
import logging
import urllib.parse

from ligo import segments
from ligo.lw import utils, table
from gwdatafind import find_urls as find_frame_urls

from pycbc.workflow.core import SegFile, File, FileList, make_analysis_dir
from pycbc.io.ligolw import LIGOLWContentHandler

# NOTE urllib is weird. For some reason it only allows known schemes and will
# give *wrong* results, rather than failing, if you use something like gsiftp
# We can add schemes explicitly, as below, but be careful with this!
# (urllib is used indirectly through lal.Cache objects)
urllib.parse.uses_relative.append('osdf')
urllib.parse.uses_netloc.append('osdf')

logger = logging.getLogger('pycbc.workflow.datafind')

def setup_datafind_workflow(workflow, scienceSegs, outputDir, seg_file=None,
                            tags=None):
    """
    Setup datafind section of the workflow. This section is responsible for
    generating, or setting up the workflow to generate, a list of files that
    record the location of the frame files needed to perform the analysis.
    There could be multiple options here, the datafind jobs could be done at
    run time or could be put into a dag. The subsequent jobs will know
    what was done here from the OutFileList containing the datafind jobs
    (and the Dagman nodes if appropriate).
    For now the only implemented option is to generate the datafind files at
    runtime. This module can also check if the frameFiles actually exist, check
    whether the obtained segments line up with the original ones and update the
    science segments to reflect missing data files.

    Parameters
    ----------
    workflow: pycbc.workflow.core.Workflow
        The workflow class that stores the jobs that will be run.
    scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
        This contains the times that the workflow is expected to analyse.
        Entries of this dictionary are replaced in place when one of the
        checks is configured as 'update_times'.
    outputDir : path
        All output files written by datafind processes will be written to this
        directory.
    seg_file : SegFile, optional (default=None)
        The file returned by get_science_segments containing the science
        segments and the associated segment_summary. This will
        be used for the segment_summary test and is required if, and only if,
        performing that test.
    tags : list of string, optional (default=None)
        Use this to specify tags. This can be used if this module is being
        called more than once to give call specific configuration (by setting
        options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
        This is also used to tag the Files returned by the class to uniqueify
        the Files and uniqueify the actual filename.
        FIXME: Filenames may not be unique with current codes!

    Returns
    --------
    datafindOuts : pycbc.workflow.core.FileList or None
        List of all the datafind output files for use later in the pipeline.
        This is None when datafind-method is AT_RUNTIME_FAKE_DATA.
    sci_avlble_file : SegFile
        SegFile containing the analysable time after checks in the datafind
        module are applied to the input segment list. For production runs this
        is expected to be equal to the input segment list.
    scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
        This contains the times that the workflow is expected to analyse. If
        one of the checks is set to 'update_times' this will have been updated
        to reflect any instances of missing data.
    sci_avlble_name : string
        The name with which the analysable time is stored in the
        sci_avlble_file.

    Raises
    ------
    ValueError
        If datafind-method or any of the datafind-check-* options has an
        unrecognised value, if a check set to 'raise_error' finds a problem,
        or if the segment summary check is requested without a usable
        seg_file.
    """
    if tags is None:
        tags = []
    logger.info("Entering datafind module")
    make_analysis_dir(outputDir)
    cp = workflow.cp

    # Parse for options in ini file
    datafind_method = cp.get_opt_tags("workflow-datafind",
                                      "datafind-method", tags)

    if cp.has_option_tags("workflow-datafind",
                          "datafind-check-segment-gaps", tags):
        checkSegmentGaps = cp.get_opt_tags("workflow-datafind",
                                           "datafind-check-segment-gaps",
                                           tags)
    else:
        checkSegmentGaps = "no_test"
    if cp.has_option_tags("workflow-datafind",
                          "datafind-check-frames-exist", tags):
        checkFramesExist = cp.get_opt_tags("workflow-datafind",
                                           "datafind-check-frames-exist",
                                           tags)
    else:
        checkFramesExist = "no_test"
    if cp.has_option_tags("workflow-datafind",
                          "datafind-check-segment-summary", tags):
        checkSegmentSummary = cp.get_opt_tags(
            "workflow-datafind", "datafind-check-segment-summary", tags)
    else:
        checkSegmentSummary = "no_test"

    logger.info("Starting datafind with setup_datafind_runtime_generated")
    if datafind_method == "AT_RUNTIME_MULTIPLE_CACHES":
        datafindcaches, datafindouts = \
            setup_datafind_runtime_cache_multi_calls_perifo(
                cp, scienceSegs, outputDir, tags=tags)
    elif datafind_method == "AT_RUNTIME_SINGLE_CACHES":
        datafindcaches, datafindouts = \
            setup_datafind_runtime_cache_single_call_perifo(
                cp, scienceSegs, outputDir, tags=tags)
    elif datafind_method == "AT_RUNTIME_MULTIPLE_FRAMES":
        datafindcaches, datafindouts = \
            setup_datafind_runtime_frames_multi_calls_perifo(
                cp, scienceSegs, outputDir, tags=tags)
    elif datafind_method == "AT_RUNTIME_SINGLE_FRAMES":
        datafindcaches, datafindouts = \
            setup_datafind_runtime_frames_single_call_perifo(
                cp, scienceSegs, outputDir, tags=tags)
    elif datafind_method == "AT_RUNTIME_FAKE_DATA":
        # No datafind query is made for fake data, so there are no caches
        # for the checks below to inspect. Requests for an active test are
        # skipped with a warning; unrecognised option values are left alone
        # so that they still raise ValueError further down.
        datafindcaches = []
        datafindouts = []
        skip_msg = ("Option %s is ignored for AT_RUNTIME_FAKE_DATA as there "
                    "are no frame files to test.")
        if checkSegmentGaps in ['warn', 'update_times', 'raise_error']:
            logger.warning(skip_msg, "datafind-check-segment-gaps")
            checkSegmentGaps = "no_test"
        if checkFramesExist in ['warn', 'update_times', 'raise_error']:
            logger.warning(skip_msg, "datafind-check-frames-exist")
            checkFramesExist = "no_test"
        if checkSegmentSummary in ['warn', 'raise_error']:
            logger.warning(skip_msg, "datafind-check-segment-summary")
            checkSegmentSummary = "no_test"
    elif datafind_method == "FROM_PREGENERATED_LCF_FILES":
        ifos = scienceSegs.keys()
        datafindcaches, datafindouts = \
            setup_datafind_from_pregenerated_lcf_files(cp, ifos, outputDir,
                                                       tags=tags)
    else:
        msg = ("Entry datafind-method in [workflow-datafind] does not have "
               "expected value. Valid values are "
               "AT_RUNTIME_MULTIPLE_FRAMES, AT_RUNTIME_SINGLE_FRAMES, "
               "AT_RUNTIME_MULTIPLE_CACHES, AT_RUNTIME_SINGLE_CACHES, "
               "FROM_PREGENERATED_LCF_FILES, or AT_RUNTIME_FAKE_DATA. "
               "Consult the documentation for more info.")
        raise ValueError(msg)

    using_backup_server = False
    if datafind_method == "AT_RUNTIME_MULTIPLE_FRAMES" or \
            datafind_method == "AT_RUNTIME_SINGLE_FRAMES":
        if cp.has_option_tags("workflow-datafind",
                              "datafind-backup-datafind-server", tags):
            using_backup_server = True
            backup_server = cp.get_opt_tags(
                "workflow-datafind", "datafind-backup-datafind-server", tags)
            # Query the backup server through a copy of the configuration so
            # the caller's settings are left untouched.
            cp_new = copy.deepcopy(cp)
            cp_new.set("workflow-datafind",
                       "datafind-ligo-datafind-server", backup_server)
            cp_new.set('datafind', 'urltype', 'gsiftp')
            backup_datafindcaches, backup_datafindouts = \
                setup_datafind_runtime_frames_single_call_perifo(
                    cp_new, scienceSegs, outputDir, tags=tags)
            backup_datafindouts = datafind_keep_unique_backups(
                backup_datafindouts, datafindouts)
            datafindcaches.extend(backup_datafindcaches)
            datafindouts.extend(backup_datafindouts)

    logger.info("setup_datafind_runtime_generated completed")
    # If we don't have frame files covering all times we can update the science
    # segments.
    if checkSegmentGaps in ['warn', 'update_times', 'raise_error']:
        logger.info("Checking science segments against datafind output....")
        newScienceSegs = get_science_segs_from_datafind_outs(datafindcaches)
        logger.info("New segments calculated from data find output.....")
        missingData = False
        for ifo in scienceSegs.keys():
            # If no science segments in input then do nothing
            if not scienceSegs[ifo]:
                msg = "No science segments are present for ifo %s, " % (ifo)
                msg += "the segment metadata indicates there is no analyzable"
                msg += " strain data between the selected GPS start and end "
                msg += "times."
                logger.warning(msg)
                continue
            if ifo not in newScienceSegs:
                msg = "No data frames were found corresponding to the science "
                msg += "segments for ifo %s" % (ifo)
                logger.error(msg)
                missingData = True
                if checkSegmentGaps == 'update_times':
                    scienceSegs[ifo] = segments.segmentlist()
                continue
            missing = scienceSegs[ifo] - newScienceSegs[ifo]
            if abs(missing):
                msg = "From ifo %s we are missing frames covering:" % (ifo)
                msg += "\n%s" % "\n".join(map(str, missing))
                missingData = True
                logger.error(msg)
                if checkSegmentGaps == 'update_times':
                    # Remove missing time, so that we can carry on if desired
                    logger.info("Updating science segments for ifo %s.", ifo)
                    scienceSegs[ifo] = scienceSegs[ifo] - missing

        if checkSegmentGaps == 'raise_error' and missingData:
            raise ValueError("Workflow cannot find needed data, exiting.")
        logger.info("Done checking, any discrepancies are reported above.")
    elif checkSegmentGaps == 'no_test':
        pass
    else:
        errMsg = "checkSegmentGaps kwarg must take a value from 'no_test', "
        errMsg += "'warn', 'update_times' or 'raise_error'."
        raise ValueError(errMsg)

    # Do all of the frame files that were returned actually exist?
    if checkFramesExist in ['warn', 'update_times', 'raise_error']:
        logger.info("Verifying that all frames exist on disk.")
        _, missingFrames = \
            get_missing_segs_from_frame_file_cache(datafindcaches)
        missingFlag = False
        for ifo in missingFrames.keys():
            # If no data in the input then do nothing
            if not scienceSegs[ifo]:
                continue
            # If using a backup server, does the frame exist remotely?
            if using_backup_server:
                # WARNING: This will be slow, but hopefully it will not occur
                #          for too many frames. This could be optimized if
                #          it becomes necessary.
                new_list = []
                for frame in missingFrames[ifo]:
                    for dfout in datafindouts:
                        dfout_pfns = list(dfout.pfns)
                        dfout_urls = [a.url for a in dfout_pfns]
                        if frame.url in dfout_urls:
                            # Drop the local URL that does not exist; the
                            # frame is only lost if no other URL remains.
                            pfn = dfout_pfns[dfout_urls.index(frame.url)]
                            dfout.removePFN(pfn)
                            if len(dfout.pfns) == 0:
                                new_list.append(frame)
                            else:
                                msg = "Frame %s not found locally. " \
                                    % (frame.url,)
                                msg += "Replacing with remote url(s) %s." \
                                    % (str([a.url for a in dfout.pfns]),)
                                logger.info(msg)
                            break
                    else:
                        # No output file lists this frame at all
                        new_list.append(frame)
                missingFrames[ifo] = new_list
            if missingFrames[ifo]:
                msg = "From ifo %s we are missing the following frames:" \
                    % (ifo)
                msg += "\n%s" % "\n".join(a.url for a in missingFrames[ifo])
                missingFlag = True
                logger.error(msg)
                if checkFramesExist == 'update_times':
                    # Remove missing times, so that we can carry on if
                    # desired. Only frames that are still missing count here:
                    # times recovered through the backup server stay
                    # analysable.
                    logger.info("Updating science times for ifo %s.", ifo)
                    still_missing = segments.segmentlist(
                        a.segment for a in missingFrames[ifo]).coalesce()
                    scienceSegs[ifo] = scienceSegs[ifo] - still_missing

        if checkFramesExist == 'raise_error' and missingFlag:
            raise ValueError("Workflow cannot find all frames, exiting.")
        logger.info("Finished checking frames.")
    elif checkFramesExist == 'no_test':
        pass
    else:
        errMsg = "checkFramesExist kwarg must take a value from 'no_test', "
        errMsg += "'warn', 'update_times' or 'raise_error'."
        raise ValueError(errMsg)

    # Check if there are cases where frames exist, but no entry in the segment
    # summary table are present.
    if checkSegmentSummary in ['warn', 'raise_error']:
        logger.info("Checking the segment summary table against frames.")
        dfScienceSegs = get_science_segs_from_datafind_outs(datafindcaches)
        missingFlag = False
        # NOTE: Should this be overrideable in the config file?
        sci_seg_name = "SCIENCE"
        if seg_file is None:
            err_msg = "You must provide the science segments SegFile object "
            err_msg += "if using the datafind-check-segment-summary option."
            raise ValueError(err_msg)
        if seg_file.seg_summ_dict is None:
            err_msg = "The provided science segments SegFile object must "
            err_msg += "contain a valid segment_summary table if using the "
            err_msg += "datafind-check-segment-summary option."
            raise ValueError(err_msg)
        seg_summary_times = seg_file.seg_summ_dict
        for ifo in dfScienceSegs.keys():
            curr_seg_summ_times = seg_summary_times[ifo + ":" + sci_seg_name]
            # Times with frames, inside the valid range, that the segment
            # summary does not cover
            missing = (dfScienceSegs[ifo] & seg_file.valid_segments)
            missing.coalesce()
            missing = missing - curr_seg_summ_times
            missing.coalesce()
            # Times that are science AND have frames, but that the segment
            # summary does not cover
            scienceButNotFrame = scienceSegs[ifo] - dfScienceSegs[ifo]
            scienceButNotFrame.coalesce()
            missing2 = scienceSegs[ifo] - scienceButNotFrame
            missing2.coalesce()
            missing2 = missing2 - curr_seg_summ_times
            missing2.coalesce()
            if abs(missing):
                msg = "From ifo %s the following times have frames, " % (ifo)
                msg += "but are not covered in the segment summary table."
                msg += "\n%s" % "\n".join(map(str, missing))
                logger.error(msg)
                missingFlag = True
            if abs(missing2):
                msg = "From ifo %s the following times have frames, " % (ifo)
                msg += "are science, and are not covered in the segment "
                msg += "summary table."
                msg += "\n%s" % "\n".join(map(str, missing2))
                logger.error(msg)
                missingFlag = True
        if checkSegmentSummary == 'raise_error' and missingFlag:
            errMsg = "Segment_summary discrepancy detected, exiting."
            raise ValueError(errMsg)
    elif checkSegmentSummary == 'no_test':
        pass
    else:
        errMsg = "checkSegmentSummary kwarg must take a value from 'no_test', "
        errMsg += "'warn', or 'raise_error'."
        raise ValueError(errMsg)

    # Now need to create the file for SCIENCE_AVAILABLE
    sci_avlble_dict = segments.segmentlistdict()
    # NOTE: Should this be overrideable in the config file?
    sci_avlble_name = "SCIENCE_AVAILABLE"
    for ifo in scienceSegs.keys():
        sci_avlble_dict[ifo + ':' + sci_avlble_name] = scienceSegs[ifo]

    sci_avlble_file = SegFile.from_segment_list_dict(
        'SCIENCE_AVAILABLE', sci_avlble_dict,
        ifo_list=scienceSegs.keys(),
        valid_segment=workflow.analysis_time,
        extension='.xml', tags=tags, directory=outputDir)

    logger.info("Leaving datafind module")
    if datafind_method == "AT_RUNTIME_FAKE_DATA":
        datafindouts = None
    else:
        datafindouts = FileList(datafindouts)

    return datafindouts, sci_avlble_file, scienceSegs, sci_avlble_name
def setup_datafind_runtime_cache_multi_calls_perifo(cp, scienceSegs,
                                                    outputDir, tags=None):
    """
    This function uses the `gwdatafind` library to obtain the location of all
    the frame files that will be needed to cover the analysis of the data
    given in scienceSegs. This function will not check if the returned frames
    cover the whole time requested, such sanity checks are done in the
    pycbc.workflow.setup_datafind_workflow entry function. As opposed to
    setup_datafind_runtime_single_call_perifo this call will make one call to
    the datafind server for every science segment. This function will return a
    list of output files that correspond to the cache .lcf files that are
    produced, which list the locations of all frame files. This will cause
    problems with pegasus, which expects to know about all input files (ie. the
    frame files themselves.)

    Parameters
    -----------
    cp : ConfigParser.ConfigParser instance
        This contains a representation of the information stored within the
        workflow configuration files
    scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
        This contains the times that the workflow is expected to analyse.
    outputDir : path
        All output files written by datafind processes will be written to this
        directory.
    tags : list of strings, optional (default=None)
        Use this to specify tags. This can be used if this module is being
        called more than once to give call specific configuration (by setting
        options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
        This is also used to tag the Files returned by the class to uniqueify
        the Files and uniqueify the actual filename.
        FIXME: Filenames may not be unique with current codes!

    Returns
    --------
    datafindcaches : list of glue.lal.Cache instances
       The glue.lal.Cache representations of the various calls to the datafind
       server and the returned frame files.
    datafindOuts : list of pycbc.workflow.core.File
        The cache files written for each datafind call, for use later in the
        pipeline.
    """
    if tags is None:
        tags = []

    # Now ready to loop over the input segments
    datafindouts = []
    datafindcaches = []
    logger.info("Querying datafind server for all science segments.")
    for ifo, scienceSegsIfo in scienceSegs.items():
        observatory = ifo[0].upper()
        frameType = cp.get_opt_tags(
            "workflow-datafind",
            "datafind-%s-frame-type" % (ifo.lower()),
            tags
        )
        for seg in scienceSegsIfo:
            msg = "Finding data between %d and %d " % (seg[0], seg[1])
            msg += "for ifo %s" % (ifo)
            logger.info(msg)
            # WARNING: For now the workflow will expect times to be in
            #          integer seconds
            startTime = int(seg[0])
            endTime = int(seg[1])

            # Sometimes the connection can drop, so retry once. Only ordinary
            # errors trigger the retry: KeyboardInterrupt and SystemExit
            # must propagate straight away.
            try:
                cache, cache_file = run_datafind_instance(
                    cp, outputDir, observatory, frameType,
                    startTime, endTime, ifo, tags=tags
                )
            except Exception:
                logger.warning(
                    "Datafind query for ifo %s between %d and %d failed, "
                    "retrying once.", ifo, startTime, endTime, exc_info=True
                )
                cache, cache_file = run_datafind_instance(
                    cp, outputDir, observatory, frameType,
                    startTime, endTime, ifo, tags=tags
                )
            datafindouts.append(cache_file)
            datafindcaches.append(cache)
    return datafindcaches, datafindouts
def setup_datafind_runtime_cache_single_call_perifo(cp, scienceSegs,
                                                    outputDir, tags=None):
    """
    This function uses the `gwdatafind` library to obtain the location of all
    the frame files that will be needed to cover the analysis of the data
    given in scienceSegs. This function will not check if the returned frames
    cover the whole time requested, such sanity checks are done in the
    pycbc.workflow.setup_datafind_workflow entry function. As opposed to
    setup_datafind_runtime_generated this call will only run one call to
    datafind per ifo (and per configured frame type), spanning the whole time.
    This function will return a list of output files that correspond to the
    cache .lcf files that are produced, which list the locations of all frame
    files. This will cause problems with pegasus, which expects to know about
    all input files (ie. the frame files themselves.)

    Parameters
    -----------
    cp : ConfigParser.ConfigParser instance
        This contains a representation of the information stored within the
        workflow configuration files. The option on_gaps in section
        [datafind] is set to "ignore" on this object.
    scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
        This contains the times that the workflow is expected to analyse.
        Each list must be coalesced. Ifos with an empty list are skipped.
    outputDir : path
        All output files written by datafind processes will be written to this
        directory.
    tags : list of strings, optional (default=None)
        Use this to specify tags. This can be used if this module is being
        called more than once to give call specific configuration (by setting
        options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
        This is also used to tag the Files returned by the class to uniqueify
        the Files and uniqueify the actual filename.
        FIXME: Filenames may not be unique with current codes!

    Returns
    --------
    datafindcaches : list of glue.lal.Cache instances
       The glue.lal.Cache representations of the various calls to the datafind
       server and the returned frame files.
    datafindOuts : list of pycbc.workflow.core.File
        The cache files written for each datafind call, for use later in the
        pipeline.

    Raises
    ------
    ValueError
        If two configured frame types cover overlapping times.
    """
    if tags is None:
        tags = []

    # We want to ignore gaps as the detectors go up and down and calling this
    # way will give gaps. See the setup_datafind_runtime_generated function
    # for datafind calls that only query for data that will exist
    cp.set("datafind", "on_gaps", "ignore")

    # Now ready to loop over the input segments
    datafindouts = []
    datafindcaches = []
    logger.info("Querying datafind server for all science segments.")
    for ifo, scienceSegsIfo in scienceSegs.items():
        # With no science time there is no span to query, and indexing the
        # first/last segment below would fail.
        if not scienceSegsIfo:
            logger.warning(
                "No science segments for ifo %s, skipping datafind query.",
                ifo
            )
            continue

        observatory = ifo[0].upper()
        checked_times = segments.segmentlist([])
        frame_types = cp.get_opt_tags(
            "workflow-datafind",
            "datafind-%s-frame-type" % (ifo.lower()),
            tags
        )
        # Check if this is one type, or time varying
        frame_types = frame_types.replace(' ', '').strip().split(',')
        for ftype in frame_types:
            # Check the times, default to full time initially
            # This REQUIRES a coalesced segment list to work
            start = int(scienceSegsIfo[0][0])
            end = int(scienceSegsIfo[-1][1])
            # Then check for limits. We're expecting something like:
            # value[start:end], so need to extract value, start and end
            if '[' in ftype:
                # This gets start and end out
                bopt = ftype.split('[')[1].split(']')[0]
                newstart, newend = bopt.split(':')
                # Then check if the times are within science time
                start = max(int(newstart), start)
                end = min(int(newend), end)
                if end <= start:
                    continue
                # This extracts value
                ftype = ftype.split('[')[0]
            curr_times = segments.segment(start, end)
            # The times here must be distinct. We cannot have two different
            # frame files at the same time from the same ifo.
            if checked_times.intersects_segment(curr_times):
                err_msg = "Different frame types cannot overlap in time."
                raise ValueError(err_msg)
            checked_times.append(curr_times)

            # Ask datafind where the frames are. The connection can drop, so
            # retry once; only ordinary errors trigger the retry so that
            # KeyboardInterrupt and SystemExit propagate straight away.
            try:
                cache, cache_file = run_datafind_instance(
                    cp, outputDir, observatory, ftype,
                    start, end, ifo, tags=tags
                )
            except Exception:
                logger.warning(
                    "Datafind query for ifo %s between %d and %d failed, "
                    "retrying once.", ifo, start, end, exc_info=True
                )
                cache, cache_file = run_datafind_instance(
                    cp, outputDir, observatory, ftype,
                    start, end, ifo, tags=tags
                )

            datafindouts.append(cache_file)
            datafindcaches.append(cache)
    return datafindcaches, datafindouts
def setup_datafind_runtime_frames_single_call_perifo(cp, scienceSegs,
                                                     outputDir, tags=None):
    """
    Locate the frame files covering scienceSegs with a single datafind call
    per ifo and return them as individual frame files.

    The datafind queries are delegated to
    setup_datafind_runtime_cache_single_call_perifo; the cache files that
    call produces are discarded and the returned caches are converted into a
    list of frame files with convert_cachelist_to_filelist. This function
    will not check if the returned frames cover the whole time requested,
    such sanity checks are done in the pycbc.workflow.setup_datafind_workflow
    entry function. Returning the individual frames allows pegasus to more
    easily identify all the files used as input, but may cause problems for
    codes that need to take frame cache files as input.

    Parameters
    -----------
    cp : ConfigParser.ConfigParser instance
        This contains a representation of the information stored within the
        workflow configuration files
    scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
        This contains the times that the workflow is expected to analyse.
    outputDir : path
        All output files written by datafind processes will be written to this
        directory.
    tags : list of strings, optional (default=None)
        Use this to specify tags. This can be used if this module is being
        called more than once to give call specific configuration (by setting
        options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
        This is also used to tag the Files returned by the class to uniqueify
        the Files and uniqueify the actual filename.
        FIXME: Filenames may not be unique with current codes!

    Returns
    --------
    datafindcaches : list of glue.lal.Cache instances
       The glue.lal.Cache representations of the various calls to the datafind
       server and the returned frame files.
    datafindOuts : pycbc.workflow.core.FileList
        List of all the datafind output files for use later in the pipeline.
    """
    query_result = setup_datafind_runtime_cache_single_call_perifo(
        cp, scienceSegs, outputDir, tags=tags
    )
    # Only the caches are needed here; the .lcf cache files are dropped.
    datafindcaches, _ = query_result
    return datafindcaches, convert_cachelist_to_filelist(datafindcaches)
def setup_datafind_runtime_frames_multi_calls_perifo(cp, scienceSegs,
                                                     outputDir, tags=None):
    """
    Locate the frame files covering scienceSegs with one datafind call per
    science segment and return them as individual frame files.

    The datafind queries are delegated to
    setup_datafind_runtime_cache_multi_calls_perifo; the cache files that
    call produces are discarded and the returned caches are converted into a
    list of frame files with convert_cachelist_to_filelist. This function
    will not check if the returned frames cover the whole time requested,
    such sanity checks are done in the pycbc.workflow.setup_datafind_workflow
    entry function. Returning the individual frames allows pegasus to more
    easily identify all the files used as input, but may cause problems for
    codes that need to take frame cache files as input.

    Parameters
    -----------
    cp : ConfigParser.ConfigParser instance
        This contains a representation of the information stored within the
        workflow configuration files
    scienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
        This contains the times that the workflow is expected to analyse.
    outputDir : path
        All output files written by datafind processes will be written to this
        directory.
    tags : list of strings, optional (default=None)
        Use this to specify tags. This can be used if this module is being
        called more than once to give call specific configuration (by setting
        options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
        This is also used to tag the Files returned by the class to uniqueify
        the Files and uniqueify the actual filename.
        FIXME: Filenames may not be unique with current codes!

    Returns
    --------
    datafindcaches : list of glue.lal.Cache instances
       The glue.lal.Cache representations of the various calls to the datafind
       server and the returned frame files.
    datafindOuts : pycbc.workflow.core.FileList
        List of all the datafind output files for use later in the pipeline.
    """
    query_result = setup_datafind_runtime_cache_multi_calls_perifo(
        cp, scienceSegs, outputDir, tags=tags
    )
    # Only the caches are needed here; the .lcf cache files are dropped.
    datafindcaches, _ = query_result
    return datafindcaches, convert_cachelist_to_filelist(datafindcaches)
def setup_datafind_from_pregenerated_lcf_files(cp, ifos, outputDir, tags=None):
    """
    Build the datafind outputs from pregenerated lcf frame cache files
    instead of querying a datafind server.

    For every ifo the cache file named by the option
    datafind-pregenerated-cache-file-<ifo> (ifo in lower case) of
    [workflow-datafind] is read into a glue.lal.Cache.

    Parameters
    -----------
    cp : ConfigParser.ConfigParser instance
        This contains a representation of the information stored within the
        workflow configuration files
    ifos : list of ifo strings
        List of ifos to get pregenerated files for.
    outputDir : path
        All output files written by datafind processes will be written to this
        directory. Currently this sub-module writes no output.
    tags : list of strings, optional (default=None)
        Use this to specify tags. This can be used if this module is being
        called more than once to give call specific configuration (by setting
        options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
        This is also used to tag the Files returned by the class to uniqueify
        the Files and uniqueify the actual filename.

    Returns
    --------
    datafindcaches : list of glue.lal.Cache instances
       One glue.lal.Cache per ifo, read from the pregenerated cache file.
    datafindOuts : pycbc.workflow.core.FileList
        List of all the datafind output files for use later in the pipeline.
    """
    from glue import lal

    tags = [] if tags is None else tags

    datafindcaches = []
    for ifo in ifos:
        option_name = "datafind-pregenerated-cache-file-%s" % (ifo.lower(),)
        lcf_path = cp.get_opt_tags("workflow-datafind", option_name,
                                   tags=tags)
        ifo_cache = lal.Cache.fromfilenames([lcf_path],
                                            coltype=lal.LIGOTimeGPS)
        # Record which ifo the cache belongs to for the downstream helpers
        ifo_cache.ifo = ifo
        datafindcaches.append(ifo_cache)

    return datafindcaches, convert_cachelist_to_filelist(datafindcaches)
def convert_cachelist_to_filelist(datafindcache_list):
    """
    Take as input a list of glue.lal.Cache objects and return a pycbc FileList
    containing all frames within those caches.

    Each cache is sorted in place and the URL of every entry is rewritten to
    drop a "localhost" host. Consecutive entries whose URLs share the same
    file name are merged into a single File carrying one PFN per URL.

    Parameters
    -----------
    datafindcache_list : list of glue.lal.Cache objects
        The list of cache files to convert.

    Returns
    --------
    datafind_filelist : FileList of frame File objects
        The list of frame files.
    """
    # URL prefixes that every site can read directly
    direct_read_prefixes = ('file:///cvmfs/', 'osdf://')

    prev_file = None
    prev_name = None
    this_name = None
    datafind_filelist = FileList([])

    for cache in datafindcache_list:
        # sort the cache into time sequential order
        cache.sort()
        curr_ifo = cache.ifo

        for frame in cache:
            # Pegasus doesn't like "localhost" in URLs.
            frame.url = frame.url.replace('file://localhost', 'file://')
            # Not sure why it happens in OSDF URLs!!
            # May need to remove use of Cache objects
            frame.url = frame.url.replace('osdf://localhost/', 'osdf:///')

            # Create one File() object for each unique frame file that we
            # get back in the cache.
            if prev_file:
                prev_name = os.path.basename(prev_file.cache_entry.url)
                this_name = os.path.basename(frame.url)
            starts_new_file = (prev_file is None) or (prev_name != this_name)
            if starts_new_file:
                currFile = File(curr_ifo, frame.description, frame.segment,
                                file_url=frame.url, use_tmp_subdirs=True)
                datafind_filelist.append(currFile)
                prev_file = currFile

            # Populate the PFNs for the current File()
            if frame.url.startswith(direct_read_prefixes):
                # Frame is on CVMFS/OSDF, so let all sites read it directly.
                pfn_site = 'all'
            elif frame.url.startswith('file://'):
                # Frame not on CVMFS, so may need transferring.
                # Be careful here! If all your frames files are on site
                # = local and you try to run on OSG, it will likely
                # overwhelm the condor file transfer process!
                pfn_site = 'local'
            else:
                # Frame is at some unknown URL. Pegasus will decide how to
                # deal with this, but will likely transfer to local site
                # first, and from there transfer to remote sites as needed.
                pfn_site = 'notlocal'
            currFile.add_pfn(frame.url, site=pfn_site)

    return datafind_filelist
def get_science_segs_from_datafind_outs(datafindcaches):
    """
    Calculate the times covered by the frame files held in a list of datafind
    caches. This can then be used to check whether the frames cover what they
    are expected to cover.

    Parameters
    ----------
    datafindcaches : list of cache objects
        The caches returned by the datafind calls. Each one is iterated for
        entries with a ``segment`` attribute and must have an ``ifo``
        attribute. Empty caches are ignored.

    Returns
    --------
    newScienceSegs : Dictionary of ifo keyed ligo.segments.segmentlist instances
        The coalesced times covered by the frames found in the caches. Ifos
        whose caches are all empty have no entry.
    """
    newScienceSegs = {}
    for cache in datafindcaches:
        if not len(cache) > 0:
            continue
        covered = segments.segmentlist(e.segment for e in cache).coalesce()
        ifo = cache.ifo
        if ifo in newScienceSegs:
            # Several caches can belong to one ifo; merge their coverage
            newScienceSegs[ifo].extend(covered)
            newScienceSegs[ifo].coalesce()
        else:
            newScienceSegs[ifo] = covered
    return newScienceSegs
def get_missing_segs_from_frame_file_cache(datafindcaches):
    """
    Determine which of the frame files listed in the datafind caches are
    missing, using each cache's ``checkfilesexist`` method. This can then be
    used to update the science times if needed.

    Empty caches are ignored. A cache whose first entry does not use the
    ``file`` scheme is skipped with a warning, on the assumption that all
    URLs in one cache are of the same type.

    Parameters
    -----------
    datafindcaches : list of glue.lal.Cache instances
        The caches returned by the datafind calls.

    Returns
    --------
    missingFrameSegs : Dict. of ifo keyed ligo.segments.segmentlist instances
        The times corresponding to missing frames found in the caches.
    missingFrames: Dict. of ifo keyed lal.Cache instances
        The list of missing frames
    """
    from glue import lal

    missingFrameSegs = {}
    missingFrames = {}
    for cache in datafindcaches:
        if len(cache) == 0:
            continue

        # Don't bother if these are not file:// urls, assume all urls in
        # one cache file must be the same type
        first_scheme = cache[0].scheme
        if first_scheme != 'file':
            warn_msg = "We have %s entries in the " % (first_scheme,)
            warn_msg += "cache file. I do not check if these exist."
            logger.warning(warn_msg)
            continue

        _, currMissingFrames = cache.checkfilesexist(on_missing="warn")
        missingSegs = segments.segmentlist(
            e.segment for e in currMissingFrames
        ).coalesce()

        ifo = cache.ifo
        if ifo in missingFrameSegs:
            missingFrameSegs[ifo].extend(missingSegs)
            # NOTE: This .coalesce probably isn't needed as the segments
            # should be disjoint. If speed becomes an issue maybe remove it?
            missingFrameSegs[ifo].coalesce()
            missingFrames[ifo].extend(currMissingFrames)
        else:
            missingFrameSegs[ifo] = missingSegs
            missingFrames[ifo] = lal.Cache(currMissingFrames)

    return missingFrameSegs, missingFrames
def get_segment_summary_times(scienceFile, segmentName):
    """
    This function will find the times for which the segment_summary is set
    for the flag given by segmentName.

    Parameters
    -----------
    scienceFile : SegFile
        The segment file that we want to use to determine this.
    segmentName : string
        The DQ flag to search for times in the segment_summary table, given
        as "IFO:NAME" or "IFO:NAME:VERSION". When a version is given only a
        segment_definer entry with that integer version matches.

    Returns
    ---------
    summSegList : ligo.segments.segmentlist
        The times that are covered in the segment summary table.

    Raises
    ------
    ValueError
        If segmentName does not have two or three colon separated fields, if
        the version field is not an integer, or if no matching entry exists
        in the segment_definer table.
    """
    # Parse the segmentName. The original string is kept untouched so that
    # error messages report the flag as the caller wrote it.
    name_parts = segmentName.split(':')
    if len(name_parts) not in (2, 3):
        raise ValueError(f"Invalid channel name {segmentName}.")
    ifo = name_parts[0]
    channel = name_parts[1]
    version = int(name_parts[2]) if len(name_parts) == 3 else None

    # Load the filename
    xmldoc = utils.load_filename(
        scienceFile.cache_entry.path,
        compress='auto',
        contenthandler=LIGOLWContentHandler
    )

    # Get the segment_def_id for the segmentName
    segmentDefTable = table.Table.get_table(xmldoc, "segment_definer")
    for entry in segmentDefTable:
        if (entry.ifos == ifo) and (entry.name == channel):
            # Without an explicit version the first ifo/name match is used
            if version is None or (entry.version == version):
                segDefID = entry.segment_def_id
                break
    else:
        raise ValueError("Cannot find channel %s in segment_definer table."
                         % (segmentName,))

    # Get the segmentlist corresponding to this segmentName in segment_summary
    segmentSummTable = table.Table.get_table(xmldoc, "segment_summary")
    summSegList = segments.segmentlist([])
    for entry in segmentSummTable:
        if entry.segment_def_id == segDefID:
            segment = segments.segment(entry.start_time, entry.end_time)
            summSegList.append(segment)
    summSegList.coalesce()

    return summSegList
def run_datafind_instance(cp, outputDir, observatory, frameType,
                          startTime, endTime, ifo, tags=None):
    """
    This function will query the datafind server once to find frames between
    the specified times for the specified frame type and observatory.

    Parameters
    ----------
    cp : ConfigParser instance
        Source for any kwargs that should be sent to the datafind module
    outputDir : string
        Output cache files will be written here. We also write the commands
        for reproducing what is done in this function to the ``logs``
        subdirectory of this directory.
    observatory : string
        The observatory to query frames for. Ex. 'H', 'L' or 'V'. NB: not
        'H1', 'L1', 'V1' which denote interferometers.
    frameType : string
        The frame type to query for.
    startTime : int
        Integer start time to query the datafind server for frames.
    endTime : int
        Integer end time to query the datafind server for frames.
    ifo : string
        The interferometer to use for naming output. Ex. 'H1', 'L1', 'V1'.
        Maybe this could be merged with the observatory string, but this
        could cause issues if running on old 'H2' and 'H1' data.
    tags : list of string, optional (default=None)
        Use this to specify tags. This can be used if this module is being
        called more than once to give call specific configuration (by setting
        options in [workflow-datafind-${TAG}] rather than [workflow-datafind]).
        This is also used to tag the Files returned by the class to uniquify
        the Files and uniquify the actual filename.
        FIXME: Filenames may not be unique with current codes!

    Returns
    --------
    dfCache : glue.lal.Cache instance
       The glue.lal.Cache representation of the call to the datafind
       server and the returned frame files. Its ``ifo`` attribute is set to
       the given ifo and every entry's segment is converted to integer
       boundaries.
    cache_file : pycbc.workflow.core.File
        Cache file listing all of the datafind output files for use later in
        the pipeline.
    """
    from glue import lal

    if tags is None:
        tags = []

    # Determine if we should override the default datafind server
    if cp.has_option_tags("workflow-datafind",
                          "datafind-ligo-datafind-server", tags):
        datafind_server = cp.get_opt_tags(
            "workflow-datafind",
            "datafind-ligo-datafind-server",
            tags
        )
    else:
        datafind_server = None

    seg = segments.segment([startTime, endTime])

    # Take the datafind kwargs from config (usually urltype=file is
    # given). By default ignore missing frames, this case is dealt with
    # outside of here. Tag-specific sections are read after the generic one,
    # so their values take precedence.
    dfKwargs = {'on_gaps': 'ignore'}
    if cp.has_section("datafind"):
        for item, value in cp.items("datafind"):
            dfKwargs[item] = value
    for tag in tags:
        if cp.has_section('datafind-%s' %(tag)):
            for item, value in cp.items("datafind-%s" %(tag)):
                dfKwargs[item] = value

    # It is useful to print the corresponding command to the logs
    # directory to check if this was expected.
    log_datafind_command(observatory, frameType, startTime, endTime,
                         os.path.join(outputDir,'logs'), **dfKwargs)
    logger.debug("Asking datafind server for frames.")
    dfCache = lal.Cache.from_urls(
        find_frame_urls(
            observatory,
            frameType,
            startTime,
            endTime,
            host=datafind_server,
            **dfKwargs
        ),
    )
    logger.debug("Frames returned")

    # workflow format output file
    cache_file = File(ifo, 'DATAFIND', seg,
                      extension='lcf',
                      directory=outputDir,
                      tags=tags)
    cache_file.add_pfn(cache_file.cache_entry.path, site='local')

    dfCache.ifo = ifo

    # Dump output to file. The context manager guarantees the handle is
    # closed even if writing one of the entries raises.
    # FIXME: CANNOT use dfCache.tofile because it will print 815901601.00000
    #        as a gps time which is incompatible with the lal cache format
    #        (and the C codes) which demand an integer.
    with open(cache_file.storage_path, "w") as fP:
        for entry in dfCache:
            start = str(int(entry.segment[0]))
            duration = str(int(abs(entry.segment)))
            print("%s %s %s %s %s"
                  % (entry.observatory, entry.description, start, duration,
                     entry.url),
                  file=fP)
            # Keep the in-memory entry consistent with the integer times
            # that were just written to disk.
            entry.segment = segments.segment(int(entry.segment[0]),
                                             int(entry.segment[1]))

    return dfCache, cache_file
def log_datafind_command(observatory, frameType, startTime, endTime,
                         outputDir, **dfKwargs):
    """
    This command will print an equivalent gw_data_find command to disk that
    can be used to debug why the internal datafind module is not working.

    Parameters
    ----------
    observatory : string
        Value written after ``--observatory``.
    frameType : string
        Value written after ``--type``.
    startTime : int
        Value written after ``--gps-start-time``.
    endTime : int
        Value written after ``--gps-end-time``.
    outputDir : string
        Directory in which the ``.sh`` file is written. It is not created
        here. The file is named
        ``<observatory>-<frameType>-<startTime>-<duration>.sh``.
    **dfKwargs
        Datafind keyword arguments. ``match`` and ``urltype`` are translated
        to ``--match`` and ``--url-type``; ``on_gaps`` is silently skipped;
        anything else is left out of the command and a warning is logged.
    """
    # FIXME: This does not accurately reproduce the call as assuming the
    # kwargs will be the same is wrong, so some things need to be converted
    # "properly" to the command line equivalent.
    gw_command = ['gw_data_find', '--observatory', observatory,
                  '--type', frameType,
                  '--gps-start-time', str(startTime),
                  '--gps-end-time', str(endTime)]

    for name, value in dfKwargs.items():
        if name == 'match':
            gw_command.extend(["--match", str(value)])
        elif name == 'urltype':
            gw_command.extend(["--url-type", str(value)])
        elif name == 'on_gaps':
            # Recognised, but not added to the logged command.
            continue
        else:
            errMsg = "Unknown datafind kwarg given: %s. " %(name)
            errMsg+= "This argument is stripped in the logged .sh command."
            logger.warning(errMsg)

    fileName = "%s-%s-%d-%d.sh" \
               %(observatory, frameType, startTime, endTime-startTime)
    filePath = os.path.join(outputDir, fileName)
    # Context manager so the handle is released even if the write fails.
    with open(filePath, 'w') as fP:
        fP.write(' '.join(gw_command))
def datafind_keep_unique_backups(backup_outs, orig_outs):
    """This function will take a list of backup datafind files, presumably
    obtained by querying a remote datafind server, e.g. CIT, and compares
    these against a list of original datafind files, presumably obtained by
    querying the local datafind server. Only the datafind files in the backup
    list that do not appear in the original list are returned. This allows us
    to use only files that are missing from the local cluster.

    For every backup file whose name does match an original file, the
    backup's single PFN url is added to that original file with
    ``site='notlocal'`` (the original file is modified in place).

    Parameters
    -----------
    backup_outs : FileList
        List of datafind files from the remote datafind server.
    orig_outs : FileList
        List of datafind files from the local datafind server.

    Returns
    --------
    FileList
        List of datafind files in backup_outs and not in orig_outs.
    """
    return_list = FileList([])

    # We compare the LFNs to determine uniqueness
    # Is there a way to associate two paths with one LFN??
    # Index the original files by name once, so each backup file is resolved
    # with a constant-time lookup. setdefault keeps the first file seen for a
    # given name.
    orig_by_name = {}
    for orig_file in orig_outs:
        orig_by_name.setdefault(orig_file.name, orig_file)

    for backup_file in backup_outs:
        if backup_file.name not in orig_by_name:
            return_list.append(backup_file)
            continue

        orig_out = orig_by_name[backup_file.name]
        pfns = list(backup_file.pfns)
        # This shouldn't happen, but catch if it does
        assert(len(pfns) == 1)
        orig_out.add_pfn(pfns[0].url, site='notlocal')

    return return_list