# Copyright (C) 2020 Daniel Finstad
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""This module provides model classes and functions for implementing
a relative binning likelihood for parameter estimation.
"""
import logging
import numpy
import itertools
from scipy.interpolate import interp1d
from pycbc.waveform import (get_fd_waveform_sequence,
get_fd_det_waveform_sequence, fd_det_sequence)
from pycbc.detector import Detector
from pycbc.types import Array, TimeSeries
from .gaussian_noise import (BaseGaussianNoise, catch_waveform_error)
from pycbc.waveform import FailedWaveformError
from .relbin_cpu import (likelihood_parts, likelihood_parts_v,
likelihood_parts_multi, likelihood_parts_multi_v,
likelihood_parts_det, likelihood_parts_det_multi,
likelihood_parts_vector,
likelihood_parts_v_pol,
likelihood_parts_v_time,
likelihood_parts_v_pol_time,
likelihood_parts_vectorp, snr_predictor,
likelihood_parts_vectort,
snr_predictor_dom)
from .tools import DistMarg
[docs]
def setup_bins(f_full, f_lo, f_hi, chi=1.0,
               eps=0.1, gammas=None,
               ):
    """Construct frequency bins for use in a relative likelihood
    model. For details, see [Barak, Dai & Venumadhav 2018].

    Parameters
    ----------
    f_full : array
        The full resolution array of frequencies being used in the analysis.
    f_lo : float
        The starting frequency used in matched filtering. This will be the
        left edge of the first frequency bin.
    f_hi : float
        The ending frequency used in matched filtering. This will be the right
        edge of the last frequency bin.
    chi : float, optional
        Tunable parameter, see [Barak, Dai & Venumadhav 2018]
    eps : float, optional
        Tunable parameter, see [Barak, Dai & Venumadhav 2018]. Lower values
        result in larger number of bins.
    gammas : array, optional
        Frequency powerlaw indices to be used in computing bins.

    Returns
    -------
    fbin_ind : numpy.array of ints
        Unique indices of the bin edge frequencies within the full
        frequency array ``f_full``.
    """
    f = numpy.linspace(f_lo, f_hi, 10000)
    # f^ga power law index
    ga = (
        gammas
        if gammas is not None
        else numpy.array([-5.0 / 3, -2.0 / 3, 1.0, 5.0 / 3, 7.0 / 3])
    )
    logging.info("Using powerlaw indices: %s", ga)
    # total phase change over [f_lo, f_hi] contributed by each power law
    dalp = chi * 2.0 * numpy.pi / numpy.absolute((f_lo ** ga) - (f_hi ** ga))
    dphi = numpy.sum(
        numpy.array([numpy.sign(g) * d * (f ** g) for g, d in zip(ga, dalp)]),
        axis=0,
    )
    dphi_diff = dphi - dphi[0]
    # now construct frequency bins: one bin per `eps` of accumulated dephasing
    nbin = int(dphi_diff[-1] / eps)
    dphi2f = interp1d(
        dphi_diff, f, kind="slinear", bounds_error=False, fill_value=0.0
    )
    dphi_grid = numpy.linspace(dphi_diff[0], dphi_diff[-1], nbin + 1)
    # frequency grid points
    fbin = dphi2f(dphi_grid)
    # indices of frequency grid points in the FFT array; snap each edge to
    # the *nearest* sample of f_full, not the first sample >= the edge
    fbin_ind = numpy.searchsorted(f_full, fbin)
    for idx_fbin, idx_f_full in enumerate(fbin_ind):
        if idx_f_full == 0:
            curr_idx = 0
        elif idx_f_full == len(f_full):
            curr_idx = len(f_full) - 1
        else:
            abs1 = abs(f_full[idx_f_full] - fbin[idx_fbin])
            abs2 = abs(f_full[idx_f_full-1] - fbin[idx_fbin])
            curr_idx = idx_f_full - 1 if abs1 > abs2 else idx_f_full
        fbin_ind[idx_fbin] = curr_idx
    # drop duplicate edges that snapped to the same sample
    fbin_ind = numpy.unique(fbin_ind)
    return fbin_ind
[docs]
class Relative(DistMarg, BaseGaussianNoise):
r"""Model that assumes the likelihood in a region around the peak
is slowly varying such that a linear approximation can be made, and
likelihoods can be calculated at a coarser frequency resolution. For
more details on the implementation, see https://arxiv.org/abs/1806.08792.
This model requires the use of a fiducial waveform whose parameters are
near the peak of the likelihood. The fiducial waveform and all template
waveforms used in likelihood calculation are currently generated using
the SPAtmplt approximant.
For more details on initialization parameters and definition of terms, see
:py:class:`BaseGaussianNoise`.
Parameters
----------
variable_params : (tuple of) string(s)
A tuple of parameter names that will be varied.
data : dict
A dictionary of data, in which the keys are the detector names and the
values are the data (assumed to be unwhitened). All data must have the
same frequency resolution.
low_frequency_cutoff : dict
A dictionary of starting frequencies, in which the keys are the
detector names and the values are the starting frequencies for the
respective detectors to be used for computing inner products.
fiducial_params : dict
A dictionary of waveform parameters to be used for generating the
fiducial waveform. Keys must be parameter names in the form
'PARAM_ref' where PARAM is a recognized extrinsic parameter or
an intrinsic parameter compatible with the chosen approximant.
gammas : array of floats, optional
Frequency powerlaw indices to be used in computing frequency bins.
epsilon : float, optional
Tuning parameter used in calculating the frequency bins. Lower values
will result in higher resolution and more bins.
earth_rotation: boolean, optional
Default is False. If True, then vary the fp/fc polarization values
as a function of frequency bin, using a predetermined PN approximation
for the time offsets.
\**kwargs :
All other keyword arguments are passed to
:py:class:`BaseGaussianNoise`.
"""
name = "relative"
def __init__(
    self,
    variable_params,
    data,
    low_frequency_cutoff,
    fiducial_params=None,
    gammas=None,
    epsilon=0.5,
    earth_rotation=False,
    earth_rotation_mode=2,
    marginalize_phase=True,
    **kwargs
):
    """Initialize the relative-binning model: set up marginalizations,
    generate the fiducial waveform per detector, construct frequency
    bins, and precompute the summary data.
    """
    # set up marginalizations before the base class consumes
    # variable_params / kwargs
    variable_params, kwargs = self.setup_marginalization(
        variable_params,
        marginalize_phase=marginalize_phase,
        **kwargs)
    super(Relative, self).__init__(
        variable_params, data, low_frequency_cutoff, **kwargs
    )
    # If the waveform needs us to apply the detector response,
    # set flag to true (most cases for ground-based observatories).
    self.still_needs_det_response = False
    if self.static_params['approximant'] in fd_det_sequence:
        self.still_needs_det_response = True
    # reference waveform and bin edges
    self.f, self.df, self.end_time, self.det = {}, {}, {}, {}
    self.h00, self.h00_sparse = {}, {}
    self.fedges, self.edges = {}, {}
    self.ta, self.antenna_time = {}, {}
    # filtered summary data for linear approximation
    self.sdat = {}
    # store fiducial waveform params
    self.fid_params = self.static_params.copy()
    # BUGFIX: fiducial_params defaults to None, and dict.update(None)
    # raises TypeError; guard so the default is usable
    self.fid_params.update(fiducial_params or {})
    # the flag used in `_loglr`
    self.return_sh_hh = False
    # drop placeholder entries so they fall back to sampled values
    for k in self.static_params:
        if self.fid_params[k] == 'REPLACE':
            self.fid_params.pop(k)
    for ifo in data:
        # store data and frequencies
        d0 = self.data[ifo]
        self.f[ifo] = numpy.array(d0.sample_frequencies)
        self.df[ifo] = d0.delta_f
        self.end_time[ifo] = float(d0.end_time)
        # generate fiducial waveform
        f_lo = self.kmin[ifo] * self.df[ifo]
        f_hi = self.kmax[ifo] * self.df[ifo]
        logging.info(
            "%s: Generating fiducial waveform from %s to %s Hz",
            ifo, f_lo, f_hi,
        )
        # prune low frequency samples to avoid waveform errors
        fpoints = Array(self.f[ifo].astype(numpy.float64))
        fpoints = fpoints[self.kmin[ifo]:self.kmax[ifo]+1]
        if self.still_needs_det_response:
            # generator already projects onto the detector
            wave = get_fd_det_waveform_sequence(ifos=ifo,
                                                sample_points=fpoints,
                                                **self.fid_params)
            curr_wav = wave[ifo]
            self.ta[ifo] = 0.
        else:
            fid_hp, fid_hc = get_fd_waveform_sequence(sample_points=fpoints,
                                                      **self.fid_params)
            # Apply detector response if not handled by
            # the waveform generator
            self.det[ifo] = Detector(ifo)
            dt = self.det[ifo].time_delay_from_earth_center(
                self.fid_params["ra"],
                self.fid_params["dec"],
                self.fid_params["tc"],
            )
            # fiducial arrival time at this detector
            self.ta[ifo] = self.fid_params["tc"] + dt
            fp, fc = self.det[ifo].antenna_pattern(
                self.fid_params["ra"], self.fid_params["dec"],
                self.fid_params["polarization"], self.fid_params["tc"])
            curr_wav = (fid_hp * fp + fid_hc * fc)
        # check for zeros at low and high frequencies
        # make sure only nonzero samples are included in bins
        numzeros_lo = list(curr_wav != 0j).index(True)
        if numzeros_lo > 0:
            new_kmin = self.kmin[ifo] + numzeros_lo
            f_lo = new_kmin * self.df[ifo]
            logging.info(
                "WARNING! Fiducial waveform starts above "
                "low-frequency-cutoff, initial bin frequency "
                "will be %s Hz", f_lo)
        numzeros_hi = list(curr_wav[::-1] != 0j).index(True)
        if numzeros_hi > 0:
            new_kmax = self.kmax[ifo] - numzeros_hi
            f_hi = new_kmax * self.df[ifo]
            logging.info(
                "WARNING! Fiducial waveform terminates below "
                "high-frequency-cutoff, final bin frequency "
                "will be %s Hz", f_hi)
        # arrival time relative to the end of the data segment
        self.ta[ifo] -= self.end_time[ifo]
        curr_wav.resize(len(self.f[ifo]))
        curr_wav = numpy.roll(curr_wav, self.kmin[ifo])
        # We'll apply this to the data, in lieu of the ref waveform
        # This makes it easier to compare target signal to reference later
        tshift = numpy.exp(-2.0j * numpy.pi * self.f[ifo] * self.ta[ifo])
        self.h00[ifo] = numpy.array(curr_wav)  # * tshift
        data_shifted = self.data[ifo] * numpy.conjugate(tshift)
        logging.info("Computing frequency bins")
        fbin_ind = setup_bins(
            f_full=self.f[ifo], f_lo=f_lo, f_hi=f_hi,
            gammas=gammas, eps=float(epsilon),
        )
        logging.info("Using %s bins for this model", len(fbin_ind))
        self.fedges[ifo] = self.f[ifo][fbin_ind]
        self.edges[ifo] = fbin_ind
        self.init_from_frequencies(data_shifted, self.h00, fbin_ind, ifo)
        self.antenna_time[ifo] = self.setup_antenna(
            earth_rotation,
            int(earth_rotation_mode),
            self.fedges[ifo])
    self.combine_layout()
[docs]
def init_from_frequencies(self, data, h00, fbin_ind, ifo):
    """Precompute the per-detector summary data for the linear
    approximation.

    Builds the (low, high) index pair for every frequency bin, stores a
    sparse copy of the fiducial waveform at the bin edges, and computes
    the summary products for <d|h0> and <h0|h0>.
    """
    # consecutive bin edges -> (low, high) index pairs
    bins = numpy.array(list(zip(fbin_ind[:-1], fbin_ind[1:])))
    # store low res copy of fiducial waveform
    self.h00_sparse[ifo] = h00[ifo].copy().take(fbin_ind)
    # compute summary data
    logging.info(
        "Calculating summary data at frequency resolution %s Hz",
        self.df[ifo],
    )
    a0, a1 = self.summary_product(data, h00[ifo], bins, ifo)
    b0, b1 = self.summary_product(h00[ifo], h00[ifo], bins, ifo)
    self.sdat[ifo] = {"a0": a0, "a1": a1, "b0": abs(b0), "b1": abs(b1)}
[docs]
def combine_layout(self):
    """Group detectors by their frequency-bin layout.

    Detectors with identical bin-edge arrays share one entry of
    ``self.edge_unique``; ``self.ifo_map`` maps each detector to its
    layout index.
    """
    self.edge_unique = []
    self.ifo_map = {}
    for ifo in self.fedges:
        fedge = self.fedges[ifo]
        # search previously-registered layouts for an exact match
        found = None
        for i, edge in enumerate(self.edge_unique):
            if numpy.array_equal(edge, fedge):
                found = i
                break
        if found is None:
            # new layout: register it
            self.ifo_map[ifo] = len(self.edge_unique)
            self.edge_unique.append(Array(fedge))
        else:
            self.ifo_map[ifo] = found
    logging.info("%s unique ifo layouts", len(self.edge_unique))
[docs]
def setup_antenna(self, earth_rotation, mode, fedges):
    """Choose the likelihood kernels and the antenna-evaluation times.

    If earth rotation is enabled, a PN estimate of the time before
    merger at each bin-edge frequency is used so that fp/fc can vary
    across the band; otherwise a single evaluation time is used.

    Returns the time(s) at which the antenna patterns are evaluated.
    """
    self.earth_rotation = earth_rotation
    if earth_rotation is False:
        # static antenna patterns, evaluated at the fiducial tc
        eval_times = self.fid_params["tc"]
        if self.still_needs_det_response:
            self.lik = likelihood_parts_det
            self.mlik = likelihood_parts_det_multi
        else:
            self.lik = likelihood_parts
            self.mlik = likelihood_parts_multi
    else:
        logging.info("Enabling frequency-dependent earth rotation")
        from pycbc.waveform.spa_tmplt import spa_length_in_time
        # PN time before merger at each bin edge, rescaled by the mode
        premerger = spa_length_in_time(
            phase_order=-1,
            mass1=self.fid_params["mass1"],
            mass2=self.fid_params["mass2"],
            f_lower=numpy.array(fedges) / mode * 2.0,
        )
        eval_times = self.fid_params["tc"] - premerger
        self.lik = likelihood_parts_v
        self.mlik = likelihood_parts_multi_v
    return eval_times
@property
def likelihood_function(self):
    """Select the compiled likelihood kernel for the current call.

    The kernel depends on which parameters are being vector-marginalized
    and on whether earth rotation is enabled. ``self.lformat`` records
    the calling convention so ``_loglr`` passes the matching arguments.
    """
    self.lformat = None
    if self.marginalize_vector_params:
        p = self.current_params
        # parameters that are currently vectors (actively marginalized)
        vmarg = {k for k in self.marginalize_vector_params
                 if not numpy.isscalar(p[k])}
        if self.earth_rotation:
            if 'tc' in vmarg and 'polarization' in vmarg:
                self.lformat = 'earth_time_pol'
                return likelihood_parts_v_pol_time
            if 'polarization' in vmarg:
                self.lformat = 'earth_pol'
                return likelihood_parts_v_pol
            if 'tc' in vmarg:
                self.lformat = 'earth_time'
                return likelihood_parts_v_time
        else:
            if {'ra', 'dec', 'tc'} <= vmarg:
                return likelihood_parts_vector
            if 'tc' in vmarg and 'polarization' in vmarg:
                return likelihood_parts_vector
            if 'tc' in vmarg:
                return likelihood_parts_vectort
            if 'polarization' in vmarg:
                return likelihood_parts_vectorp
    # default kernel chosen in setup_antenna
    return self.lik
[docs]
def summary_product(self, h1, h2, bins, ifo):
    """ Calculate the summary values for the inner product <h1|h2>

    Returns the constant (a0) and linear (a1) coefficients of the
    per-bin expansion of the whitened inner product.
    """
    # whitened cross-spectrum
    integrand = numpy.conjugate(h1) * h2 / self.psds[ifo]
    freqs = self.f[ifo]
    df4 = 4.0 * self.df[ifo]
    # constant terms
    a0 = numpy.array([df4 * integrand[lo:hi].sum() for lo, hi in bins])
    # linear terms
    a1 = numpy.array([
        4.0 / (hi - lo) *
        (integrand[lo:hi] * (freqs[lo:hi] - freqs[lo])).sum()
        for lo, hi in bins])
    return a0, a1
@property
def multi_signal_support(self):
    """ The list of classes that this model supports in a multi-signal
    likelihood
    """
    # Check if this model *can* be included in a multi-signal model.
    # All marginalizations must currently be disabled to work!
    if (self.marginalize_vector_params or
            self.marginalize_distance or
            self.marginalize_phase):
        # BUGFIX: implicit string concatenation previously produced
        # "inside ofmulti_signal" -- add the missing space
        logging.info("Cannot use single template model inside of "
                     "multi_signal if marginalizations are enabled")
    return [type(self)]
[docs]
def calculate_hihjs(self, models):
    """ Pre-calculate the hihj inner products on a grid
    """
    self.hihj = {}
    for ma, mb in itertools.combinations(models, 2):
        self.hihj[(ma, mb)] = {}
        for ifo in self.data:
            ha = ma.h00[ifo]
            hb = mb.h00[ifo]
            # merge the two models' bin-edge grids
            edge = numpy.unique([ma.edges[ifo], mb.edges[ifo]])
            # keep only points where at least one reference is nonzero
            nonzero = numpy.where((ha[edge] != 0) | (hb[edge] != 0))[0]
            edge = edge[nonzero]
            fedge = ma.f[ifo][edge]
            # consecutive edges -> (low, high) bin index pairs
            bins = numpy.array(list(zip(edge[:-1], edge[1:])))
            a0, a1 = self.summary_product(ha, hb, bins, ifo)
            self.hihj[(ma, mb)][ifo] = a0, a1, fedge
[docs]
def multi_loglikelihood(self, models):
    """ Calculate a multi-model (signal) likelihood
    """
    # include this model alongside the others
    models = [self] + models
    loglr = 0
    # handle sum[<d|h_i> - 0.5 <h_i|h_i>]
    for m in models:
        loglr += m.loglr
    # lazily compute the cross-term summary data on first use
    if not hasattr(self, 'hihj'):
        self.calculate_hihjs(models)
    if self.still_needs_det_response:
        # waveform generator applied the detector response itself:
        # _current_wf_parts holds (dtc, channel, h00) per detector
        for m1, m2 in itertools.combinations(models, 2):
            for det in self.data:
                a0, a1, fedge = self.hihj[(m1, m2)][det]
                dtc, channel, h00 = m1._current_wf_parts[det]
                dtc2, channel2, h002 = m2._current_wf_parts[det]
                c1c2 = self.mlik(fedge,
                                 dtc, channel, h00,
                                 dtc2, channel2, h002,
                                 a0, a1)
                loglr += - c1c2.real  # This is -0.5 * re(<h1|h2> + <h2|h1>)
    else:
        # _current_wf_parts holds (fp, fc, dtc, hp, hc, h00) per detector
        # finally add in the lognl term from this model
        for m1, m2 in itertools.combinations(models, 2):
            for det in self.data:
                a0, a1, fedge = self.hihj[(m1, m2)][det]
                fp, fc, dtc, hp, hc, h00 = m1._current_wf_parts[det]
                fp2, fc2, dtc2, hp2, hc2, h002 = m2._current_wf_parts[det]
                h1h2 = self.mlik(fedge,
                                 fp, fc, dtc, hp, hc, h00,
                                 fp2, fc2, dtc2, hp2, hc2, h002,
                                 a0, a1)
                loglr += - h1h2.real  # This is -0.5 * re(<h1|h2> + <h2|h1>)
    return loglr + self.lognl
@catch_waveform_error
def _loglr(self):
    r"""Computes the log likelihood ratio,
    or inner product <s|h> and <h|h> if `self.return_sh_hh` is True.

    .. math::

        \log \mathcal{L}(\Theta) = \sum_i
            \left<h_i(\Theta)|d_i\right> -
            \frac{1}{2}\left<h_i(\Theta)|h_i(\Theta)\right>,

    at the current parameter values :math:`\Theta`.

    Returns
    -------
    float
        The value of the log likelihood ratio.
    or
    tuple
        The inner product (<s|h>, <h|h>).
    """
    # get model params
    p = self.current_params
    wfs = self.get_waveforms(p)
    # kernel matching the currently active marginalizations
    lik = self.likelihood_function
    norm = 0.0   # accumulates <h|h>
    filt = 0j    # accumulates <d|h>
    # cache per-ifo waveform pieces for possible multi-signal reuse
    self._current_wf_parts = {}
    # e^{-2i*psi}, used to fold polarization into fp/fc
    pol_phase = numpy.exp(-2.0j * p['polarization'])
    for ifo in self.data:
        freqs = self.fedges[ifo]
        sdat = self.sdat[ifo]
        h00 = self.h00_sparse[ifo]
        end_time = self.end_time[ifo]
        times = self.antenna_time[ifo]
        # project waveform to detector frame if waveform does not deal
        # with detector response. Otherwise, skip detector response.
        if self.still_needs_det_response:
            dtc = 0.
            channel = wfs[ifo].numpy()
            filter_i, norm_i = lik(freqs, dtc, channel, h00,
                                   sdat['a0'], sdat['a1'],
                                   sdat['b0'], sdat['b1'])
            self._current_wf_parts[ifo] = (dtc, channel, h00)
        else:
            hp, hc = wfs[ifo]
            det = self.det[ifo]
            # antenna pattern at psi=0; polarization applied via pol_phase
            fp, fc = det.antenna_pattern(p["ra"], p["dec"],
                                         0.0, times)
            dt = det.time_delay_from_earth_center(p["ra"], p["dec"], times)
            # time offset of the template relative to the reference arrival
            dtc = p["tc"] + dt - end_time - self.ta[ifo]
            if self.lformat == 'earth_pol':
                # kernel applies the polarization phase internally
                filter_i, norm_i = lik(freqs, fp, fc, dtc, pol_phase,
                                       hp, hc, h00,
                                       sdat['a0'], sdat['a1'],
                                       sdat['b0'], sdat['b1'])
            else:
                # rotate (fp, fc) by the polarization phase explicitly
                f = (fp + 1.0j * fc) * pol_phase
                fp = f.real.copy()
                fc = f.imag.copy()
                filter_i, norm_i = lik(freqs, fp, fc, dtc,
                                       hp, hc, h00,
                                       sdat['a0'], sdat['a1'],
                                       sdat['b0'], sdat['b1'])
            self._current_wf_parts[ifo] = (fp, fc, dtc, hp, hc, h00)
        filt += filter_i
        norm += norm_i
    loglr = self.marginalize_loglr(filt, norm)
    if self.return_sh_hh:
        results = (filt, norm)
    else:
        results = loglr
    return results
def _nowaveform_handler(self):
    """Returns -inf for loglr if no waveform generated.

    If `return_sh_hh` is set to True, a FailedWaveformError will be raised.
    """
    # guard clause: the common path simply rejects the sample
    if not self.return_sh_hh:
        return -numpy.inf
    raise FailedWaveformError("Waveform failed to generate and "
                              "return_sh_hh set to True! I don't know "
                              "what to return in this case.")
[docs]
def max_curvature_from_reference(self):
    """ Return the maximum change in slope between frequency bins
    relative to the reference waveform.
    """
    peak = 0
    for ifo in self.data:
        # ratio of the last returned waveform to the sparse reference
        ratio = self.wf_ret[ifo][0] / self.h00_sparse[ifo]
        # largest second difference of the normalized ratio
        curve = abs(numpy.diff(ratio / abs(ratio).min(), n=2)).max()
        peak = max(peak, curve)
    return peak
[docs]
class RelativeTime(Relative):
""" Heterodyne likelihood optimized for time marginalization. In addition
it supports phase (dominant-mode), sky location, and polarization
marginalization.
"""
name = "relative_time"
def __init__(self, *args,
             sample_rate=4096,
             **kwargs):
    """Initialize the time-marginalized model; see `Relative` for the
    base arguments.

    Parameters
    ----------
    sample_rate : float, optional
        Sample rate (Hz) of the SNR time series used for time
        marginalization. Default is 4096.
    """
    super(RelativeTime, self).__init__(*args, **kwargs)
    self.sample_rate = float(sample_rate)
    # set up the tc window and choose detectors to draw times from,
    # seeded by the reference-waveform SNR
    self.setup_peak_lock(sample_rate=self.sample_rate, **kwargs)
    self.draw_ifos(self.ref_snr, **kwargs)
@property
def ref_snr(self):
    """SNR time series of the reference waveform, cached after first use."""
    if not hasattr(self, '_ref_snr'):
        # use the sparse reference for both plus and cross polarizations
        ref_wfs = {}
        for ifo in self.h00_sparse:
            ref_wfs[ifo] = (self.h00_sparse[ifo], self.h00_sparse[ifo])
        self._ref_snr = self.get_snr(ref_wfs)
    return self._ref_snr
[docs]
def get_snr(self, wfs):
    """ Return hp/hc maximized SNR time series
    """
    step = 1.0 / self.sample_rate
    snrs = {}
    for ifo in wfs:
        sdat = self.sdat[ifo]
        # series start time relative to the reference arrival;
        # the grid is padded by two samples on each side
        start_dtc = self.tstart[ifo] - self.end_time[ifo] - self.ta[ifo]
        series = snr_predictor(self.fedges[ifo],
                               start_dtc - step * 2.0, step,
                               self.num_samples[ifo] + 4,
                               wfs[ifo][0], wfs[ifo][1],
                               self.h00_sparse[ifo],
                               sdat['a0'], sdat['a1'],
                               sdat['b0'], sdat['b1'])
        snrs[ifo] = TimeSeries(series, delta_t=step,
                               epoch=self.tstart[ifo] - step * 2.0)
    return snrs
@catch_waveform_error
def _loglr(self):
    r"""Computes the log likelihood ratio,

    .. math::

        \log \mathcal{L}(\Theta) = \sum_i
            \left<h_i(\Theta)|d_i\right> -
            \frac{1}{2}\left<h_i(\Theta)|h_i(\Theta)\right>,

    at the current parameter values :math:`\Theta`.

    Returns
    -------
    float
        The value of the log likelihood ratio.
    """
    # get model params
    p = self.current_params
    wfs = self.get_waveforms(p)
    lik = self.likelihood_function
    norm = 0.0   # accumulates <h|h>
    filt = 0j    # accumulates <d|h>
    # e^{-2i*psi}, used to fold polarization into fp/fc
    pol_phase = numpy.exp(-2.0j * p['polarization'])
    # draw time (and related) samples using the SNR time series
    self.snr_draw(wfs)
    # re-read params: snr_draw may update the current values
    p = self.current_params
    for ifo in self.data:
        freqs = self.fedges[ifo]
        sdat = self.sdat[ifo]
        h00 = self.h00_sparse[ifo]
        end_time = self.end_time[ifo]
        times = self.antenna_time[ifo]
        hp, hc = wfs[ifo]
        det = self.det[ifo]
        # antenna pattern at psi=0; polarization folded in below
        fp, fc = det.antenna_pattern(p["ra"], p["dec"],
                                     0, times)
        # reuse `times` for the detector arrival-time delay(s)
        times = det.time_delay_from_earth_center(p["ra"], p["dec"], times)
        # geocenter time offset relative to the reference arrival
        dtc = p["tc"] - end_time - self.ta[ifo]
        if self.lformat == 'earth_time_pol':
            # kernel handles time and polarization vectors internally
            filter_i, norm_i = lik(
                freqs, fp, fc, times, dtc, pol_phase,
                hp, hc, h00,
                sdat['a0'], sdat['a1'],
                sdat['b0'], sdat['b1'])
        else:
            # rotate (fp, fc) by the polarization phase explicitly
            f = (fp + 1.0j * fc) * pol_phase
            fp = f.real.copy()
            fc = f.imag.copy()
            if self.lformat == 'earth_time':
                filter_i, norm_i = lik(
                    freqs, fp, fc, times, dtc,
                    hp, hc, h00,
                    sdat['a0'], sdat['a1'],
                    sdat['b0'], sdat['b1'])
            else:
                # combine geocenter offset and detector delay
                filter_i, norm_i = lik(freqs, fp, fc, times + dtc,
                                       hp, hc, h00,
                                       sdat['a0'], sdat['a1'],
                                       sdat['b0'], sdat['b1'])
        filt += filter_i
        norm += norm_i
    loglr = self.marginalize_loglr(filt, norm)
    return loglr
def _nowaveform_handler(self):
    """Sets loglr values if no waveform generated.

    Returns
    -------
    float
        ``-inf`` so the failed point is rejected by the sampler.
    """
    return -numpy.inf
[docs]
class RelativeTimeDom(RelativeTime):
""" Heterodyne likelihood optimized for time marginalization and only
dominant-mode waveforms. This enables the ability to do inclination
marginalization in addition to the other forms supported by RelativeTime.
"""
name = "relative_time_dom"
[docs]
def get_snr(self, wfs):
    """ Return hp/hc maximized SNR time series
    """
    delta_t = 1.0 / self.sample_rate
    snrs = {}
    # also store the <s|h> series and <h|h> values for reuse in _loglr
    self.sh = {}
    self.hh = {}
    for ifo in wfs:
        sdat = self.sdat[ifo]
        # series start time relative to the reference arrival
        dtc = self.tstart[ifo] - self.end_time[ifo] - self.ta[ifo]
        # grid is padded by two samples on each side so _loglr can
        # interpolate near the window edges
        sh, hh = snr_predictor_dom(self.fedges[ifo],
                                   dtc - delta_t * 2.0, delta_t,
                                   self.num_samples[ifo] + 4,
                                   wfs[ifo][0],
                                   self.h00_sparse[ifo],
                                   sdat['a0'], sdat['a1'],
                                   sdat['b0'], sdat['b1'])
        # trim the padding: |snr| = |<s|h>| / sqrt(<h|h>)
        snr = TimeSeries(abs(sh[2:-2]) / hh ** 0.5, delta_t=delta_t,
                         epoch=self.tstart[ifo])
        # keep the untrimmed complex series (epoch shifted by the pad)
        self.sh[ifo] = TimeSeries(sh, delta_t=delta_t,
                                  epoch=self.tstart[ifo] - delta_t * 2.0)
        self.hh[ifo] = hh
        snrs[ifo] = snr
    return snrs
@catch_waveform_error
def _loglr(self):
    r"""Computes the log likelihood ratio,
    or inner product <s|h> and <h|h> if `self.return_sh_hh` is True.

    .. math::

        \log \mathcal{L}(\Theta) = \sum_i
            \left<h_i(\Theta)|d_i\right> -
            \frac{1}{2}\left<h_i(\Theta)|h_i(\Theta)\right>,

    at the current parameter values :math:`\Theta`.

    Returns
    -------
    float
        The value of the log likelihood ratio.
    or
    tuple
        The inner product (<s|h>, <h|h>).
    """
    # calculate <d-h|d-h> = <h|h> - 2<h|d> + <d|d> up to a constant
    p = self.current_params
    # generate the waveform without inclination; its effect is applied
    # analytically below (valid for dominant-mode waveforms only)
    p2 = p.copy()
    p2.pop('inclination')
    wfs = self.get_waveforms(p2)
    sh_total = hh_total = 0
    # dominant-mode inclination factors for plus / cross polarizations
    ic = numpy.cos(p['inclination'])
    ip = 0.5 * (1.0 + ic * ic)
    # e^{-2i*psi}, used to fold polarization into fp/fc
    pol_phase = numpy.exp(-2.0j * p['polarization'])
    snrs = self.get_snr(wfs)
    # draw tc samples proportional to the SNR time series
    self.snr_draw(snrs=snrs)
    for ifo in self.sh:
        if self.precalc_antenna_factors:
            fp, fc, dt = self.get_precalc_antenna_factors(ifo)
        else:
            dt = self.det[ifo].time_delay_from_earth_center(p['ra'],
                                                            p['dec'],
                                                            p['tc'])
            fp, fc = self.det[ifo].antenna_pattern(p['ra'], p['dec'],
                                                   0, p['tc'])
        # detector arrival time(s)
        dts = p['tc'] + dt
        f = (fp + 1.0j * fc) * pol_phase
        # Note, this includes complex conjugation already
        # as our stored inner products were hp* x data
        htf = (f.real * ip + 1.0j * f.imag * ic)
        # interpolate the stored <s|h> series to the arrival time
        sh = self.sh[ifo].at_time(dts,
                                  interpolate='quadratic',
                                  extrapolate=0.0j)
        sh_total += sh * htf
        hh_total += self.hh[ifo] * abs(htf) ** 2.0
    loglr = self.marginalize_loglr(sh_total, hh_total)
    if self.return_sh_hh:
        results = (sh_total, hh_total)
    else:
        results = loglr
    return results
def _nowaveform_handler(self):
    """Sets loglr values if no waveform generated.
    """
    bad = -numpy.inf
    if self.return_sh_hh:
        # mirror the (sh, hh) return signature of _loglr
        return (bad, bad)
    return bad