Source code for pycbc.waveform.compress
# Copyright (C) 2016 Alex Nitz, Collin Capano
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
""" Utilities for handling frequency compressed an unequally spaced frequency
domain waveforms.
"""
import lal, numpy, logging, h5py
from pycbc import filter
from scipy import interpolate
from pycbc.types import FrequencySeries, real_same_precision_as
from pycbc.waveform import utils
from pycbc.scheme import schemed
from pycbc.io.hdf import HFile
[docs]
def rough_time_estimate(m1, m2, flow, fudge_length=1.1, fudge_min=0.02):
""" A very rough estimate of the duration of the waveform.
An estimate of the waveform duration starting from flow. This is intended
to be fast but not necessarily accurate. It should be an overestimate of
the length. It is derived from a simplification of the 0PN post-newtonian
terms and includes a fudge factor for possible ringdown, etc.
Parameters
----------
m1: float
mass of first component object in solar masses
m2: float
mass of second component object in solar masses
flow: float
starting frequency of the waveform
fudge_length: optional, {1.1, float}
Factor to multiply length estimate by to ensure it is a convservative
value
fudge_min: optional, {0.2, float}
Minimum signal duration that can be returned. This should be long
enough to encompass the ringdown and errors in the precise end time.
Returns
-------
time: float
Time from flow untill the end of the waveform
"""
m = m1 + m2
msun = m * lal.MTSUN_SI
t = 5.0 / 256.0 * m * m * msun / (m1 * m2) / \
(numpy.pi * msun * flow) ** (8.0 / 3.0)
# fudge factoriness
return .022 if t < 0 else (t + fudge_min) * fudge_length
[docs]
def mchirp_compression(m1, m2, fmin, fmax, min_seglen=0.02, df_multiple=None):
"""Return the frequencies needed to compress a waveform with the given
chirp mass. This is based on the estimate in rough_time_estimate.
Parameters
----------
m1: float
mass of first component object in solar masses
m2: float
mass of second component object in solar masses
fmin : float
The starting frequency of the compressed waveform.
fmax : float
The ending frequency of the compressed waveform.
min_seglen : float
The inverse of this gives the maximum frequency step that is used.
df_multiple : {None, float}
Make the compressed sampling frequencies a multiple of the given value.
If None provided, the returned sample points can have any floating
point value.
Returns
-------
array
The frequencies at which to evaluate the compressed waveform.
"""
sample_points = []
f = fmin
while f < fmax:
if df_multiple is not None:
f = int(f/df_multiple)*df_multiple
sample_points.append(f)
f += 1.0 / rough_time_estimate(m1, m2, f, fudge_min=min_seglen)
# add the last point
if sample_points[-1] < fmax:
sample_points.append(fmax)
return numpy.array(sample_points)
[docs]
def spa_compression(htilde, fmin, fmax, min_seglen=0.02,
sample_frequencies=None):
"""Returns the frequencies needed to compress the given frequency domain
waveform. This is done by estimating t(f) of the waveform using the
stationary phase approximation.
Parameters
----------
htilde : FrequencySeries
The waveform to compress.
fmin : float
The starting frequency of the compressed waveform.
fmax : float
The ending frequency of the compressed waveform.
min_seglen : float
The inverse of this gives the maximum frequency step that is used.
sample_frequencies : {None, array}
The frequencies that the waveform is evaluated at. If None, will
retrieve the frequencies from the waveform's sample_frequencies
attribute.
Returns
-------
array
The frequencies at which to evaluate the compressed waveform.
"""
if sample_frequencies is None:
sample_frequencies = htilde.sample_frequencies.numpy()
kmin = int(fmin/htilde.delta_f)
kmax = int(fmax/htilde.delta_f)
tf = abs(utils.time_from_frequencyseries(htilde,
sample_frequencies=sample_frequencies).data[kmin:kmax])
sample_frequencies = sample_frequencies[kmin:kmax]
sample_points = []
f = fmin
while f < fmax:
f = int(f/htilde.delta_f)*htilde.delta_f
sample_points.append(f)
jj = numpy.searchsorted(sample_frequencies, f)
f += 1./(tf[jj:].max()+min_seglen)
# add the last point
if sample_points[-1] < fmax:
sample_points.append(fmax)
return numpy.array(sample_points)
compression_algorithms = {
'mchirp': mchirp_compression,
'spa': spa_compression
}
def _vecdiff(htilde, hinterp, fmin, fmax, psd=None):
return abs(filter.overlap_cplx(htilde, htilde,
low_frequency_cutoff=fmin,
high_frequency_cutoff=fmax,
normalized=False, psd=psd)
- filter.overlap_cplx(htilde, hinterp,
low_frequency_cutoff=fmin,
high_frequency_cutoff=fmax,
normalized=False, psd=psd))
[docs]
def vecdiff(htilde, hinterp, sample_points, psd=None):
"""Computes a statistic indicating between which sample points a waveform
and the interpolated waveform differ the most.
"""
vecdiffs = numpy.zeros(sample_points.size-1, dtype=float)
for kk,thisf in enumerate(sample_points[:-1]):
nextf = sample_points[kk+1]
vecdiffs[kk] = abs(_vecdiff(htilde, hinterp, thisf, nextf, psd=psd))
return vecdiffs
[docs]
def compress_waveform(htilde, sample_points, tolerance, interpolation,
precision, decomp_scratch=None, psd=None):
"""Retrieves the amplitude and phase at the desired sample points, and adds
frequency points in order to ensure that the interpolated waveform
has a mismatch with the full waveform that is <= the desired tolerance. The
mismatch is computed by finding 1-overlap between `htilde` and the
decompressed waveform; no maximimization over phase/time is done, a
PSD may be used.
.. note::
The decompressed waveform is only garaunteed to have a true mismatch
<= the tolerance for the given `interpolation` and for no PSD.
However, since no maximization over time/phase is performed when
adding points, the actual mismatch between the decompressed waveform
and `htilde` is better than the tolerance, using no PSD. Using a PSD
does increase the mismatch, and can lead to mismatches > than the
desired tolerance, but typically by only a factor of a few worse.
Parameters
----------
htilde : FrequencySeries
The waveform to compress.
sample_points : array
The frequencies at which to store the amplitude and phase. More points
may be added to this, depending on the desired tolerance.
tolerance : float
The maximum mismatch to allow between a decompressed waveform and
`htilde`.
interpolation : str
The interpolation to use for decompressing the waveform when computing
overlaps.
precision : str
The precision being used to generate and store the compressed waveform
points.
decomp_scratch : {None, FrequencySeries}
Optionally provide scratch space for decompressing the waveform. The
provided frequency series must have the same `delta_f` and length
as `htilde`.
psd : {None, FrequencySeries}
The psd to use for calculating the overlap between the decompressed
waveform and the original full waveform.
Returns
-------
CompressedWaveform
The compressed waveform data; see `CompressedWaveform` for details.
"""
fmin = sample_points.min()
df = htilde.delta_f
sample_index = (sample_points / df).astype(int)
amp = utils.amplitude_from_frequencyseries(htilde)
phase = utils.phase_from_frequencyseries(htilde)
comp_amp = amp.take(sample_index)
comp_phase = phase.take(sample_index)
if decomp_scratch is None:
outdf = df
else:
outdf = None
hdecomp = fd_decompress(comp_amp, comp_phase, sample_points,
out=decomp_scratch, df=outdf, f_lower=fmin,
interpolation=interpolation)
kmax = min(len(htilde), len(hdecomp))
htilde = htilde[:kmax]
hdecomp = hdecomp[:kmax]
mismatch = 1. - filter.overlap(hdecomp, htilde, psd=psd,
low_frequency_cutoff=fmin)
if mismatch > tolerance:
# we'll need the difference in the waveforms as a function of frequency
vecdiffs = vecdiff(htilde, hdecomp, sample_points, psd=psd)
# We will find where in the frequency series the interpolated waveform
# has the smallest overlap with the full waveform, add a sample point
# there, and re-interpolate. We repeat this until the overall mismatch
# is > than the desired tolerance
added_points = []
while mismatch > tolerance:
minpt = vecdiffs.argmax()
# add a point at the frequency halfway between minpt and minpt+1
add_freq = sample_points[[minpt, minpt+1]].mean()
addidx = int(round(add_freq/df))
# ensure that only new points are added
if addidx in sample_index:
diffidx = vecdiffs.argsort()
addpt = -1
while addidx in sample_index:
addpt -= 1
try:
minpt = diffidx[addpt]
except IndexError:
raise ValueError("unable to compress to desired tolerance")
add_freq = sample_points[[minpt, minpt+1]].mean()
addidx = int(round(add_freq/df))
new_index = numpy.zeros(sample_index.size+1, dtype=int)
new_index[:minpt+1] = sample_index[:minpt+1]
new_index[minpt+1] = addidx
new_index[minpt+2:] = sample_index[minpt+1:]
sample_index = new_index
sample_points = (sample_index * df).astype(
real_same_precision_as(htilde))
# get the new compressed points
comp_amp = amp.take(sample_index)
comp_phase = phase.take(sample_index)
# update the vecdiffs and mismatch
hdecomp = fd_decompress(comp_amp, comp_phase, sample_points,
out=decomp_scratch, df=outdf,
f_lower=fmin, interpolation=interpolation)
hdecomp = hdecomp[:kmax]
new_vecdiffs = numpy.zeros(vecdiffs.size+1)
new_vecdiffs[:minpt] = vecdiffs[:minpt]
new_vecdiffs[minpt+2:] = vecdiffs[minpt+1:]
new_vecdiffs[minpt:minpt+2] = vecdiff(htilde, hdecomp,
sample_points[minpt:minpt+2],
psd=psd)
vecdiffs = new_vecdiffs
mismatch = 1. - filter.overlap(hdecomp, htilde, psd=psd,
low_frequency_cutoff=fmin)
added_points.append(addidx)
compression_factor = len(htilde) / len(sample_points)
logging.info(
"mismatch: %f, N points: %i (%i added), compression:%.3e",
mismatch,
len(comp_amp),
len(added_points),
compression_factor
)
return CompressedWaveform(sample_points, comp_amp, comp_phase,
interpolation=interpolation,
tolerance=tolerance, mismatch=mismatch,
precision=precision,
compression_factor=compression_factor)
_precision_map = {
'float32': 'single',
'float64': 'double',
'complex64': 'single',
'complex128': 'double'
}
_complex_dtypes = {
'single': numpy.complex64,
'double': numpy.complex128
}
_real_dtypes = {
'single': numpy.float32,
'double': numpy.float64
}
[docs]
@schemed("pycbc.waveform.decompress_")
def inline_linear_interp(amp, phase, sample_frequencies, output,
df, f_lower, imin, start_index):
"""Generate a frequency-domain waveform via linear interpolation
from sampled amplitude and phase. The sample frequency locations
for the amplitude and phase must be the same. This function may
be less accurate than scipy's linear interpolation, but should be
much faster. Additionally, it is 'schemed' and so may run under
either CPU or GPU schemes.
This function is not ordinarily called directly, but rather by
giving the argument 'interpolation' the value 'inline_linear'
when calling the function 'fd_decompress' below.
Parameters
----------
amp : array
The amplitude of the waveform at the sample frequencies.
phase : array
The phase of the waveform at the sample frequencies.
sample_frequencies : array
The frequency (in Hz) of the waveform at the sample frequencies.
output : {None, FrequencySeries}
The output array to save the decompressed waveform to. If this contains
slots for frequencies > the maximum frequency in sample_frequencies,
the rest of the values are zeroed. If not provided, must provide a df.
df : {None, float}
The frequency step to use for the decompressed waveform. Must be
provided if out is None.
f_lower : float
The frequency to start the decompression at. All values at
frequencies less than this will be 0 in the decompressed waveform.
imin : int
The index at which to start in the sampled frequency series. Must
therefore be 0 <= imin < len(sample_frequencies)
start_index : int
The index at which to start in the output frequency;
i.e., ceil(f_lower/df).
Returns
-------
output : FrequencySeries
If out was provided, writes to that array. Otherwise, a new
FrequencySeries with the decompressed waveform.
"""
return
[docs]
def fd_decompress(amp, phase, sample_frequencies, out=None, df=None,
f_lower=None, interpolation='inline_linear'):
"""Decompresses an FD waveform using the given amplitude, phase, and the
frequencies at which they are sampled at.
Parameters
----------
amp : array
The amplitude of the waveform at the sample frequencies.
phase : array
The phase of the waveform at the sample frequencies.
sample_frequencies : array
The frequency (in Hz) of the waveform at the sample frequencies.
out : {None, FrequencySeries}
The output array to save the decompressed waveform to. If this contains
slots for frequencies > the maximum frequency in sample_frequencies,
the rest of the values are zeroed. If not provided, must provide a df.
df : {None, float}
The frequency step to use for the decompressed waveform. Must be
provided if out is None.
f_lower : {None, float}
The frequency to start the decompression at. If None, will use whatever
the lowest frequency is in sample_frequencies. All values at
frequencies less than this will be 0 in the decompressed waveform.
interpolation : {'inline_linear', str}
The interpolation to use for the amplitude and phase. Default is
'inline_linear'. If 'inline_linear' a custom interpolater is used.
Otherwise, ``scipy.interpolate.interp1d`` is used; for other options,
see possible values for that function's ``kind`` argument.
Returns
-------
out : FrequencySeries
If out was provided, writes to that array. Otherwise, a new
FrequencySeries with the decompressed waveform.
"""
precision = _precision_map[sample_frequencies.dtype.name]
if _precision_map[amp.dtype.name] != precision or \
_precision_map[phase.dtype.name] != precision:
raise ValueError("amp, phase, and sample_points must all have the "
"same precision")
if out is None:
if df is None:
raise ValueError("Either provide output memory or a df")
hlen = int(numpy.ceil(sample_frequencies.max()/df+1))
out = FrequencySeries(numpy.zeros(hlen,
dtype=_complex_dtypes[precision]), copy=False,
delta_f=df)
else:
# check for precision compatibility
if out.precision == 'double' and precision == 'single':
raise ValueError("cannot cast single precision to double")
df = out.delta_f
hlen = len(out)
if f_lower is None:
imin = 0 # pylint:disable=unused-variable
f_lower = sample_frequencies[0]
start_index = 0
else:
if f_lower >= sample_frequencies.max():
raise ValueError("f_lower is > than the maximum sample frequency")
if f_lower < sample_frequencies.min():
raise ValueError("f_lower is < than the minimum sample frequency")
imin = int(numpy.searchsorted(sample_frequencies, f_lower,
side='right')) - 1 # pylint:disable=unused-variable
start_index = int(numpy.ceil(f_lower/df))
if start_index >= hlen:
raise ValueError('requested f_lower >= largest frequency in out')
# interpolate the amplitude and the phase
if interpolation == "inline_linear":
# Call the scheme-dependent function
inline_linear_interp(amp, phase, sample_frequencies, out,
df, f_lower, imin, start_index)
else:
# use scipy for fancier interpolation
sample_frequencies = numpy.array(sample_frequencies)
amp = numpy.array(amp)
phase = numpy.array(phase)
outfreq = out.sample_frequencies.numpy()
amp_interp = interpolate.interp1d(sample_frequencies, amp,
kind=interpolation,
bounds_error=False,
fill_value=0.,
assume_sorted=True)
phase_interp = interpolate.interp1d(sample_frequencies, phase,
kind=interpolation,
bounds_error=False,
fill_value=0.,
assume_sorted=True)
A = amp_interp(outfreq)
phi = phase_interp(outfreq)
out.data[:] = A*numpy.cos(phi) + (1j)*A*numpy.sin(phi)
return out
[docs]
class CompressedWaveform(object):
"""Class that stores information about a compressed waveform.
Parameters
----------
sample_points : {array, h5py.Dataset}
The frequency points at which the compressed waveform is sampled.
amplitude : {array, h5py.Dataset}
The amplitude of the waveform at the given `sample_points`.
phase : {array, h5py.Dataset}
The phase of the waveform at the given `sample_points`.
interpolation : {None, str}
The interpolation that was used when compressing the waveform for
computing tolerance. This is also the default interpolation used when
decompressing; see `decompress` for details.
tolerance : {None, float}
The tolerance that was used when compressing the waveform.
mismatch : {None, float}
The actual mismatch between the decompressed waveform (using the given
`interpolation`) and the full waveform.
precision : {'double', str}
The precision used to generate the compressed waveform's amplitude and
phase points. Default is 'double'.
load_to_memory : {True, bool}
If `sample_points`, `amplitude`, and/or `phase` is an hdf dataset, they
will be cached in memory the first time they are accessed. Default is
True.
Attributes
----------
load_to_memory : bool
Whether or not to load `sample_points`/`amplitude`/`phase` into memory
the first time they are accessed, if they are hdf datasets. Can be
set directly to toggle this behavior.
interpolation : str
The interpolation that was used when compressing the waveform, for
checking the mismatch. Also the default interpolation used when
decompressing.
tolerance : {None, float}
The tolerance that was used when compressing the waveform.
mismatch : {None, float}
The mismatch between the decompressed waveform and the original
waveform.
precision : {'double', str}
The precision used to generate and store the compressed waveform
points. Options are 'double' or 'single'; default is 'double
"""
def __init__(self, sample_points, amplitude, phase,
interpolation=None, tolerance=None, mismatch=None,
precision='double', load_to_memory=True,
compression_factor=None):
self._sample_points = sample_points
self._amplitude = amplitude
self._phase = phase
self._cache = {}
self.load_to_memory = load_to_memory
self.compression_factor = compression_factor
# if sample points, amplitude, and/or phase are hdf datasets,
# save their filenames
self._filenames = {}
self._groupnames = {}
for arrname in ['sample_points', 'amplitude', 'phase']:
try:
fname = getattr(self, '_{}'.format(arrname)).file.filename
gname = getattr(self, '_{}'.format(arrname)).name
except AttributeError:
fname = None
gname = None
self._filenames[arrname] = fname
self._groupnames[arrname] = gname
# metadata
self.interpolation = interpolation
self.tolerance = tolerance
self.mismatch = mismatch
self.precision = precision
def _get(self, param):
val = getattr(self, '_%s' %param)
if isinstance(val, h5py.Dataset):
try:
val = self._cache[param]
except KeyError:
try:
val = val[:]
except ValueError:
# this can happen if the file is closed; if so, open it
# and get the data
fp = HFile(self._filenames[param], 'r')
val = fp[self._groupnames[param]][:]
fp.close()
if self.load_to_memory:
self._cache[param] = val
return val
@property
def amplitude(self):
"""The amplitude of the waveform at the `sample_points`.
This is always returned as an array; the same logic as for
`sample_points` is used to determine whether or not to cache in
memory.
Returns
-------
amplitude : Array
"""
return self._get('amplitude')
@property
def phase(self):
"""The phase of the waveform as the `sample_points`.
This is always returned as an array; the same logic as for
`sample_points` returned as an array; the same logic as for
`sample_points` is used to determine whether or not to cache in
memory.
Returns
-------
phase : Array
"""
return self._get('phase')
@property
def sample_points(self):
"""The frequencies at which the compressed waveform is sampled.
This is
always returned as an array, even if the stored `sample_points` is an
hdf dataset. If `load_to_memory` is True and the stored points are
an hdf dataset, the `sample_points` will cached in memory the first
time this attribute is accessed.
Returns
-------
sample_points : Array
"""
return self._get('sample_points')
[docs]
def clear_cache(self):
"""Clear self's cache of amplitude, phase, and sample_points."""
self._cache.clear()
[docs]
def decompress(self, out=None, df=None, f_lower=None, interpolation=None):
"""Decompress self.
Parameters
----------
out : {None, FrequencySeries}
Write the decompressed waveform to the given frequency series. The
decompressed waveform will have the same `delta_f` as `out`.
Either this or `df` must be provided.
df : {None, float}
Decompress the waveform such that its `delta_f` has the given
value. Either this or `out` must be provided.
f_lower : {None, float}
The starting frequency at which to decompress the waveform. Cannot
be less than the minimum frequency in `sample_points`. If `None`
provided, will default to the minimum frequency in `sample_points`.
interpolation : {None, str}
The interpolation to use for decompressing the waveform. If `None`
provided, will default to `self.interpolation`.
Returns
-------
FrequencySeries
The decompressed waveform.
"""
if f_lower is None:
# use the minimum of the samlpe points
f_lower = self.sample_points.min()
if interpolation is None:
interpolation = self.interpolation
return fd_decompress(self.amplitude, self.phase, self.sample_points,
out=out, df=df, f_lower=f_lower,
interpolation=interpolation)
[docs]
def write_to_hdf(self, fp, template_hash, root=None, precision=None):
"""Write the compressed waveform to the given hdf file handler.
The waveform is written to:
`fp['[{root}/]compressed_waveforms/{template_hash}/{param}']`,
where `param` is the `sample_points`, `amplitude`, and `phase`. The
`interpolation`, `tolerance`, `mismatch` and `precision` are saved
to the group's attributes.
Parameters
----------
fp : h5py.File
An open hdf file to write the compressed waveform to.
template_hash : {hash, int, str}
A hash, int, or string to map the template to the waveform.
root : {None, str}
Put the `compressed_waveforms` group in the given directory in the
hdf file. If `None`, `compressed_waveforms` will be the root
directory.
precision : {None, str}
Cast the saved parameters to the given precision before saving. If
None provided, will use whatever their current precision is. This
will raise an error if the parameters have single precision but the
requested precision is double.
"""
if root is None:
root = ''
else:
root = '%s/'%(root)
if precision is None:
precision = self.precision
elif precision == 'double' and self.precision == 'single':
raise ValueError("cannot cast single precision to double")
outdtype = _real_dtypes[precision]
group = '%scompressed_waveforms/%s' %(root, str(template_hash))
for param in ['amplitude', 'phase', 'sample_points']:
fp['%s/%s' %(group, param)] = self._get(param).astype(outdtype)
fp_group = fp[group]
fp_group.attrs['mismatch'] = self.mismatch
fp_group.attrs['interpolation'] = self.interpolation
fp_group.attrs['tolerance'] = self.tolerance
fp_group.attrs['precision'] = precision
fp_group.attrs['compression_factor'] = self.compression_factor
[docs]
@classmethod
def from_hdf(cls, fp, template_hash, root=None, load_to_memory=True,
load_now=False):
"""Load a compressed waveform from the given hdf file handler.
The waveform is retrieved from:
`fp['[{root}/]compressed_waveforms/{template_hash}/{param}']`,
where `param` is the `sample_points`, `amplitude`, and `phase`.
Parameters
----------
fp : h5py.File
An open hdf file to write the compressed waveform to.
template_hash : {hash, int, str}
The id of the waveform.
root : {None, str}
Retrieve the `compressed_waveforms` group from the given string.
If `None`, `compressed_waveforms` will be assumed to be in the
top level.
load_to_memory : {True, bool}
Set the `load_to_memory` attribute to the given value in the
returned instance.
load_now : {False, bool}
Immediately load the `sample_points`/`amplitude`/`phase` to memory.
Returns
-------
CompressedWaveform
An instance of this class with parameters loaded from the hdf file.
"""
if root is None:
root = ''
else:
root = '%s/'%(root)
group = '%scompressed_waveforms/%s' %(root, str(template_hash))
fp_group = fp[group]
sample_points = fp_group['sample_points']
amp = fp_group['amplitude']
phase = fp_group['phase']
if load_now:
sample_points = sample_points[:]
amp = amp[:]
phase = phase[:]
return cls(sample_points, amp, phase,
interpolation=fp_group.attrs['interpolation'],
tolerance=fp_group.attrs['tolerance'],
mismatch=fp_group.attrs['mismatch'],
precision=fp_group.attrs['precision'],
load_to_memory=load_to_memory)