# Copyright (C) 2014 Tito Dal Canton, Josh Willis, Alex Nitz
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Provides a class representing a time series.
"""
import os as _os
import h5py
from pycbc.types.array import Array, _convert, complex_same_precision_as, zeros
from pycbc.types.array import _nocomplex
from pycbc.types.frequencyseries import FrequencySeries
import lal as _lal
import numpy as _numpy
from scipy.io.wavfile import write as write_wav
[docs]
class TimeSeries(Array):
"""Models a time series consisting of uniformly sampled scalar values.
Parameters
----------
initial_array : array-like
Array containing sampled data.
delta_t : float
Time between consecutive samples in seconds.
epoch : {None, lal.LIGOTimeGPS}, optional
Time of the first sample in seconds.
dtype : {None, data-type}, optional
Sample data type.
copy : boolean, optional
If True, samples are copied to a new array.
"""
def __init__(self, initial_array, delta_t=None,
epoch=None, dtype=None, copy=True):
if len(initial_array) < 1:
raise ValueError('initial_array must contain at least one sample.')
if delta_t is None:
try:
delta_t = initial_array.delta_t
except AttributeError:
raise TypeError('must provide either an initial_array with a delta_t attribute, or a value for delta_t')
if not delta_t > 0:
raise ValueError('delta_t must be a positive number')
# Get epoch from initial_array if epoch not given (or is None)
# If initialy array has no epoch, set epoch to 0.
# If epoch is provided, use that.
if not isinstance(epoch, _lal.LIGOTimeGPS):
if epoch is None:
if isinstance(initial_array, TimeSeries):
epoch = initial_array._epoch
else:
epoch = _lal.LIGOTimeGPS(0)
elif epoch is not None:
try:
epoch = _lal.LIGOTimeGPS(epoch)
except:
raise TypeError('epoch must be either None or a lal.LIGOTimeGPS')
Array.__init__(self, initial_array, dtype=dtype, copy=copy)
self._delta_t = delta_t
self._epoch = epoch
[docs]
def to_astropy(self, name='pycbc'):
""" Return an astropy.timeseries.TimeSeries instance
"""
from astropy.timeseries import TimeSeries as ATimeSeries
from astropy.time import Time
from astropy.units import s
start = Time(float(self.start_time), format='gps', scale='utc')
delta = self.delta_t * s
return ATimeSeries({name: self.numpy()},
time_start=start,
time_delta=delta,
n_samples=len(self))
[docs]
def epoch_close(self, other):
""" Check if the epoch is close enough to allow operations """
dt = abs(float(self.start_time - other.start_time))
return dt <= 1e-7
[docs]
def sample_rate_close(self, other):
""" Check if the sample rate is close enough to allow operations """
# compare our delta_t either to a another time series' or
# to a given sample rate (float)
if isinstance(other, TimeSeries):
odelta_t = other.delta_t
else:
odelta_t = 1.0/other
if (odelta_t - self.delta_t) / self.delta_t > 1e-4:
return False
if abs(1 - odelta_t / self.delta_t) * len(self) > 0.5:
return False
return True
def _return(self, ary):
return TimeSeries(ary, self._delta_t, epoch=self._epoch, copy=False)
def _typecheck(self, other):
if isinstance(other, TimeSeries):
if not self.sample_rate_close(other):
raise ValueError('different delta_t, {} vs {}'.format(
self.delta_t, other.delta_t))
if not self.epoch_close(other):
raise ValueError('different epoch, {} vs {}'.format(
self.start_time, other.start_time))
def _getslice(self, index):
# Set the new epoch---note that index.start may also be None
if index.start is None:
new_epoch = self._epoch
else:
if index.start < 0:
raise ValueError(('Negative start index ({})'
' not supported').format(index.start))
new_epoch = self._epoch + index.start * self._delta_t
if index.step is not None:
new_delta_t = self._delta_t * index.step
else:
new_delta_t = self._delta_t
return TimeSeries(Array._getslice(self, index), new_delta_t,
new_epoch, copy=False)
[docs]
def prepend_zeros(self, num):
"""Prepend num zeros onto the beginning of this TimeSeries. Update also
epoch to include this prepending.
"""
self.resize(len(self) + num)
self.roll(num)
self._epoch = self._epoch - num * self._delta_t
[docs]
def append_zeros(self, num):
"""Append num zeros onto the end of this TimeSeries.
"""
self.resize(len(self) + num)
[docs]
def get_delta_t(self):
"""Return time between consecutive samples in seconds.
"""
return self._delta_t
delta_t = property(get_delta_t,
doc="Time between consecutive samples in seconds.")
[docs]
def get_duration(self):
"""Return duration of time series in seconds.
"""
return len(self) * self._delta_t
duration = property(get_duration,
doc="Duration of time series in seconds.")
[docs]
def get_sample_rate(self):
"""Return the sample rate of the time series.
"""
return 1.0/self.delta_t
sample_rate = property(get_sample_rate,
doc="The sample rate of the time series.")
[docs]
def time_slice(self, start, end, mode='floor'):
"""Return the slice of the time series that contains the time range
in GPS seconds.
"""
if start < self.start_time:
raise ValueError('Time series does not contain a time as early as %s' % start)
if end > self.end_time:
raise ValueError('Time series does not contain a time as late as %s' % end)
start_idx = float(start - self.start_time) * self.sample_rate
end_idx = float(end - self.start_time) * self.sample_rate
if _numpy.isclose(start_idx, round(start_idx), rtol=0, atol=1E-3):
start_idx = round(start_idx)
if _numpy.isclose(end_idx, round(end_idx), rtol=0, atol=1E-3):
end_idx = round(end_idx)
if mode == 'floor':
start_idx = int(start_idx)
end_idx = int(end_idx)
elif mode == 'nearest':
start_idx = int(round(start_idx))
end_idx = int(round(end_idx))
else:
raise ValueError("Invalid mode: {}".format(mode))
return self[start_idx:end_idx]
@property
def delta_f(self):
"""Return the delta_f this ts would have in the frequency domain
"""
return 1.0 / self.duration
@property
def start_time(self):
"""Return time series start time as a LIGOTimeGPS.
"""
return self._epoch
@start_time.setter
def start_time(self, time):
""" Set the start time
"""
self._epoch = _lal.LIGOTimeGPS(time)
[docs]
def get_end_time(self):
"""Return time series end time as a LIGOTimeGPS.
"""
return self._epoch + self.get_duration()
end_time = property(get_end_time,
doc="Time series end time as a LIGOTimeGPS.")
[docs]
def get_sample_times(self):
"""Return an Array containing the sample times.
"""
if self._epoch is None:
return Array(range(len(self))) * self._delta_t
else:
return Array(range(len(self))) * self._delta_t + float(self._epoch)
sample_times = property(get_sample_times,
doc="Array containing the sample times.")
[docs]
def at_time(self, time, nearest_sample=False,
interpolate=None, extrapolate=None):
"""Return the value of the TimeSeries at the specified GPS time.
Parameters
----------
time: scalar or array-like
GPS time at which the value is wanted. Note that LIGOTimeGPS
objects count as scalar.
nearest_sample: bool
Return the sample at the time nearest to the chosen time rather
than rounded down.
interpolate: str, None
Return the interpolated value of the time series. Choices
are simple linear or quadratic interpolation.
extrapolate: str or float, None
Value to return if time is outside the range of the vector or
method of extrapolating the value.
"""
if nearest_sample:
time = time + self.delta_t / 2.0
vtime = _numpy.array(time, ndmin=1)
fill_value = None
keep_idx = None
size = len(vtime)
if extrapolate is not None:
if _numpy.isscalar(extrapolate) and _numpy.isreal(extrapolate):
fill_value = extrapolate
facl = facr = 0
if interpolate == 'quadratic':
facl = facr = 1.1
elif interpolate == 'linear':
facl, facr = 0.1, 1.1
left = (vtime >= self.start_time + self.delta_t * facl)
right = (vtime < self.end_time - self.delta_t * facr)
keep_idx = _numpy.where(left & right)[0]
vtime = vtime[keep_idx]
else:
raise ValueError(f"Unsupported extrapolate: {extrapolate}")
fi = (vtime - float(self.start_time)) * self.sample_rate
i = _numpy.asarray(_numpy.floor(fi)).astype(int)
di = fi - i
if interpolate == 'linear':
a = self[i]
b = self[i+1]
ans = a + (b - a) * di
elif interpolate == 'quadratic':
c = self.data[i]
xr = self.data[i + 1] - c
xl = self.data[i - 1] - c
a = 0.5 * (xr + xl)
b = 0.5 * (xr - xl)
ans = a * di**2.0 + b * di + c
else:
ans = self[i]
ans = _numpy.array(ans, ndmin=1)
if fill_value is not None:
old = ans
ans = _numpy.zeros(size) + fill_value
ans[keep_idx] = old
ans = _numpy.array(ans, ndmin=1)
if _numpy.ndim(time) == 0:
return ans[0]
return ans
at_times = at_time
def __eq__(self,other):
"""
This is the Python special method invoked whenever the '=='
comparison is used. It will return true if the data of two
time series are identical, and all of the numeric meta-data
are identical, irrespective of whether or not the two
instances live in the same memory (for that comparison, the
Python statement 'a is b' should be used instead).
Thus, this method returns 'True' if the types of both 'self'
and 'other' are identical, as well as their lengths, dtypes,
epochs, delta_ts and the data in the arrays, element by element.
It will always do the comparison on the CPU, but will *not* move
either object to the CPU if it is not already there, nor change
the scheme of either object. It is possible to compare a CPU
object to a GPU object, and the comparison should be true if the
data and meta-data of the two objects are the same.
Note in particular that this function returns a single boolean,
and not an array of booleans as Numpy does. If the numpy
behavior is instead desired it can be obtained using the numpy()
method of the PyCBC type to get a numpy instance from each
object, and invoking '==' on those two instances.
Parameters
----------
other: another Python object, that should be tested for equality
with 'self'.
Returns
-------
boolean: 'True' if the types, dtypes, lengths, epochs, delta_ts
and data of the two objects are each identical.
"""
if super(TimeSeries,self).__eq__(other):
return (self._epoch == other._epoch and self._delta_t == other._delta_t)
else:
return False
[docs]
def almost_equal_elem(self,other,tol,relative=True,dtol=0.0):
"""
Compare whether two time series are almost equal, element
by element.
If the 'relative' parameter is 'True' (the default) then the
'tol' parameter (which must be positive) is interpreted as a
relative tolerance, and the comparison returns 'True' only if
abs(self[i]-other[i]) <= tol*abs(self[i])
for all elements of the series.
If 'relative' is 'False', then 'tol' is an absolute tolerance,
and the comparison is true only if
abs(self[i]-other[i]) <= tol
for all elements of the series.
The method also checks that self.delta_t is within 'dtol' of
other.delta_t; if 'dtol' has its default value of 0 then exact
equality between the two is required.
Other meta-data (type, dtype, length, and epoch) must be exactly
equal. If either object's memory lives on the GPU it will be
copied to the CPU for the comparison, which may be slow. But the
original object itself will not have its memory relocated nor
scheme changed.
Parameters
----------
other: another Python object, that should be tested for
almost-equality with 'self', element-by-element.
tol: a non-negative number, the tolerance, which is interpreted
as either a relative tolerance (the default) or an absolute
tolerance.
relative: A boolean, indicating whether 'tol' should be interpreted
as a relative tolerance (if True, the default if this argument
is omitted) or as an absolute tolerance (if tol is False).
dtol: a non-negative number, the tolerance for delta_t. Like 'tol',
it is interpreted as relative or absolute based on the value of
'relative'. This parameter defaults to zero, enforcing exact
equality between the delta_t values of the two TimeSeries.
Returns
-------
boolean: 'True' if the data and delta_ts agree within the tolerance,
as interpreted by the 'relative' keyword, and if the types,
lengths, dtypes, and epochs are exactly the same.
"""
# Check that the delta_t tolerance is non-negative; raise an exception
# if needed.
if (dtol < 0.0):
raise ValueError("Tolerance in delta_t cannot be negative")
if super(TimeSeries,self).almost_equal_elem(other,tol=tol,relative=relative):
if relative:
return (self._epoch == other._epoch and
abs(self._delta_t-other._delta_t) <= dtol*self._delta_t)
else:
return (self._epoch == other._epoch and
abs(self._delta_t-other._delta_t) <= dtol)
else:
return False
[docs]
def almost_equal_norm(self,other,tol,relative=True,dtol=0.0):
"""
Compare whether two time series are almost equal, normwise.
If the 'relative' parameter is 'True' (the default) then the
'tol' parameter (which must be positive) is interpreted as a
relative tolerance, and the comparison returns 'True' only if
abs(norm(self-other)) <= tol*abs(norm(self)).
If 'relative' is 'False', then 'tol' is an absolute tolerance,
and the comparison is true only if
abs(norm(self-other)) <= tol
The method also checks that self.delta_t is within 'dtol' of
other.delta_t; if 'dtol' has its default value of 0 then exact
equality between the two is required.
Other meta-data (type, dtype, length, and epoch) must be exactly
equal. If either object's memory lives on the GPU it will be
copied to the CPU for the comparison, which may be slow. But the
original object itself will not have its memory relocated nor
scheme changed.
Parameters
----------
other: another Python object, that should be tested for
almost-equality with 'self', based on their norms.
tol: a non-negative number, the tolerance, which is interpreted
as either a relative tolerance (the default) or an absolute
tolerance.
relative: A boolean, indicating whether 'tol' should be interpreted
as a relative tolerance (if True, the default if this argument
is omitted) or as an absolute tolerance (if tol is False).
dtol: a non-negative number, the tolerance for delta_t. Like 'tol',
it is interpreted as relative or absolute based on the value of
'relative'. This parameter defaults to zero, enforcing exact
equality between the delta_t values of the two TimeSeries.
Returns
-------
boolean: 'True' if the data and delta_ts agree within the tolerance,
as interpreted by the 'relative' keyword, and if the types,
lengths, dtypes, and epochs are exactly the same.
"""
# Check that the delta_t tolerance is non-negative; raise an exception
# if needed.
if (dtol < 0.0):
raise ValueError("Tolerance in delta_t cannot be negative")
if super(TimeSeries,self).almost_equal_norm(other,tol=tol,relative=relative):
if relative:
return (self._epoch == other._epoch and
abs(self._delta_t-other._delta_t) <= dtol*self._delta_t)
else:
return (self._epoch == other._epoch and
abs(self._delta_t-other._delta_t) <= dtol)
else:
return False
[docs]
@_convert
def lal(self):
"""Produces a LAL time series object equivalent to self.
Returns
-------
lal_data : {lal.*TimeSeries}
LAL time series object containing the same data as self.
The actual type depends on the sample's dtype. If the epoch of
self is 'None', the epoch of the returned LAL object will be
LIGOTimeGPS(0,0); otherwise, the same as that of self.
Raises
------
TypeError
If time series is stored in GPU memory.
"""
lal_data = None
ep = self._epoch
if self._data.dtype == _numpy.float32:
lal_data = _lal.CreateREAL4TimeSeries("",ep,0,self.delta_t,_lal.SecondUnit,len(self))
elif self._data.dtype == _numpy.float64:
lal_data = _lal.CreateREAL8TimeSeries("",ep,0,self.delta_t,_lal.SecondUnit,len(self))
elif self._data.dtype == _numpy.complex64:
lal_data = _lal.CreateCOMPLEX8TimeSeries("",ep,0,self.delta_t,_lal.SecondUnit,len(self))
elif self._data.dtype == _numpy.complex128:
lal_data = _lal.CreateCOMPLEX16TimeSeries("",ep,0,self.delta_t,_lal.SecondUnit,len(self))
lal_data.data.data[:] = self.numpy()
return lal_data
[docs]
def crop(self, left, right):
""" Remove given seconds from either end of time series
Parameters
----------
left : float
Number of seconds of data to remove from the left of the time series.
right : float
Number of seconds of data to remove from the right of the time series.
Returns
-------
cropped : pycbc.types.TimeSeries
The reduced time series
"""
if left + right > self.duration:
raise ValueError('Cannot crop more data than we have')
s = int(left * self.sample_rate)
e = len(self) - int(right * self.sample_rate)
return self[s:e]
[docs]
def save_to_wav(self, file_name):
""" Save this time series to a wav format audio file.
Parameters
----------
file_name : string
The output file name
"""
scaled = _numpy.int16(self.numpy()/max(abs(self)) * 32767)
write_wav(file_name, int(self.sample_rate), scaled)
[docs]
def psd(self, segment_duration, **kwds):
""" Calculate the power spectral density of this time series.
Use the `pycbc.psd.welch` method to estimate the psd of this time segment.
For more complete options, please see that function.
Parameters
----------
segment_duration: float
Duration in seconds to use for each sample of the spectrum.
kwds : keywords
Additional keyword arguments are passed on to the `pycbc.psd.welch` method.
Returns
-------
psd : FrequencySeries
Frequency series containing the estimated PSD.
"""
from pycbc.psd import welch
seg_len = int(round(segment_duration * self.sample_rate))
seg_stride = int(seg_len / 2)
return welch(self, seg_len=seg_len,
seg_stride=seg_stride,
**kwds)
[docs]
def gate(self, time, window=0.25, method='taper', copy=True,
taper_width=0.25, invpsd=None):
""" Gate out portion of time series
Parameters
----------
time: float
Central time of the gate in seconds
window: float
Half-length in seconds to remove data around gate time.
method: str
Method to apply gate, options are 'hard', 'taper', and 'paint'.
copy: bool
If False, do operations inplace to this time series, else return
new time series.
taper_width: float
Length of tapering region on either side of excized data. Only
applies to the taper gating method.
invpsd: pycbc.types.FrequencySeries
The inverse PSD to use for painting method. If not given,
a PSD is generated using default settings.
Returns
-------
data: pycbc.types.TimeSeris
Gated time series
"""
data = self.copy() if copy else self
if method == 'taper':
from pycbc.strain import gate_data
return gate_data(data, [(time, window, taper_width)])
elif method == 'paint':
# Uses the hole-filling method of
# https://arxiv.org/pdf/1908.05644.pdf
from pycbc.strain.gate import gate_and_paint
from pycbc.waveform.utils import apply_fd_time_shift
if invpsd is None:
# These are some bare minimum settings, normally you
# should probably provide a psd
invpsd = 1. / self.filter_psd(self.duration/32, self.delta_f, 0)
lindex = int((time - window - self.start_time) / self.delta_t)
rindex = int((time + window - self.start_time) / self.delta_t)
lindex = lindex if lindex >= 0 else 0
rindex = rindex if rindex <= len(self) else len(self)
rindex_time = float(self.start_time + rindex * self.delta_t)
offset = rindex_time - (time + window)
if offset == 0:
return gate_and_paint(data, lindex, rindex, invpsd, copy=False)
else:
# time shift such that gate end time lands on a specific data sample
fdata = data.to_frequencyseries()
fdata = apply_fd_time_shift(fdata, offset + fdata.epoch, copy=False)
# gate and paint in time domain
data = fdata.to_timeseries()
data = gate_and_paint(data, lindex, rindex, invpsd, copy=False)
# shift back to the original time
fdata = data.to_frequencyseries()
fdata = apply_fd_time_shift(fdata, -offset + fdata.epoch, copy=False)
tdata = fdata.to_timeseries()
return tdata
elif method == 'hard':
tslice = data.time_slice(time - window, time + window)
tslice[:] = 0
return data
else:
raise ValueError('Invalid method name: {}'.format(method))
[docs]
def filter_psd(self, segment_duration, delta_f, flow):
""" Calculate the power spectral density of this time series.
Use the `pycbc.psd.welch` method to estimate the psd of this time segment.
The psd is then truncated in the time domain to the segment duration
and interpolated to the requested sample frequency.
Parameters
----------
segment_duration: float
Duration in seconds to use for each sample of the spectrum.
delta_f : float
Frequency spacing to return psd at.
flow : float
The low frequency cutoff to apply when truncating the inverse
spectrum.
Returns
-------
psd : FrequencySeries
Frequency series containing the estimated PSD.
"""
from pycbc.psd import interpolate, inverse_spectrum_truncation
p = self.psd(segment_duration)
samples = int(round(p.sample_rate * segment_duration))
p = interpolate(p, delta_f)
return inverse_spectrum_truncation(p, samples,
low_frequency_cutoff=flow,
trunc_method='hann')
[docs]
def whiten(self, segment_duration, max_filter_duration, trunc_method='hann',
remove_corrupted=True, low_frequency_cutoff=None,
return_psd=False, **kwds):
""" Return a whitened time series
Parameters
----------
segment_duration: float
Duration in seconds to use for each sample of the spectrum.
max_filter_duration : int
Maximum length of the time-domain filter in seconds.
trunc_method : {None, 'hann'}
Function used for truncating the time-domain filter.
None produces a hard truncation at `max_filter_len`.
remove_corrupted : {True, boolean}
If True, the region of the time series corrupted by the whitening
is excised before returning. If false, the corrupted regions
are not excised and the full time series is returned.
low_frequency_cutoff : {None, float}
Low frequency cutoff to pass to the inverse spectrum truncation.
This should be matched to a known low frequency cutoff of the
data if there is one.
return_psd : {False, Boolean}
Return the estimated and conditioned PSD that was used to whiten
the data.
kwds : keywords
Additional keyword arguments are passed on to the `pycbc.psd.welch` method.
Returns
-------
whitened_data : TimeSeries
The whitened time series
"""
from pycbc.psd import inverse_spectrum_truncation, interpolate
# Estimate the noise spectrum
psd = self.psd(segment_duration, **kwds)
psd = interpolate(psd, self.delta_f)
max_filter_len = int(round(max_filter_duration * self.sample_rate))
# Interpolate and smooth to the desired corruption length
psd = inverse_spectrum_truncation(psd,
max_filter_len=max_filter_len,
low_frequency_cutoff=low_frequency_cutoff,
trunc_method=trunc_method)
# Whiten the data by the asd
white = (self.to_frequencyseries() / psd**0.5).to_timeseries()
if remove_corrupted:
white = white[int(max_filter_len/2):int(len(self)-max_filter_len/2)]
if return_psd:
return white, psd
return white
[docs]
def notch_fir(self, f1, f2, order, beta=5.0, remove_corrupted=True):
""" notch filter the time series using an FIR filtered generated from
the ideal response passed through a time-domain kaiser
window (beta = 5.0)
The suppression of the notch filter is related to the bandwidth and
the number of samples in the filter length. For a few Hz bandwidth,
a length corresponding to a few seconds is typically
required to create significant suppression in the notched band.
Parameters
----------
Time Series: TimeSeries
The time series to be notched.
f1: float
The start of the frequency suppression.
f2: float
The end of the frequency suppression.
order: int
Number of corrupted samples on each side of the time series
beta: float
Beta parameter of the kaiser window that sets the side lobe attenuation.
"""
from pycbc.filter import notch_fir
ts = notch_fir(self, f1, f2, order, beta=beta)
if remove_corrupted:
ts = ts[order:len(ts)-order]
return ts
[docs]
def lowpass_fir(self, frequency, order, beta=5.0, remove_corrupted=True):
""" Lowpass filter the time series using an FIR filtered generated from
the ideal response passed through a kaiser window (beta = 5.0)
Parameters
----------
Time Series: TimeSeries
The time series to be low-passed.
frequency: float
The frequency below which is suppressed.
order: int
Number of corrupted samples on each side of the time series
beta: float
Beta parameter of the kaiser window that sets the side lobe attenuation.
remove_corrupted : {True, boolean}
If True, the region of the time series corrupted by the filtering
is excised before returning. If false, the corrupted regions
are not excised and the full time series is returned.
"""
from pycbc.filter import lowpass_fir
ts = lowpass_fir(self, frequency, order, beta=beta)
if remove_corrupted:
ts = ts[order:len(ts)-order]
return ts
[docs]
def highpass_fir(self, frequency, order, beta=5.0, remove_corrupted=True):
""" Highpass filter the time series using an FIR filtered generated from
the ideal response passed through a kaiser window (beta = 5.0)
Parameters
----------
Time Series: TimeSeries
The time series to be high-passed.
frequency: float
The frequency below which is suppressed.
order: int
Number of corrupted samples on each side of the time series
beta: float
Beta parameter of the kaiser window that sets the side lobe attenuation.
remove_corrupted : {True, boolean}
If True, the region of the time series corrupted by the filtering
is excised before returning. If false, the corrupted regions
are not excised and the full time series is returned.
"""
from pycbc.filter import highpass_fir
ts = highpass_fir(self, frequency, order, beta=beta)
if remove_corrupted:
ts = ts[order:len(ts)-order]
return ts
[docs]
def fir_zero_filter(self, coeff):
"""Filter the timeseries with a set of FIR coefficients
Parameters
----------
coeff: numpy.ndarray
FIR coefficients. Should be and odd length and symmetric.
Returns
-------
filtered_series: pycbc.types.TimeSeries
Return the filtered timeseries, which has been properly shifted to account
for the FIR filter delay and the corrupted regions zeroed out.
"""
from pycbc.filter import fir_zero_filter
return self._return(fir_zero_filter(coeff, self))
[docs]
def resample(self, delta_t):
""" Resample this time series to the new delta_t
Parameters
-----------
delta_t: float
The time step to resample the times series to.
Returns
-------
resampled_ts: pycbc.types.TimeSeries
The resample timeseries at the new time interval delta_t.
"""
from pycbc.filter import resample_to_delta_t
return resample_to_delta_t(self, delta_t)
[docs]
def save(self, path, group = None):
"""
Save time series to a Numpy .npy, hdf, or text file. The first column
contains the sample times, the second contains the values.
In the case of a complex time series saved as text, the imaginary
part is written as a third column. When using hdf format, the data is stored
as a single vector, along with relevant attributes.
Parameters
----------
path: string
Destination file path. Must end with either .hdf, .npy or .txt.
group: string
Additional name for internal storage use. Ex. hdf storage uses
this as the key value.
Raises
------
ValueError
If path does not end in .npy or .txt.
"""
ext = _os.path.splitext(path)[1]
if ext == '.npy':
output = _numpy.vstack((self.sample_times.numpy(), self.numpy())).T
_numpy.save(path, output)
elif ext == '.txt':
if self.kind == 'real':
output = _numpy.vstack((self.sample_times.numpy(),
self.numpy())).T
elif self.kind == 'complex':
output = _numpy.vstack((self.sample_times.numpy(),
self.numpy().real,
self.numpy().imag)).T
_numpy.savetxt(path, output)
elif ext =='.hdf':
key = 'data' if group is None else group
with h5py.File(path, 'a') as f:
ds = f.create_dataset(key, data=self.numpy(),
compression='gzip',
compression_opts=9, shuffle=True)
ds.attrs['start_time'] = float(self.start_time)
ds.attrs['delta_t'] = float(self.delta_t)
else:
raise ValueError('Path must end with .npy, .txt or .hdf')
[docs]
def to_timeseries(self):
""" Return time series"""
return self
[docs]
@_nocomplex
def to_frequencyseries(self, delta_f=None):
""" Return the Fourier transform of this time series
Parameters
----------
delta_f : {None, float}, optional
The frequency resolution of the returned frequency series. By
default the resolution is determined by the duration of the timeseries.
Returns
-------
FrequencySeries:
The fourier transform of this time series.
"""
from pycbc.fft import fft
if not delta_f:
delta_f = 1.0 / self.duration
# add 0.5 to round integer
tlen = int(1.0 / delta_f / self.delta_t + 0.5)
flen = int(tlen / 2 + 1)
if tlen < len(self):
raise ValueError("The value of delta_f (%s) would be "
"undersampled. Maximum delta_f "
"is %s." % (delta_f, 1.0 / self.duration))
if not delta_f:
tmp = self
else:
tmp = TimeSeries(zeros(tlen, dtype=self.dtype),
delta_t=self.delta_t, epoch=self.start_time)
tmp[:len(self)] = self[:]
f = FrequencySeries(zeros(flen,
dtype=complex_same_precision_as(self)),
delta_f=delta_f)
fft(tmp, f)
f._delta_f = delta_f
return f
[docs]
def inject(self, other, copy=True):
"""Return copy of self with other injected into it.
The other vector will be resized and time shifted with sub-sample
precision before adding. This assumes that one can assume zeros
outside of the original vector range.
"""
# only handle equal sample rate for now.
if not self.sample_rate_close(other):
raise ValueError('Sample rate must be the same')
# determine if we want to inject in place or not
if copy:
ts = self.copy()
else:
ts = self
# Other is disjoint
if ((other.start_time >= ts.end_time) or
(ts.start_time > other.end_time)):
return ts
other = other.copy()
dt = float((other.start_time - ts.start_time) * ts.sample_rate)
# This coaligns other to the time stepping of self
if not dt.is_integer():
diff = (dt - _numpy.floor(dt)) * ts.delta_t
# insert zeros at end
other.resize(len(other) + (len(other) + 1) % 2 + 1)
# fd shift to the right
other = other.cyclic_time_shift(diff)
# get indices of other with respect to self
# this is already an integer to floating point precission
left = float(other.start_time - ts.start_time) * ts.sample_rate
left = int(round(left))
right = left + len(other)
oleft = 0
oright = len(other)
# other overhangs on left so truncate
if left < 0:
oleft = -left
left = 0
# other overhangs on right so truncate
if right > len(ts):
oright = len(other) - (right - len(ts))
right = len(ts)
ts[left:right] += other[oleft:oright]
return ts
add_into = inject # maintain backwards compatibility for now
[docs]
@_nocomplex
def cyclic_time_shift(self, dt):
"""Shift the data and timestamps by a given number of seconds
Shift the data and timestamps in the time domain a given number of
seconds. To just change the time stamps, do ts.start_time += dt.
The time shift may be smaller than the intrinsic sample rate of the data.
Note that data will be cyclically rotated, so if you shift by 2
seconds, the final 2 seconds of your data will now be at the
beginning of the data set.
Parameters
----------
dt : float
Amount of time to shift the vector.
Returns
-------
data : pycbc.types.TimeSeries
The time shifted time series.
"""
# We do this in the frequency domain to allow us to do sub-sample
# time shifts. This also results in the shift being circular. It
# is left to a future update to do a faster impelementation in the case
# where the time shift can be done with an exact number of samples.
return self.to_frequencyseries().cyclic_time_shift(dt).to_timeseries()
[docs]
def match(self, other, psd=None,
low_frequency_cutoff=None, high_frequency_cutoff=None):
""" Return the match between the two TimeSeries or FrequencySeries.
Return the match between two waveforms. This is equivalent to the overlap
maximized over time and phase. By default, the other vector will be
resized to match self. This may remove high frequency content or the
end of the vector.
Parameters
----------
other : TimeSeries or FrequencySeries
The input vector containing a waveform.
psd : Frequency Series
A power spectral density to weight the overlap.
low_frequency_cutoff : {None, float}, optional
The frequency to begin the match.
high_frequency_cutoff : {None, float}, optional
The frequency to stop the match.
Returns
-------
match: float
index: int
The number of samples to shift to get the match.
"""
return self.to_frequencyseries().match(other, psd=psd,
low_frequency_cutoff=low_frequency_cutoff,
high_frequency_cutoff=high_frequency_cutoff)
[docs]
def detrend(self, type='linear'):
""" Remove linear trend from the data
Remove a linear trend from the data to improve the approximation that
the data is circularly convolved, this helps reduce the size of filter
transients from a circular convolution / filter.
Parameters
----------
type: str
The choice of detrending. The default ('linear') removes a linear
least squares fit. 'constant' removes only the mean of the data.
"""
from scipy.signal import detrend
return self._return(detrend(self.numpy(), type=type))
[docs]
def plot(self, **kwds):
""" Basic plot of this time series
"""
from matplotlib import pyplot
if self.kind == 'real':
plot = pyplot.plot(self.sample_times, self, **kwds)
return plot
elif self.kind == 'complex':
plot1 = pyplot.plot(self.sample_times, self.real(), **kwds)
plot2 = pyplot.plot(self.sample_times, self.imag(), **kwds)
return plot1, plot2
[docs]
def load_timeseries(path, group=None):
"""Load a TimeSeries from an HDF5, ASCII or Numpy file. The file type is
inferred from the file extension, which must be `.hdf`, `.txt` or `.npy`.
For ASCII and Numpy files, the first column of the array is assumed to
contain the sample times. If the array has two columns, a real-valued time
series is returned. If the array has three columns, the second and third
ones are assumed to contain the real and imaginary parts of a complex time
series.
For HDF files, the dataset is assumed to contain the attributes `delta_t`
and `start_time`, which should contain respectively the sampling period in
seconds and the start GPS time of the data.
The default data types will be double precision floating point.
Parameters
----------
path: string
Input file path. Must end with either `.npy`, `.txt` or `.hdf`.
group: string
Additional name for internal storage use. When reading HDF files, this
is the path to the HDF dataset to read.
Raises
------
ValueError
If path does not end in a supported extension.
For Numpy and ASCII input files, this is also raised if the array
does not have 2 or 3 dimensions.
"""
ext = _os.path.splitext(path)[1]
if ext == '.npy':
data = _numpy.load(path)
elif ext == '.txt':
data = _numpy.loadtxt(path)
elif ext == '.hdf':
key = 'data' if group is None else group
with h5py.File(path, 'r') as f:
data = f[key][:]
series = TimeSeries(data, delta_t=f[key].attrs['delta_t'],
epoch=f[key].attrs['start_time'])
return series
else:
raise ValueError('Path must end with .npy, .hdf, or .txt')
delta_t = (data[-1][0] - data[0][0]) / (len(data) - 1)
epoch = _lal.LIGOTimeGPS(data[0][0])
if data.ndim == 2:
return TimeSeries(data[:,1], delta_t=delta_t, epoch=epoch)
elif data.ndim == 3:
return TimeSeries(data[:,1] + 1j*data[:,2],
delta_t=delta_t, epoch=epoch)
raise ValueError('File has %s dimensions, cannot convert to TimeSeries, \
must be 2 (real) or 3 (complex)' % data.ndim)