# Copyright (C) 2020 Collin Capano and Shilpa Kastha
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""This module provides model classes that assume the noise is Gaussian and
introduces a gate to remove given times from the data, using the inpainting
method to fill the removed part such that it does not enter the likelihood.
"""
from abc import abstractmethod
import logging
import numpy
import scipy
from scipy import special
from pycbc.types import FrequencySeries
from pycbc.detector import Detector
from pycbc.pnutils import hybrid_meco_frequency
from pycbc import types
from pycbc.waveform.utils import time_from_frequencyseries
from pycbc.waveform import generator
from pycbc.filter import highpass
from .gaussian_noise import (BaseGaussianNoise, create_waveform_generator,
catch_waveform_error)
from .base_data import BaseDataModel
from .data_utils import fd_data_from_strain_dict
[docs]
class BaseGatedGaussian(BaseGaussianNoise):
r"""Base model for gated gaussian.
Provides additional routines for applying a time-domain gate to data.
See :py:class:`GatedGaussianNoise` for more details.
"""
def __init__(self, variable_params, data, low_frequency_cutoff, psds=None,
high_frequency_cutoff=None, normalize=False,
static_params=None, highpass_waveforms=False,
**kwargs):
# we'll want the time-domain data, so store that
self._td_data = {}
# cache the overwhitened data
self._overwhitened_data = {}
# cache the current gated data
self._gated_data = {}
# cache terms related to normalization and gating
self._invasds = {}
self._Rss = {}
self._lognorm = {}
self._gatetimes = {}
self._det_lognls = {}
# cache samples and linear regression for determinant extrapolation
self._cov_samples = {}
self._cov_regressions = {}
# highpass waveforms with the given frequency
self.highpass_waveforms = highpass_waveforms
if self.highpass_waveforms:
logging.info("Will highpass waveforms at %f Hz",
highpass_waveforms)
# set up the boiler-plate attributes
super().__init__(
variable_params, data, low_frequency_cutoff, psds=psds,
high_frequency_cutoff=high_frequency_cutoff, normalize=normalize,
static_params=static_params, **kwargs)
[docs]
@classmethod
def from_config(cls, cp, data_section='data', data=None, psds=None,
**kwargs):
"""Adds addiotional keyword arguments based on config file.
Additional keyword arguments are:
* ``highpass_waveforms`` : waveforms will be highpassed.
"""
if cp.has_option(data_section, 'strain-high-pass') and \
'highpass_waveforms' not in kwargs:
kwargs['highpass_waveforms'] = float(cp.get(data_section,
'strain-high-pass'))
return super().from_config(cp, data_section=data_section,
data=data, psds=psds,
**kwargs)
@BaseDataModel.data.setter
def data(self, data):
"""Store a copy of the FD and TD data."""
BaseDataModel.data.fset(self, data)
# store the td version
self._td_data = {det: d.to_timeseries() for det, d in data.items()}
@property
def td_data(self):
"""The data in the time domain."""
return self._td_data
@BaseGaussianNoise.psds.setter
def psds(self, psds):
"""Sets the psds, and calculates the weight and norm from them.
The data and the low and high frequency cutoffs must be set first.
"""
# check that the data has been set
if self._data is None:
raise ValueError("No data set")
if self._f_lower is None:
raise ValueError("low frequency cutoff not set")
if self._f_upper is None:
raise ValueError("high frequency cutoff not set")
# make sure the relevant caches are cleared
self._psds.clear()
self._invpsds.clear()
self._invasds.clear()
self._gated_data.clear()
# store the psds
for det, d in self._data.items():
if psds is None:
# No psd means assume white PSD
p = FrequencySeries(numpy.ones(int(self._N/2+1)),
delta_f=d.delta_f)
else:
# copy for storage
p = psds[det].copy()
self._psds[det] = p
# we'll store the weight to apply to the inner product
invp = 1./p
self._invpsds[det] = invp
self._invasds[det] = invp**0.5
# store the autocorrelation function and covariance matrix for
# each detector
Rss = p.astype(types.complex_same_precision_as(p)).to_timeseries()
self._Rss[det] = Rss
# calculate and store the linear regressions to extrapolate
# determinant values
if self.normalize:
self._set_covfit(det)
self._overwhitened_data = self.whiten(self.data, 2, inplace=False)
def _set_covfit(self, det):
"""Sets the fit function for estimating the covariance determinant.
This must be called after the PSDs have been set, otherwise a
ValueError will be raised.
"""
try:
p = self.psds[det]
except KeyError:
raise ValueError("No psd set for detector %s" % det)
Rss = self._Rss[det]
cov = scipy.linalg.toeplitz(Rss/2) # full covariance matrix
samples, fit = self.logdet_fit(cov, p)
self._cov_samples[det] = samples
self._cov_regressions[det] = fit
return
[docs]
def logdet_fit(self, cov, p):
"""Construct a linear regression from a sample of truncated covariance
matrices.
Returns the sample points used for linear fit generation as well as the
linear fit parameters.
"""
# initialize lists for matrix sizes and determinants
sample_sizes = []
sample_dets = []
# set sizes of sample matrices; ensure exact calculations are only on
# small matrices
s = cov.shape[0]
max_size = 8192
if s > max_size:
sample_sizes = [s, max_size, max_size//2, max_size//4]
else:
sample_sizes = [s, s//2, s//4, s//8]
for i in sample_sizes:
# calculate logdet of the full matrix using circulant eigenvalue
# approximation
if i == s:
ld = 2*numpy.log(p/(2*p.delta_t)).sum()
sample_dets.append(ld)
# generate three more sample matrices using exact calculations
else:
gate_size = s - i
start = (s - gate_size)//2
end = start + gate_size
tc = numpy.delete(numpy.delete(cov, slice(start, end), 0),
slice(start, end), 1)
ld = numpy.linalg.slogdet(tc)[1]
sample_dets.append(ld)
# generate a linear regression using the four points (size, logdet)
x = numpy.vstack([sample_sizes, numpy.ones(len(sample_sizes))]).T
m, b = numpy.linalg.lstsq(x, sample_dets, rcond=None)[0]
return (sample_sizes, sample_dets), (m, b)
@BaseGaussianNoise.normalize.setter
def normalize(self, normalize):
"""Clears the current stats if the normalization state is changed.
If normalize is set to True, the fit to the covariance determinant
will be calculated if it hasn't yet and PSDs are set.
"""
# call the parent setter to clear the current stats and set normalize
BaseGaussianNoise.normalize.fset(self, normalize)
# now set the covariance determinant fit if needed
if normalize:
for det in self._psds:
if det not in self._cov_regressions:
# set the covariance determinant fit
self._set_covfit(det)
[docs]
def gate_indices(self, det):
"""Calculate the indices corresponding to start and end of gate.
"""
# get time series start and delta_t
ts = self.td_data[det]
# get gate start and length from get_gate_times
gate_start, gate_length = self.get_gate_times()[det]
# the gate code takes central time and width
window = gate_length / 2
gt = gate_start + window
lindex, rindex = ts.get_gate_indices(gt, window)
return lindex, rindex
[docs]
def det_lognorm(self, det, start_index=None, end_index=None):
"""Calculate the normalization term from the truncated covariance
matrix.
Determinant is estimated using a linear fit to logdet vs truncated
matrix size.
"""
if not self.normalize:
return 0
try:
# check if the key already exists; if so, return its value
lognorm = self._lognorm[(det, start_index, end_index)]
except KeyError:
# get the size of the matrix
n = len(self._Rss[det])
trunc_size = n - (end_index - start_index)
# call the linear regression
m, b = self._cov_regressions[det]
# extrapolate from linear fit
ld = m*trunc_size + b
# full normalization term:
lognorm = -0.5*(numpy.log(2*numpy.pi)*trunc_size + ld)
# cache the result
self._lognorm[(det, start_index, end_index)] = lognorm
return lognorm
def _nowaveform_handler(self):
"""Convenience function to set logl values if no waveform generated.
"""
return -numpy.inf
def _loglr(self):
r"""Computes the log likelihood ratio.
Returns
-------
float
The value of the log likelihood ratio evaluated at the given point.
"""
return self._loglikelihood() - self._lognl()
[docs]
def whiten(self, data, whiten, inplace=False):
"""Whitens the given data.
Parameters
----------
data : dict
Dictionary of detector names -> FrequencySeries.
whiten : {0, 1, 2}
Integer indicating what level of whitening to apply. Levels are:
0: no whitening; 1: whiten; 2: overwhiten.
inplace : bool, optional
If True, modify the data in place. Otherwise, a copy will be
created for whitening.
Returns
-------
dict :
Dictionary of FrequencySeries after the requested whitening has
been applied.
"""
if not inplace:
data = {det: d.copy() for det, d in data.items()}
if whiten:
for det, dtilde in data.items():
invpsd = self._invpsds[det]
if whiten == 1:
dtilde *= invpsd**0.5
elif whiten == 2:
dtilde *= invpsd
else:
raise ValueError("whiten must be either 0, 1, or 2")
return data
[docs]
def get_data(self):
"""Return a copy of the data.
Returns
-------
dict :
Dictionary of detector names -> FrequencySeries.
"""
return {det: d.copy() for det, d in self.data.items()}
[docs]
def get_gated_data(self):
"""Return a copy of the gated data.
The gated data will be cached for faster retrieval.
Returns
-------
dict :
Dictionary of detector names -> FrequencySeries.
"""
gate_times = self.get_gate_times()
out = {}
for det, d in self.td_data.items():
# make sure the cache at least has the detectors in it
try:
cache = self._gated_data[det]
except KeyError:
cache = self._gated_data[det] = {}
invpsd = self._invpsds[det]
gatestartdelay, dgatedelay = gate_times[det]
try:
dtilde = cache[gatestartdelay, dgatedelay]
except KeyError:
# doesn't exist yet, or the gate times changed
cache.clear()
d = d.gate(gatestartdelay + dgatedelay/2,
window=dgatedelay/2, copy=True,
invpsd=invpsd, method='paint')
dtilde = d.to_frequencyseries()
# save for next time
cache[gatestartdelay, dgatedelay] = dtilde
out[det] = dtilde
return out
[docs]
def get_gate_times(self):
"""Gets the time to apply a gate based on the current sky position.
If the parameter ``gatefunc`` is set to ``'hmeco'``, the gate times
will be calculated based on the hybrid MECO of the given set of
parameters; see ``get_gate_times_hmeco`` for details. Otherwise, the
gate times will just be retrieved from the ``t_gate_start`` and
``t_gate_end`` parameters.
Returns
-------
dict :
Dictionary of detector names -> (gate start, gate width)
"""
params = self.current_params
try:
gatefunc = self.current_params['gatefunc']
except KeyError:
gatefunc = None
if gatefunc == 'hmeco':
return self.get_gate_times_hmeco()
gatestart = params['t_gate_start']
gateend = params['t_gate_end']
# we'll need the sky location for determining time shifts
ra = self.current_params['ra']
dec = self.current_params['dec']
# try to get from cache
try:
gatetimes = self._gatetimes[gatestart, gateend, ra, dec]
except KeyError:
# doesn't exist, or parameters have changed, recalculate
self._gatetimes.clear()
gatetimes = self._get_gate_times(gatestart, gateend, ra, dec)
self._gatetimes[gatestart, gateend, ra, dec] = gatetimes
return gatetimes
def _get_gate_times(self, gatestart, gateend, ra, dec):
"""Calculates the gate times in each detector.
Parameters
----------
gatestart : float
Geocentric start time of the gate.
gateend : float
Geocentric end time of the gate.
ra : float
Right ascension of the signal.
dec : float
Declination of the signal.
Returns
-------
dict :
Dictionary of detector names -> (gate start, gate width)
"""
gatetimes = {}
for det in self._invpsds:
thisdet = Detector(det)
# account for the time delay between the waveforms of the
# different detectors
gatestartdelay = gatestart + thisdet.time_delay_from_earth_center(
ra, dec, gatestart)
gateenddelay = gateend + thisdet.time_delay_from_earth_center(
ra, dec, gateend)
dgatedelay = gateenddelay - gatestartdelay
gatetimes[det] = (gatestartdelay, dgatedelay)
return gatetimes
[docs]
def get_gate_times_hmeco(self):
"""Gets the time to apply a gate based on the current sky position.
Returns
-------
dict :
Dictionary of detector names -> (gate start, gate width)
"""
# generate the template waveform
wfs = self.get_waveforms()
# get waveform parameters
params = self.current_params
spin1 = params['spin1z']
spin2 = params['spin2z']
# gate input for ringdown analysis which consideres a start time
# and an end time
dgate = params['gate_window']
meco_f = hybrid_meco_frequency(params['mass1'], params['mass2'], spin1,
spin2)
# figure out the gate times
gatetimes = {}
for det, h in wfs.items():
invpsd = self._invpsds[det]
h.resize(len(invpsd))
ht = h.to_timeseries()
f_low = int((self._f_lower[det]+1)/h.delta_f)
sample_freqs = h.sample_frequencies[f_low:].numpy()
f_idx = numpy.where(sample_freqs <= meco_f)[0][-1]
# find time corresponding to meco frequency
t_from_freq = time_from_frequencyseries(
h[f_low:], sample_frequencies=sample_freqs)
if t_from_freq[f_idx] > 0:
gatestartdelay = t_from_freq[f_idx] + float(t_from_freq.epoch)
else:
gatestartdelay = t_from_freq[f_idx] + ht.sample_times[-1]
gatestartdelay = min(gatestartdelay, params['t_gate_start'])
gatetimes[det] = (gatestartdelay, dgate)
return gatetimes
def _lognl(self):
"""Calculates the log of the noise likelihood.
"""
# clear variables
lognl = 0.
self._det_lognls.clear()
# get the times of the gates
gate_times = self.get_gate_times()
for det, invpsd in self._invpsds.items():
start_index, end_index = self.gate_indices(det)
# linear estimation
norm = self.det_lognorm(det, start_index, end_index)
gatestartdelay, dgatedelay = gate_times[det]
# we always filter the entire segment starting from kmin, since the
# gated series may have high frequency components
slc = slice(self._kmin[det], self._kmax[det])
# gate the data
data = self.td_data[det]
gated_dt = data.gate(gatestartdelay + dgatedelay/2,
window=dgatedelay/2, copy=True,
invpsd=invpsd, method='paint')
# convert to the frequency series
gated_d = gated_dt.to_frequencyseries()
# overwhiten
gated_d *= invpsd
d = self.data[det]
# inner product
ip = 4 * invpsd.delta_f * d[slc].inner(gated_d[slc]).real # <d, d>
dd = norm - 0.5*ip
# store
self._det_lognls[det] = dd
lognl += dd
return float(lognl)
[docs]
def det_lognl(self, det):
# make sure lognl has been called
_ = self._trytoget('lognl', self._lognl)
# the det_lognls dict should have been updated, so can call it now
return self._det_lognls[det]
@staticmethod
def _fd_data_from_strain_dict(opts, strain_dict, psd_strain_dict):
"""Wrapper around :py:func:`data_utils.fd_data_from_strain_dict`.
Ensures that if the PSD is estimated from data, the inverse spectrum
truncation uses a Hann window, and that the low frequency cutoff is
zero.
"""
if opts.psd_inverse_length and opts.invpsd_trunc_method is None:
# make sure invpsd truncation is set to hanning
logging.info("Using Hann window to truncate inverse PSD")
opts.invpsd_trunc_method = 'hann'
lfs = None
if opts.psd_estimation:
# make sure low frequency cutoff is zero
logging.info("Setting low frequency cutoff of PSD to 0")
lfs = opts.low_frequency_cutoff.copy()
opts.low_frequency_cutoff = {d: 0. for d in lfs}
out = fd_data_from_strain_dict(opts, strain_dict, psd_strain_dict)
# set back
if lfs is not None:
opts.low_frequency_cutoff = lfs
return out
[docs]
class GatedGaussianNoise(BaseGatedGaussian):
r"""Model that applies a time domain gate, assuming stationary Gaussian
noise.
The gate start and end times are set by providing ``t_gate_start`` and
``t_gate_end`` parameters, respectively. This will cause the gated times
to be excised from the analysis. For more details on the likelihood
function and its derivation, see
`arXiv:2105.05238 <https://arxiv.org/abs/2105.05238>`_.
.. warning::
The normalization of the likelihood depends on the gate times. However,
at the moment, the normalization is not calculated, as it depends on
the determinant of the truncated covariance matrix (see Eq. 4 of
arXiv:2105.05238). For this reason it is recommended that you only
use this model for fixed gate times.
"""
name = 'gated_gaussian_noise'
def __init__(self, variable_params, data, low_frequency_cutoff, psds=None,
high_frequency_cutoff=None, normalize=False,
static_params=None, **kwargs):
# set up the boiler-plate attributes
super().__init__(
variable_params, data, low_frequency_cutoff, psds=psds,
high_frequency_cutoff=high_frequency_cutoff, normalize=normalize,
static_params=static_params, **kwargs)
# create the waveform generator
self.waveform_generator = create_waveform_generator(
self.variable_params, self.data,
waveform_transforms=self.waveform_transforms,
recalibration=self.recalibration,
gates=self.gates, **self.static_params)
@property
def _extra_stats(self):
"""No extra stats are stored."""
return []
@catch_waveform_error
def _loglikelihood(self):
r"""Computes the log likelihood after removing the power within the
given time window,
.. math::
\log p(d|\Theta) = -\frac{1}{2} \sum_i
\left< d_i - h_i(\Theta) | d_i - h_i(\Theta) \right>,
at the current parameter values :math:`\Theta`.
Returns
-------
float
The value of the log likelihood.
"""
# generate the template waveform
wfs = self.get_waveforms()
# get the times of the gates
gate_times = self.get_gate_times()
logl = 0.
for det, h in wfs.items():
invpsd = self._invpsds[det]
start_index, end_index = self.gate_indices(det)
norm = self.det_lognorm(det, start_index, end_index)
gatestartdelay, dgatedelay = gate_times[det]
# we always filter the entire segment starting from kmin, since the
# gated series may have high frequency components
slc = slice(self._kmin[det], self._kmax[det])
# calculate the residual
data = self.td_data[det]
ht = h.to_timeseries()
res = data - ht
rtilde = res.to_frequencyseries()
gated_res = res.gate(gatestartdelay + dgatedelay/2,
window=dgatedelay/2, copy=True,
invpsd=invpsd, method='paint')
gated_rtilde = gated_res.to_frequencyseries()
# overwhiten
gated_rtilde *= invpsd
rr = 4 * invpsd.delta_f * rtilde[slc].inner(gated_rtilde[slc]).real
logl += norm - 0.5*rr
return float(logl)
@property
def multi_signal_support(self):
""" The list of classes that this model supports in a multi-signal
likelihood
"""
return [type(self)]
[docs]
def multi_loglikelihood(self, models):
""" Calculate a multi-model (signal) likelihood
"""
# Generate the waveforms for each submodel
wfs = []
for m in models + [self]:
wf = m.get_waveforms()
wfs.append(wf)
# combine into a single waveform
combine = {}
for det in self.data:
# get max waveform length
mlen = max([len(x[det]) for x in wfs])
[x[det].copy().resize(mlen) for x in wfs]
combine[det] = sum([x[det] for x in wfs])
self._current_wfs = combine
return self._loglikelihood()
[docs]
class GatedGaussianMargPol(BaseGatedGaussian):
r"""Gated gaussian model with numerical marginalization over polarization.
This implements the GatedGaussian likelihood with an explicit numerical
marginalization over polarization angle. This is accomplished using
a fixed set of integration points distribution uniformation between
0 and 2pi. By default, 1000 integration points are used.
The 'polarization_samples' argument can be passed to set an alternate
number of integration points.
"""
name = 'gated_gaussian_margpol'
def __init__(self, variable_params, data, low_frequency_cutoff, psds=None,
high_frequency_cutoff=None, normalize=False,
static_params=None,
polarization_samples=1000, **kwargs):
# set up the boiler-plate attributes
super().__init__(
variable_params, data, low_frequency_cutoff, psds=psds,
high_frequency_cutoff=high_frequency_cutoff, normalize=normalize,
static_params=static_params, **kwargs)
# the polarization parameters
self.polarization_samples = polarization_samples
self.pol = numpy.linspace(0, 2*numpy.pi, self.polarization_samples)
self.dets = {}
# create the waveform generator
self.waveform_generator = create_waveform_generator(
self.variable_params, self.data,
waveform_transforms=self.waveform_transforms,
recalibration=self.recalibration,
generator_class=generator.FDomainDetFrameTwoPolGenerator,
**self.static_params)
[docs]
def get_gate_times_hmeco(self):
"""Gets the time to apply a gate based on the current sky position.
Returns
-------
dict :
Dictionary of detector names -> (gate start, gate width)
"""
# generate the template waveform
wfs = self.get_waveforms()
# get waveform parameters
params = self.current_params
spin1 = params['spin1z']
spin2 = params['spin2z']
# gate input for ringdown analysis which consideres a start time
# and an end time
dgate = params['gate_window']
meco_f = hybrid_meco_frequency(params['mass1'], params['mass2'], spin1,
spin2)
# figure out the gate times
gatetimes = {}
# for now only calculating time from plus polarization; should be all
# that's necessary
for det, (hp, hc) in wfs.items():
invpsd = self._invpsds[det]
hp.resize(len(invpsd))
ht = hp.to_timeseries()
f_low = int((self._f_lower[det]+1)/hp.delta_f)
sample_freqs = hp.sample_frequencies[f_low:].numpy()
f_idx = numpy.where(sample_freqs <= meco_f)[0][-1]
# find time corresponding to meco frequency
t_from_freq = time_from_frequencyseries(
hp[f_low:], sample_frequencies=sample_freqs)
if t_from_freq[f_idx] > 0:
gatestartdelay = t_from_freq[f_idx] + float(t_from_freq.epoch)
else:
gatestartdelay = t_from_freq[f_idx] + ht.sample_times[-1]
gatestartdelay = min(gatestartdelay, params['t_gate_start'])
gatetimes[det] = (gatestartdelay, dgate)
return gatetimes
@property
def _extra_stats(self):
"""Adds the maxL polarization and corresponding likelihood."""
return ['maxl_polarization', 'maxl_logl']
@catch_waveform_error
def _loglikelihood(self):
r"""Computes the log likelihood after removing the power within the
given time window,
.. math::
\log p(d|\Theta) = -\frac{1}{2} \sum_i
\left< d_i - h_i(\Theta) | d_i - h_i(\Theta) \right>,
at the current parameter values :math:`\Theta`.
Returns
-------
float
The value of the log likelihood.
"""
# generate the template waveform
wfs = self.get_waveforms()
# get the gated waveforms and data
gated_wfs = self.get_gated_waveforms()
gated_data = self.get_gated_data()
# cycle over
loglr = 0.
lognl = 0.
for det, (hp, hc) in wfs.items():
# get the antenna patterns
if det not in self.dets:
self.dets[det] = Detector(det)
fp, fc = self.dets[det].antenna_pattern(self.current_params['ra'],
self.current_params['dec'],
self.pol,
self.current_params['tc'])
start_index, end_index = self.gate_indices(det)
norm = self.det_lognorm(det, start_index, end_index)
# we always filter the entire segment starting from kmin, since the
# gated series may have high frequency components
slc = slice(self._kmin[det], self._kmax[det])
# get the gated values
gated_hp, gated_hc = gated_wfs[det]
gated_d = gated_data[det]
# we'll overwhiten the ungated data and waveforms for computing
# inner products
d = self._overwhitened_data[det]
# overwhiten the hp and hc
invpsd = self._invpsds[det]
hp = hp*invpsd
hc = hc*invpsd
# get the various gated inner products
hpd = hp[slc].inner(gated_d[slc]).real # <hp, d>
hcd = hc[slc].inner(gated_d[slc]).real # <hc, d>
dhp = d[slc].inner(gated_hp[slc]).real # <d, hp>
dhc = d[slc].inner(gated_hc[slc]).real # <d, hc>
hphp = hp[slc].inner(gated_hp[slc]).real # <hp, hp>
hchc = hc[slc].inner(gated_hc[slc]).real # <hc, hc>
hphc = hp[slc].inner(gated_hc[slc]).real # <hp, hc>
hchp = hc[slc].inner(gated_hp[slc]).real # <hc, hp>
dd = d[slc].inner(gated_d[slc]).real # <d, d>
# since the antenna patterns are real,
# <h, d>/2 + <d, h>/2 = fp*(<hp, d>/2 + <d, hp>/2)
# + fc*(<hc, d>/2 + <d, hc>/2)
hd = fp*(hpd + dhp) + fc*(hcd + dhc)
# <h, h>/2 = <fp*hp + fc*hc, fp*hp + fc*hc>/2
# = fp*fp*<hp, hp>/2 + fc*fc*<hc, hc>/2
# + fp*fc*<hp, hc>/2 + fc*fp*<hc, hp>/2
hh = fp*fp*hphp + fc*fc*hchc + fp*fc*(hphc + hchp)
# sum up; note that the factor is 2df instead of 4df to account
# for the factor of 1/2
loglr += norm + 2*invpsd.delta_f*(hd - hh)
lognl += -2 * invpsd.delta_f * dd
# store the maxl polarization
idx = loglr.argmax()
setattr(self._current_stats, 'maxl_polarization', self.pol[idx])
setattr(self._current_stats, 'maxl_logl', loglr[idx] + lognl)
# compute the marginalized log likelihood
marglogl = special.logsumexp(loglr) + lognl - numpy.log(len(self.pol))
return float(marglogl)
@property
def multi_signal_support(self):
""" The list of classes that this model supports in a multi-signal
likelihood
"""
return [type(self)]
[docs]
@catch_waveform_error
def multi_loglikelihood(self, models):
""" Calculate a multi-model (signal) likelihood
"""
# Generate the waveforms for each submodel
wfs = []
for m in models + [self]:
wf = m.get_waveforms()
wfs.append(wf)
# combine into a single waveform
combine = {}
for det in self.data:
# get max waveform length
mlenp = max([len(x[det][0]) for x in wfs])
mlenc = max([len(x[det][1]) for x in wfs])
mlen = max([mlenp, mlenc])
# resize plus and cross
wfs[det][0] = wfs[det][0].copy().resize(mlen)
wfs[det][1] = wfs[det][1].copy().resize(mlen)
# combine waveforms
combine[det] = (sum([x[det][0] for x in wfs]), sum([x[det][1]
for x in wfs]))
self._current_wfs = combine
return self._loglikelihood()