Source code for pycbc.strain.recalibrate

""" Classes and functions for adjusting strain data.
"""
# Copyright (C) 2015 Ben Lackey, Christopher M. Biwer,
#                    Daniel Finstad, Colm Talbot, Alex Nitz
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

from abc import (ABCMeta, abstractmethod)

import numpy as np
from scipy.interpolate import UnivariateSpline
from pycbc.types import FrequencySeries


[docs]class Recalibrate(metaclass=ABCMeta): """ Base class for modifying calibration """ name = None def __init__(self, ifo_name): self.ifo_name = ifo_name self.params = dict()
[docs] @abstractmethod def apply_calibration(self, strain): """Apply calibration model This method should be overwritten by subclasses Parameters ---------- strain : FrequencySeries The strain to be recalibrated. Return ------ strain_adjusted : FrequencySeries The recalibrated strain. """ return
[docs] def map_to_adjust(self, strain, prefix='recalib_', **params): """Map an input dictionary of sampling parameters to the adjust_strain function by filtering the dictionary for the calibration parameters, then calling adjust_strain. Parameters ---------- strain : FrequencySeries The strain to be recalibrated. prefix: str Prefix for calibration parameter names params : dict Dictionary of sampling parameters which includes calibration parameters. Return ------ strain_adjusted : FrequencySeries The recalibrated strain. """ self.params.update({ key[len(prefix):]: params[key] for key in params if prefix in key and self.ifo_name in key}) strain_adjusted = self.apply_calibration(strain) return strain_adjusted
[docs] @classmethod def from_config(cls, cp, ifo, section): """Read a config file to get calibration options and transfer functions which will be used to intialize the model. Parameters ---------- cp : WorkflowConfigParser An open config file. ifo : string The detector (H1, L1) for which the calibration model will be loaded. section : string The section name in the config file from which to retrieve the calibration options. Return ------ instance An instance of the class. """ all_params = dict(cp.items(section)) params = {key[len(ifo)+1:]: all_params[key] for key in all_params if ifo.lower() in key} params = {key: params[key] for key in params} params.pop('model') params['ifo_name'] = ifo.lower() return cls(**params)
[docs]class CubicSpline(Recalibrate): """Cubic spline recalibration see https://dcc.ligo.org/LIGO-T1400682/public This assumes the spline points follow np.logspace(np.log(minimum_frequency), np.log(maximum_frequency), n_points) Parameters ---------- minimum_frequency: float minimum frequency of spline points maximum_frequency: float maximum frequency of spline points n_points: int number of spline points """ name = 'cubic_spline' def __init__(self, minimum_frequency, maximum_frequency, n_points, ifo_name): Recalibrate.__init__(self, ifo_name=ifo_name) minimum_frequency = float(minimum_frequency) maximum_frequency = float(maximum_frequency) n_points = int(n_points) if n_points < 4: raise ValueError( 'Use at least 4 spline points for calibration model') self.n_points = n_points self.spline_points = np.logspace(np.log10(minimum_frequency), np.log10(maximum_frequency), n_points)
[docs] def apply_calibration(self, strain): """Apply calibration model This applies cubic spline calibration to the strain. Parameters ---------- strain : FrequencySeries The strain to be recalibrated. Return ------ strain_adjusted : FrequencySeries The recalibrated strain. """ amplitude_parameters =\ [self.params['amplitude_{}_{}'.format(self.ifo_name, ii)] for ii in range(self.n_points)] amplitude_spline = UnivariateSpline(self.spline_points, amplitude_parameters) delta_amplitude = amplitude_spline(strain.sample_frequencies.numpy()) phase_parameters =\ [self.params['phase_{}_{}'.format(self.ifo_name, ii)] for ii in range(self.n_points)] phase_spline = UnivariateSpline(self.spline_points, phase_parameters) delta_phase = phase_spline(strain.sample_frequencies.numpy()) strain_adjusted = strain * (1.0 + delta_amplitude)\ * (2.0 + 1j * delta_phase) / (2.0 - 1j * delta_phase) return strain_adjusted
[docs]class PhysicalModel(object): """ Class for adjusting time-varying calibration parameters of given strain data. Parameters ---------- strain : FrequencySeries The strain to be adjusted. freq : array The frequencies corresponding to the values of c0, d0, a0 in Hertz. fc0 : float Coupled-cavity (CC) pole at time t0, when c0=c(t0) and a0=a(t0) are measured. c0 : array Initial sensing function at t0 for the frequencies. d0 : array Digital filter for the frequencies. a_tst0 : array Initial actuation function for the test mass at t0 for the frequencies. a_pu0 : array Initial actuation function for the penultimate mass at t0 for the frequencies. fs0 : float Initial spring frequency at t0 for the signal recycling cavity. qinv0 : float Initial inverse quality factor at t0 for the signal recycling cavity. """ name = 'physical_model' def __init__(self, freq=None, fc0=None, c0=None, d0=None, a_tst0=None, a_pu0=None, fs0=None, qinv0=None): self.freq = np.real(freq) self.c0 = c0 self.d0 = d0 self.a_tst0 = a_tst0 self.a_pu0 = a_pu0 self.fc0 = float(fc0) self.fs0 = float(fs0) self.qinv0 = float(qinv0) # initial detuning at time t0 init_detuning = self.freq**2 / (self.freq**2 - 1.0j * self.freq * \ self.fs0 * self.qinv0 + self.fs0**2) # initial open loop gain self.g0 = self.c0 * self.d0 * (self.a_tst0 + self.a_pu0) # initial response function self.r0 = (1.0 + self.g0) / self.c0 # residual of c0 after factoring out the coupled cavity pole fc0 self.c_res = self.c0 * (1 + 1.0j * self.freq / self.fc0)/init_detuning
[docs] def update_c(self, fs=None, qinv=None, fc=None, kappa_c=1.0): """ Calculate the sensing function c(f,t) given the new parameters kappa_c(t), kappa_a(t), f_c(t), fs, and qinv. Parameters ---------- fc : float Coupled-cavity (CC) pole at time t. kappa_c : float Scalar correction factor for sensing function at time t. fs : float Spring frequency for signal recycling cavity. qinv : float Inverse quality factor for signal recycling cavity. Returns ------- c : numpy.array The new sensing function c(f,t). """ detuning_term = self.freq**2 / (self.freq**2 - 1.0j *self.freq*fs * \ qinv + fs**2) return self.c_res * kappa_c / (1 + 1.0j * self.freq/fc)*detuning_term
[docs] def update_g(self, fs=None, qinv=None, fc=None, kappa_tst_re=1.0, kappa_tst_im=0.0, kappa_pu_re=1.0, kappa_pu_im=0.0, kappa_c=1.0): """ Calculate the open loop gain g(f,t) given the new parameters kappa_c(t), kappa_a(t), f_c(t), fs, and qinv. Parameters ---------- fc : float Coupled-cavity (CC) pole at time t. kappa_c : float Scalar correction factor for sensing function c at time t. kappa_tst_re : float Real part of scalar correction factor for actuation function a_tst0 at time t. kappa_pu_re : float Real part of scalar correction factor for actuation function a_pu0 at time t. kappa_tst_im : float Imaginary part of scalar correction factor for actuation function a_tst0 at time t. kappa_pu_im : float Imaginary part of scalar correction factor for actuation function a_pu0 at time t. fs : float Spring frequency for signal recycling cavity. qinv : float Inverse quality factor for signal recycling cavity. Returns ------- g : numpy.array The new open loop gain g(f,t). """ c = self.update_c(fs=fs, qinv=qinv, fc=fc, kappa_c=kappa_c) a_tst = self.a_tst0 * (kappa_tst_re + 1.0j * kappa_tst_im) a_pu = self.a_pu0 * (kappa_pu_re + 1.0j * kappa_pu_im) return c * self.d0 * (a_tst + a_pu)
[docs] def update_r(self, fs=None, qinv=None, fc=None, kappa_c=1.0, kappa_tst_re=1.0, kappa_tst_im=0.0, kappa_pu_re=1.0, kappa_pu_im=0.0): """ Calculate the response function R(f,t) given the new parameters kappa_c(t), kappa_a(t), f_c(t), fs, and qinv. Parameters ---------- fc : float Coupled-cavity (CC) pole at time t. kappa_c : float Scalar correction factor for sensing function c at time t. kappa_tst_re : float Real part of scalar correction factor for actuation function a_tst0 at time t. kappa_pu_re : float Real part of scalar correction factor for actuation function a_pu0 at time t. kappa_tst_im : float Imaginary part of scalar correction factor for actuation function a_tst0 at time t. kappa_pu_im : float Imaginary part of scalar correction factor for actuation function a_pu0 at time t. fs : float Spring frequency for signal recycling cavity. qinv : float Inverse quality factor for signal recycling cavity. Returns ------- r : numpy.array The new response function r(f,t). """ c = self.update_c(fs=fs, qinv=qinv, fc=fc, kappa_c=kappa_c) g = self.update_g(fs=fs, qinv=qinv, fc=fc, kappa_c=kappa_c, kappa_tst_re=kappa_tst_re, kappa_tst_im=kappa_tst_im, kappa_pu_re=kappa_pu_re, kappa_pu_im=kappa_pu_im) return (1.0 + g) / c
[docs] def adjust_strain(self, strain, delta_fs=None, delta_qinv=None, delta_fc=None, kappa_c=1.0, kappa_tst_re=1.0, kappa_tst_im=0.0, kappa_pu_re=1.0, kappa_pu_im=0.0): """Adjust the FrequencySeries strain by changing the time-dependent calibration parameters kappa_c(t), kappa_a(t), f_c(t), fs, and qinv. Parameters ---------- strain : FrequencySeries The strain data to be adjusted. delta_fc : float Change in coupled-cavity (CC) pole at time t. kappa_c : float Scalar correction factor for sensing function c0 at time t. kappa_tst_re : float Real part of scalar correction factor for actuation function A_{tst0} at time t. kappa_tst_im : float Imaginary part of scalar correction factor for actuation function A_tst0 at time t. kappa_pu_re : float Real part of scalar correction factor for actuation function A_{pu0} at time t. kappa_pu_im : float Imaginary part of scalar correction factor for actuation function A_{pu0} at time t. fs : float Spring frequency for signal recycling cavity. qinv : float Inverse quality factor for signal recycling cavity. Returns ------- strain_adjusted : FrequencySeries The adjusted strain. """ fc = self.fc0 + delta_fc if delta_fc else self.fc0 fs = self.fs0 + delta_fs if delta_fs else self.fs0 qinv = self.qinv0 + delta_qinv if delta_qinv else self.qinv0 # calculate adjusted response function r_adjusted = self.update_r(fs=fs, qinv=qinv, fc=fc, kappa_c=kappa_c, kappa_tst_re=kappa_tst_re, kappa_tst_im=kappa_tst_im, kappa_pu_re=kappa_pu_re, kappa_pu_im=kappa_pu_im) # calculate error function k = r_adjusted / self.r0 # decompose into amplitude and unwrapped phase k_amp = np.abs(k) k_phase = np.unwrap(np.angle(k)) # convert to FrequencySeries by interpolating then resampling order = 1 k_amp_off = UnivariateSpline(self.freq, k_amp, k=order, s=0) k_phase_off = UnivariateSpline(self.freq, k_phase, k=order, s=0) freq_even = strain.sample_frequencies.numpy() k_even_sample = k_amp_off(freq_even) * \ np.exp(1.0j * k_phase_off(freq_even)) strain_adjusted = FrequencySeries(strain.numpy() * \ k_even_sample, delta_f=strain.delta_f) return strain_adjusted
[docs] @classmethod def tf_from_file(cls, path, delimiter=" "): """Convert the contents of a file with the columns [freq, real(h), imag(h)] to a numpy.array with columns [freq, real(h)+j*imag(h)]. Parameters ---------- path : string delimiter : {" ", string} Return ------ numpy.array """ data = np.loadtxt(path, delimiter=delimiter) freq = data[:, 0] h = data[:, 1] + 1.0j * data[:, 2] return np.array([freq, h]).transpose()
[docs] @classmethod def from_config(cls, cp, ifo, section): """Read a config file to get calibration options and transfer functions which will be used to intialize the model. Parameters ---------- cp : WorkflowConfigParser An open config file. ifo : string The detector (H1, L1) for which the calibration model will be loaded. section : string The section name in the config file from which to retrieve the calibration options. Return ------ instance An instance of the Recalibrate class. """ # read transfer functions tfs = [] tf_names = ["a-tst", "a-pu", "c", "d"] for tag in ['-'.join([ifo, "transfer-function", name]) for name in tf_names]: tf_path = cp.get_opt_tag(section, tag, None) tfs.append(cls.tf_from_file(tf_path)) a_tst0 = tfs[0][:, 1] a_pu0 = tfs[1][:, 1] c0 = tfs[2][:, 1] d0 = tfs[3][:, 1] freq = tfs[0][:, 0] # if upper stage actuation is included, read that in and add it # to a_pu0 uim_tag = '-'.join([ifo, 'transfer-function-a-uim']) if cp.has_option(section, uim_tag): tf_path = cp.get_opt_tag(section, uim_tag, None) a_pu0 += cls.tf_from_file(tf_path)[:, 1] # read fc0, fs0, and qinv0 fc0 = cp.get_opt_tag(section, '-'.join([ifo, "fc0"]), None) fs0 = cp.get_opt_tag(section, '-'.join([ifo, "fs0"]), None) qinv0 = cp.get_opt_tag(section, '-'.join([ifo, "qinv0"]), None) return cls(freq=freq, fc0=fc0, c0=c0, d0=d0, a_tst0=a_tst0, a_pu0=a_pu0, fs0=fs0, qinv0=qinv0)
[docs] def map_to_adjust(self, strain, **params): """Map an input dictionary of sampling parameters to the adjust_strain function by filtering the dictionary for the calibration parameters, then calling adjust_strain. Parameters ---------- strain : FrequencySeries The strain to be recalibrated. params : dict Dictionary of sampling parameters which includes calibration parameters. Return ------ strain_adjusted : FrequencySeries The recalibrated strain. """ # calibration param names arg_names = ['delta_fs', 'delta_fc', 'delta_qinv', 'kappa_c', 'kappa_tst_re', 'kappa_tst_im', 'kappa_pu_re', 'kappa_pu_im'] # calibration param labels as they exist in config files arg_labels = [''.join(['calib_', name]) for name in arg_names] # default values for calibration params default_values = [0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0] # make list of calibration param values calib_args = [] for arg, val in zip(arg_labels, default_values): if arg in params: calib_args.append(params[arg]) else: calib_args.append(val) # adjust the strain using calibration param values strain_adjusted = self.adjust_strain(strain, delta_fs=calib_args[0], delta_fc=calib_args[1], delta_qinv=calib_args[2], kappa_c=calib_args[3], kappa_tst_re=calib_args[4], kappa_tst_im=calib_args[5], kappa_pu_re=calib_args[6], kappa_pu_im=calib_args[7]) return strain_adjusted