# Copyright (C) 2020 Leo Singer, 2021 Tito Dal Canton
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Tools for dealing with LIGOLW XML files."""
import os
import sys
import numpy
from igwn_ligolw import lsctables
from igwn_ligolw import ligolw
from igwn_ligolw.ligolw import LIGOLWContentHandler as OrigLIGOLWContentHandler
from igwn_ligolw.lsctables import TableByName
from igwn_ligolw.types import FormatFunc, FromPyType, ToPyType
import pycbc.version as pycbc_version
__all__ = (
'default_null_value',
'return_empty_sngl',
'return_search_summary',
'create_process_table',
'legacy_row_id_converter',
'get_table_columns',
'LIGOLWContentHandler'
)
ROWID_PYTYPE = int
ROWID_TYPE = FromPyType[ROWID_PYTYPE]
ROWID_FORMATFUNC = FormatFunc[ROWID_TYPE]
IDTypes = set([u"ilwd:char", u"ilwd:char_u"])
[docs]
def default_null_value(col_name, col_type):
"""
Associate a sensible "null" default value to a given LIGOLW column type.
"""
if col_type in ['real_4', 'real_8']:
return 0.
if col_type in ['int_4s', 'int_8s']:
# this case includes row IDs
return 0
if col_type == 'lstring':
return ''
raise NotImplementedError(('Do not know how to initialize column '
'{} of type {}').format(col_name, col_type))
[docs]
def return_empty_sngl(nones=False):
"""
Function to create a SnglInspiral object where all columns are populated
but all are set to values that test False (ie. strings to '', floats/ints
to 0, ...). This avoids errors when you try to create a table containing
columns you don't care about, but which still need populating. NOTE: This
will also produce a process_id and event_id with 0 values. For most
applications these should be set to their correct values.
Parameters
----------
nones : bool (False)
If True, just set all columns to None.
Returns
--------
lsctables.SnglInspiral
The "empty" SnglInspiral object.
"""
sngl = lsctables.SnglInspiral()
cols = lsctables.SnglInspiralTable.validcolumns
for entry in cols:
col_name = ligolw.Column.ColumnName(entry)
value = None if nones else default_null_value(col_name, cols[entry])
setattr(sngl, col_name, value)
return sngl
[docs]
def return_search_summary(start_time=0, end_time=0, nevents=0, ifos=None):
"""
Function to create a SearchSummary object where all columns are populated
but all are set to values that test False (ie. strings to '', floats/ints
to 0, ...). This avoids errors when you try to create a table containing
columns you don't care about, but which still need populating. NOTE: This
will also produce a process_id with 0 values. For most applications these
should be set to their correct values.
It then populates columns if given them as options.
Returns
--------
lsctables.SeachSummary
The "empty" SearchSummary object.
"""
if ifos is None:
ifos = []
# create an empty search summary
search_summary = lsctables.SearchSummary()
cols = lsctables.SearchSummaryTable.validcolumns
for entry in cols:
col_name = ligolw.Column.ColumnName(entry)
value = default_null_value(col_name, cols[entry])
setattr(search_summary, col_name, value)
# fill in columns
if ifos:
search_summary.instruments = ifos
if nevents:
search_summary.nevents = nevents
if start_time and end_time:
search_summary.in_start_time = int(start_time)
search_summary.in_start_time_ns = int(start_time % 1 * 1e9)
search_summary.in_end_time = int(end_time)
search_summary.in_end_time_ns = int(end_time % 1 * 1e9)
search_summary.out_start_time = int(start_time)
search_summary.out_start_time_ns = int(start_time % 1 * 1e9)
search_summary.out_end_time = int(end_time)
search_summary.out_end_time_ns = int(end_time % 1 * 1e9)
return search_summary
[docs]
def create_process_table(document, program_name=None, detectors=None,
comment=None, options=None):
"""Create a LIGOLW process table with sane defaults, add it to a LIGOLW
document, and return it.
"""
if program_name is None:
program_name = os.path.basename(sys.argv[0])
if options is None:
options = {}
# igwn_ligolw does not like `cvs_entry_time` being an empty string
cvs_entry_time = pycbc_version.date or None
opts = options.copy()
key_del = []
for key, value in opts.items():
if type(value) not in tuple(FromPyType.keys()):
key_del.append(key)
if len(key_del) != 0:
for key in key_del:
opts.pop(key)
process = document.register_process(
program_name,
opts,
version=pycbc_version.version,
cvs_repository='pycbc/'+pycbc_version.git_branch,
cvs_entry_time=cvs_entry_time,
instruments=detectors,
comment=comment
)
return process
[docs]
def legacy_row_id_converter(ContentHandler):
"""Convert from old-style to new-style row IDs on the fly.
This is loosely adapted from :func:`ligo.lw.utils.ilwd.strip_ilwdchar`.
Notes
-----
When building a ContentHandler, this must be the _outermost_ decorator,
outside of :func:`ligo.lw.lsctables.use_in`, :func:`ligo.lw.param.use_in`,
or :func:`ligo.lw.table.use_in`.
"""
def endElementNS(self, uri_localname, qname,
__orig_endElementNS=ContentHandler.endElementNS):
"""Convert values of <Param> elements from ilwdchar to int."""
if isinstance(self.current, ligolw.Param) and self.current.Type in IDTypes:
old_type = ToPyType[self.current.Type]
old_val = str(old_type(self.current.pcdata))
new_value = ROWID_PYTYPE(old_val.split(":")[-1])
self.current.Type = ROWID_TYPE
self.current.pcdata = ROWID_FORMATFUNC(new_value)
__orig_endElementNS(self, uri_localname, qname)
remapped = {}
def startColumn(self, parent, attrs,
__orig_startColumn=ContentHandler.startColumn):
"""Convert types in <Column> elements from ilwdchar to int.
Notes
-----
This method is adapted from
:func:`ligo.lw.utils.ilwd.strip_ilwdchar`.
"""
result = __orig_startColumn(self, parent, attrs)
# If this is an ilwdchar column, then create a function to convert its
# rows' values for use in the startStream method below.
if result.Type in IDTypes:
old_type = ToPyType[result.Type]
def converter(old_value):
return ROWID_PYTYPE(str(old_type(old_value)).split(":")[-1])
remapped[(id(parent), result.Name)] = converter
result.Type = ROWID_TYPE
# If this is an ilwdchar column, then normalize the column name.
if parent.Name in TableByName:
validcolumns = TableByName[parent.Name].validcolumns
if result.Name not in validcolumns:
stripped_column_to_valid_column = {
ligolw.Column.ColumnName(name): name
for name in validcolumns
}
if result.Name in stripped_column_to_valid_column:
result.setAttribute(
'Name', stripped_column_to_valid_column[result.Name])
return result
def startStream(self, parent, attrs,
__orig_startStream=ContentHandler.startStream):
"""Convert values in table <Stream> elements from ilwdchar to int.
Notes
-----
This method is adapted from
:meth:`ligo.lw.table.TableStream.config`.
"""
result = __orig_startStream(self, parent, attrs)
if isinstance(result, ligolw.Table.Stream):
loadcolumns = set(parent.columnnames)
if parent.loadcolumns is not None:
# FIXME: convert loadcolumns attributes to sets to
# avoid the conversion.
loadcolumns &= set(parent.loadcolumns)
result._tokenizer.set_types([
(remapped.pop((id(parent), colname), pytype)
if colname in loadcolumns else None)
for pytype, colname
in zip(parent.columnpytypes, parent.columnnames)])
return result
ContentHandler.endElementNS = endElementNS
ContentHandler.startColumn = startColumn
ContentHandler.startStream = startStream
return ContentHandler
def _build_series(series, dim_names, comment, delta_name, delta_unit):
Attributes = ligolw.sax.xmlreader.AttributesImpl
elem = ligolw.LIGO_LW(
Attributes({'Name': str(series.__class__.__name__)}))
if comment is not None:
elem.appendChild(ligolw.Comment()).pcdata = comment
elem.appendChild(ligolw.Time.from_gps(series.epoch, 'epoch'))
elem.appendChild(ligolw.Param.from_pyvalue('f0', series.f0, unit='s^-1'))
delta = getattr(series, delta_name)
if numpy.iscomplexobj(series.data.data):
data = numpy.row_stack((
numpy.arange(len(series.data.data)) * delta,
series.data.data.real,
series.data.data.imag
))
else:
data = numpy.row_stack((
numpy.arange(len(series.data.data)) * delta,
series.data.data
))
a = ligolw.Array.build(series.name, data, dim_names=dim_names)
a.Unit = str(series.sampleUnits)
dim0 = a.getElementsByTagName(ligolw.Dim.tagName)[0]
dim0.Unit = delta_unit
dim0.Start = series.f0
dim0.Scale = delta
elem.appendChild(a)
return elem
def make_psd_xmldoc(psddict, xmldoc=None):
"""Add a set of PSDs to a LIGOLW XML document. If the document is not
given, a new one is created first.
"""
xmldoc = ligolw.Document() if xmldoc is None else xmldoc.childNodes[0]
# the PSDs must be children of a LIGO_LW with name "psd"
root_name = 'psd'
Attributes = ligolw.sax.xmlreader.AttributesImpl
lw = xmldoc.appendChild(
ligolw.LIGO_LW(Attributes({'Name': root_name})))
for instrument, psd in psddict.items():
xmlseries = _build_series(
psd,
('Frequency,Real', 'Frequency'),
None,
'deltaF',
's^-1'
)
fs = lw.appendChild(xmlseries)
fs.appendChild(ligolw.Param.from_pyvalue('instrument', instrument))
return xmldoc
def snr_series_to_xml(snr_series, document, sngl_inspiral_id):
"""Save an SNR time series into an XML document, in a format compatible
with BAYESTAR.
"""
snr_lal = snr_series.lal()
snr_lal.name = 'snr'
snr_lal.sampleUnits = ''
snr_xml = _build_series(
snr_lal,
('Time', 'Time,Real,Imaginary'),
None,
'deltaT',
's'
)
snr_node = document.childNodes[-1].appendChild(snr_xml)
eid_param = ligolw.Param.from_pyvalue('event_id', sngl_inspiral_id)
snr_node.appendChild(eid_param)
[docs]
def get_table_columns(table):
"""Return a list of columns that are present in the given table, in a
format that can be passed to `igwn_ligolw.Table.new()`.
The split on ":" is needed for columns like `process:process_id`, which
must be listed as `process:process_id` in `igwn_ligolw.Table.new()`, but are
listed as just `process_id` in the `columnnames` attribute of the given
table.
"""
columns = []
for col in table.validcolumns:
att = col.split(':')[-1]
if att in table.columnnames:
columns.append(col)
return columns
[docs]
@legacy_row_id_converter
class LIGOLWContentHandler(OrigLIGOLWContentHandler):
"Dummy class needed for loading LIGOLW files"