# Copyright (C) 2019 Miriam Cabero
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
""" Functions for removing frequency lines from real data.
"""
import numpy
from pycbc.types import TimeSeries, zeros
[docs]
def avg_inner_product(data1, data2, bin_size):
""" Calculate the time-domain inner product averaged over bins.
Parameters
----------
data1: pycbc.types.TimeSeries
First data set.
data2: pycbc.types.TimeSeries
Second data set, with same duration and sample rate as data1.
bin_size: float
Duration of the bins the data will be divided into to calculate
the inner product.
Returns
-------
inner_prod: list
The (complex) inner product of data1 and data2 obtained in each bin.
amp: float
The absolute value of the median of the inner product.
phi: float
The angle of the median of the inner product.
"""
assert data1.duration == data2.duration
assert data1.sample_rate == data2.sample_rate
seglen = int(bin_size * data1.sample_rate)
inner_prod = []
for idx in range(int(data1.duration / bin_size)):
start, end = idx * seglen, (idx+1) * seglen
norm = len(data1[start:end])
bin_prod = 2 * sum(data1.data[start:end].real *
numpy.conjugate(data2.data[start:end])) / norm
inner_prod.append(bin_prod)
# Get the median over all bins to avoid outliers due to the presence
# of a signal in a particular bin.
inner_median = complex_median(inner_prod)
return inner_prod, numpy.abs(inner_median), numpy.angle(inner_median)
[docs]
def line_model(freq, data, tref, amp=1, phi=0):
""" Simple time-domain model for a frequency line.
Parameters
----------
freq: float
Frequency of the line.
data: pycbc.types.TimeSeries
Reference data, to get delta_t, start_time, duration and sample_times.
tref: float
Reference time for the line model.
amp: {1., float}, optional
Amplitude of the frequency line.
phi: {0. float}, optional
Phase of the frequency line (radians).
Returns
-------
freq_line: pycbc.types.TimeSeries
A timeseries of the line model with frequency 'freq'. The returned
data are complex to allow measuring the amplitude and phase of the
corresponding frequency line in the strain data. For extraction, use
only the real part of the data.
"""
freq_line = TimeSeries(zeros(len(data)), delta_t=data.delta_t,
epoch=data.start_time)
times = data.sample_times - float(tref)
alpha = 2 * numpy.pi * freq * times + phi
freq_line.data = amp * numpy.exp(1.j * alpha)
return freq_line
[docs]
def matching_line(freq, data, tref, bin_size=1):
""" Find the parameter of the line with frequency 'freq' in the data.
Parameters
----------
freq: float
Frequency of the line to find in the data.
data: pycbc.types.TimeSeries
Data from which the line wants to be measured.
tref: float
Reference time for the frequency line.
bin_size: {1, float}, optional
Duration of the bins the data will be divided into for averaging.
Returns
-------
line_model: pycbc.types.TimeSeries
A timeseries containing the frequency line with the amplitude
and phase measured from the data.
"""
template_line = line_model(freq, data, tref=tref)
# Measure amplitude and phase of the line in the data
_, amp, phi = avg_inner_product(data, template_line,
bin_size=bin_size)
return line_model(freq, data, tref=tref, amp=amp, phi=phi)
[docs]
def calibration_lines(freqs, data, tref=None):
""" Extract the calibration lines from strain data.
Parameters
----------
freqs: list
List containing the frequencies of the calibration lines.
data: pycbc.types.TimeSeries
Strain data to extract the calibration lines from.
tref: {None, float}, optional
Reference time for the line. If None, will use data.start_time.
Returns
-------
data: pycbc.types.TimeSeries
The strain data with the calibration lines removed.
"""
if tref is None:
tref = float(data.start_time)
for freq in freqs:
measured_line = matching_line(freq, data, tref,
bin_size=data.duration)
data -= measured_line.data.real
return data
[docs]
def clean_data(freqs, data, chunk, avg_bin):
""" Extract time-varying (wandering) lines from strain data.
Parameters
----------
freqs: list
List containing the frequencies of the wandering lines.
data: pycbc.types.TimeSeries
Strain data to extract the wandering lines from.
chunk: float
Duration of the chunks the data will be divided into to account
for the time variation of the wandering lines. Should be smaller
than data.duration, and allow for at least a few chunks.
avg_bin: float
Duration of the bins each chunk will be divided into for averaging
the inner product when measuring the parameters of the line. Should
be smaller than chunk.
Returns
-------
data: pycbc.types.TimeSeries
The strain data with the wandering lines removed.
"""
if avg_bin >= chunk:
raise ValueError('The bin size for averaging the inner product '
'must be less than the chunk size.')
if chunk >= data.duration:
raise ValueError('The chunk size must be less than the '
'data duration.')
steps = numpy.arange(0, int(data.duration/chunk)-0.5, 0.5)
seglen = chunk * data.sample_rate
tref = float(data.start_time)
for freq in freqs:
for step in steps:
start, end = int(step*seglen), int((step+1)*seglen)
chunk_line = matching_line(freq, data[start:end],
tref, bin_size=avg_bin)
# Apply hann window on sides of chunk_line to smooth boundaries
# and avoid discontinuities
hann_window = numpy.hanning(len(chunk_line))
apply_hann = TimeSeries(numpy.ones(len(chunk_line)),
delta_t=chunk_line.delta_t,
epoch=chunk_line.start_time)
if step == 0:
apply_hann.data[len(hann_window)/2:] *= \
hann_window[len(hann_window)/2:]
elif step == steps[-1]:
apply_hann.data[:len(hann_window)/2] *= \
hann_window[:len(hann_window)/2]
else:
apply_hann.data *= hann_window
chunk_line.data *= apply_hann.data
data.data[start:end] -= chunk_line.data.real
return data