# Copyright (C) 2017 Alex Nitz
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
""" This package provides information about LIGO/Virgo detections of
compact binary mergers
"""
import logging
import numpy
logger = logging.getLogger('pycbc.catalog')
_aliases = {}
_aliases['mchirp'] = 'chirp_mass_source'
_aliases['mass1'] = 'mass_1_source'
_aliases['mass2'] = 'mass_2_source'
_aliases['snr'] = 'network_matched_filter_snr'
_aliases['z'] = _aliases['redshift'] = 'redshift'
_aliases['distance'] = 'luminosity_distance'
[docs]
def find_event_in_catalog(name, source=None):
""" Get event data from a catalog
Parameters
----------
name: str
Name of event to retrieve from catalog
source: str, None
If not provided, look through each catalog until event is found.
If provided, only check that specific catalog.
"""
from .catalog import list_catalogs, get_source
if source is None:
catalogs = list_catalogs()
else:
catalogs = [source]
for catalog in catalogs:
try:
return get_source(catalog)[name]
except KeyError:
# Try common name
data = get_source(catalog)
for mname in data:
cname = data[mname]['commonName']
if cname.upper() == name.upper():
name = mname
data = data[name]
return data
else:
raise ValueError(f'Did not find merger matching name: {name}')
[docs]
class Merger(object):
"""Informaton about a specific compact binary merger"""
def __init__(self, name, source=None):
""" Return the information of a merger
Parameters
----------
name: str
The name (GW prefixed date) of the merger event.
"""
self.data = find_event_in_catalog(name, source=source)
# Set some basic params from the dataset
for key in self.data:
setattr(self, '_raw_' + key, self.data[key])
for key in _aliases:
setattr(self, key, self.data[_aliases[key]])
self.common_name = self.data['commonName']
self.time = self.data['GPS']
self.frame = 'source'
[docs]
def strain(self, ifo, duration=32, sample_rate=4096):
""" Return strain around the event
Currently this will return the strain around the event in the smallest
format available. Selection of other data is not yet available.
Parameters
----------
ifo: str
The name of the observatory you want strain for. Ex. H1, L1, V1
Returns
-------
strain: pycbc.types.TimeSeries
Strain around the event.
"""
from pycbc.io import get_file
from pycbc.frame import read_frame
for fdict in self.data['strain']:
if (fdict['detector'] == ifo and fdict['duration'] == duration and
fdict['sampling_rate'] == sample_rate and
fdict['format'] == 'gwf'):
url = fdict['url']
break
else:
raise ValueError('no strain data is available as requested '
'for ' + self.common_name)
ver = url.split('/')[-1].split('-')[1].split('_')[-1]
sampling_map = {4096: "4KHZ",
16384: "16KHZ"}
channel = "{}:GWOSC-{}_{}_STRAIN".format(
ifo, sampling_map[sample_rate], ver)
filename = get_file(url, cache=True)
return read_frame(str(filename), str(channel))
[docs]
class Catalog(object):
"""Manage a set of binary mergers"""
def __init__(self, source='gwtc-3'):
""" Return the set of detected mergers
The set of detected mergers. At some point this may have some selection
abilities.
"""
from . import catalog
self.data = catalog.get_source(source=source)
self.mergers = {name: Merger(name,
source=source) for name in self.data}
self.names = self.mergers.keys()
def __len__(self):
return len(self.mergers)
def __getitem__(self, key):
try:
return self.mergers[key]
except KeyError:
# Try common name
for m in self.mergers:
if key == self.mergers[m].common_name:
break
else:
raise ValueError('Did not find merger matching'
' name: {}'.format(key))
return self.mergers[m]
def __setitem__(self, key, value):
self.mergers[key] = value
def __delitem__(self, key):
del self.mergers[key]
def __iter__(self):
return iter(self.mergers)