Source code for pycbc.distributions.qnm

# Copyright (C) 2018 Miriam Cabero, Collin Capano
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import logging
import re
import numpy

import pycbc
from pycbc import conversions, boundaries

from . import uniform, bounded

logger = logging.getLogger('pycbc.distributions.qnm')


[docs]class UniformF0Tau(uniform.Uniform): """A distribution uniform in QNM frequency and damping time. Constraints may be placed to exclude frequencies and damping times corresponding to specific masses and spins. To ensure a properly normalized pdf that accounts for the constraints on final mass and spin, a renormalization factor is calculated upon initialization. This is calculated numerically: f0 and tau are drawn randomly, then the norm is scaled by the fraction of points that yield final masses and spins within the constraints. The `norm_tolerance` keyword arguments sets the error on the estimate of the norm from this numerical method. If this value is too large, such that no points are found in the allowed region, a ValueError is raised. Parameters ---------- f0 : tuple or boundaries.Bounds The range of QNM frequencies (in Hz). tau : tuple or boundaries.Bounds The range of QNM damping times (in s). final_mass : tuple or boundaries.Bounds, optional The range of final masses to allow. Default is [0,inf). final_spin : tuple or boundaries.Bounds, optional The range final spins to allow. Must be in [-0.996, 0.996], which is the default. rdfreq : str, optional Use the given string as the name for the f0 parameter. Default is 'f0'. damping_time : str, optional Use the given string as the name for the tau parameter. Default is 'tau'. norm_tolerance : float, optional The tolerance on the estimate of the normalization. Default is 1e-3. norm_seed : int, optional Seed to use for the random number generator when estimating the norm. Default is 0. After the norm is estimated, the random number generator is set back to the state it was in upon initialization. Examples -------- Create a distribution: >>> dist = UniformF0Tau(f0=(10., 2048.), tau=(1e-4,1e-2)) Check that all random samples drawn from the distribution yield final masses > 1: >>> from pycbc import conversions >>> samples = dist.rvs(size=1000) >>> (conversions.final_mass_from_f0_tau(samples['f0'], samples['tau']) > 1.).all() True Create a distribution with tighter bounds on final mass and spin: >>> dist = UniformF0Tau(f0=(10., 2048.), tau=(1e-4,1e-2), final_mass=(20., 200.), final_spin=(0,0.996)) Check that all random samples drawn from the distribution are in the final mass and spin constraints: >>> samples = dist.rvs(size=1000) >>> (conversions.final_mass_from_f0_tau(samples['f0'], samples['tau']) >= 20.).all() True >>> (conversions.final_mass_from_f0_tau(samples['f0'], samples['tau']) < 200.).all() True >>> (conversions.final_spin_from_f0_tau(samples['f0'], samples['tau']) >= 0.).all() True >>> (conversions.final_spin_from_f0_tau(samples['f0'], samples['tau']) < 0.996).all() True """ name = 'uniform_f0_tau' def __init__(self, f0=None, tau=None, final_mass=None, final_spin=None, rdfreq='f0', damping_time='tau', norm_tolerance=1e-3, norm_seed=0): if f0 is None: raise ValueError("must provide a range for f0") if tau is None: raise ValueError("must provide a range for tau") self.rdfreq = rdfreq self.damping_time = damping_time parent_args = {rdfreq: f0, damping_time: tau} super(UniformF0Tau, self).__init__(**parent_args) if final_mass is None: final_mass = (0., numpy.inf) if final_spin is None: final_spin = (-0.996, 0.996) self.final_mass_bounds = boundaries.Bounds( min_bound=final_mass[0], max_bound=final_mass[1]) self.final_spin_bounds = boundaries.Bounds( min_bound=final_spin[0], max_bound=final_spin[1]) # Re-normalize to account for cuts: we'll do this by just sampling # a large number of spaces f0 taus, and seeing how many are in the # desired range. # perseve the current random state s = numpy.random.get_state() numpy.random.seed(norm_seed) nsamples = int(1./norm_tolerance**2) draws = super(UniformF0Tau, self).rvs(size=nsamples) # reset the random state numpy.random.set_state(s) num_in = self._constraints(draws).sum() # if num_in is 0, than the requested tolerance is too large if num_in == 0: raise ValueError("the normalization is < then the norm_tolerance; " "try again with a smaller nrom_tolerance") self._lognorm += numpy.log(num_in) - numpy.log(nsamples) self._norm = numpy.exp(self._lognorm) def __contains__(self, params): isin = super(UniformF0Tau, self).__contains__(params) if isin: isin &= self._constraints(params) return isin def _constraints(self, params): f0 = params[self.rdfreq] tau = params[self.damping_time] # check if we need to specify a particular mode (l,m) != (2,2) if re.match(r'f_\d{3}', self.rdfreq): mode = self.rdfreq.strip('f_') l, m = int(mode[0]), int(mode[1]) else: l, m = 2, 2 # temporarily silence invalid warnings... these will just be ruled out # automatically with numpy.errstate(invalid="ignore"): mf = conversions.final_mass_from_f0_tau(f0, tau, l=l, m=m) sf = conversions.final_spin_from_f0_tau(f0, tau, l=l, m=m) isin = (self.final_mass_bounds.__contains__(mf)) & ( self.final_spin_bounds.__contains__(sf)) return isin
[docs] def rvs(self, size=1): """Draw random samples from this distribution. Parameters ---------- size : int, optional The number of draws to do. Default is 1. Returns ------- array A structured array of the random draws. """ size = int(size) dtype = [(p, float) for p in self.params] arr = numpy.zeros(size, dtype=dtype) remaining = size keepidx = 0 while remaining: draws = super(UniformF0Tau, self).rvs(size=remaining) mask = self._constraints(draws) addpts = mask.sum() arr[keepidx:keepidx+addpts] = draws[mask] keepidx += addpts remaining = size - keepidx return arr
[docs] @classmethod def from_config(cls, cp, section, variable_args): """Initialize this class from a config file. Bounds on ``f0``, ``tau``, ``final_mass`` and ``final_spin`` should be specified by providing ``min-{param}`` and ``max-{param}``. If the ``f0`` or ``tau`` param should be renamed, ``rdfreq`` and ``damping_time`` should be provided; these must match ``variable_args``. If ``rdfreq`` and ``damping_time`` are not provided, ``variable_args`` are expected to be ``f0`` and ``tau``. Only ``min/max-f0`` and ``min/max-tau`` need to be provided. Example: .. code-block:: ini [{section}-f0+tau] name = uniform_f0_tau min-f0 = 10 max-f0 = 2048 min-tau = 0.0001 max-tau = 0.010 min-final_mass = 10 Parameters ---------- cp : pycbc.workflow.WorkflowConfigParser WorkflowConfigParser instance to read. section : str The name of the section to read. variable_args : str The name of the variable args. These should be separated by ``pycbc.VARARGS_DELIM``. Returns ------- UniformF0Tau : This class initialized with the parameters provided in the config file. """ tag = variable_args variable_args = set(variable_args.split(pycbc.VARARGS_DELIM)) # get f0 and tau f0 = bounded.get_param_bounds_from_config(cp, section, tag, 'f0') tau = bounded.get_param_bounds_from_config(cp, section, tag, 'tau') # see if f0 and tau should be renamed if cp.has_option_tag(section, 'rdfreq', tag): rdfreq = cp.get_opt_tag(section, 'rdfreq', tag) else: rdfreq = 'f0' if cp.has_option_tag(section, 'damping_time', tag): damping_time = cp.get_opt_tag(section, 'damping_time', tag) else: damping_time = 'tau' # check that they match whats in the variable args if not variable_args == set([rdfreq, damping_time]): raise ValueError("variable args do not match rdfreq and " "damping_time names") # get the final mass and spin values, if provided final_mass = bounded.get_param_bounds_from_config( cp, section, tag, 'final_mass') final_spin = bounded.get_param_bounds_from_config( cp, section, tag, 'final_spin') extra_opts = {} if cp.has_option_tag(section, 'norm_tolerance', tag): extra_opts['norm_tolerance'] = float( cp.get_opt_tag(section, 'norm_tolerance', tag)) if cp.has_option_tag(section, 'norm_seed', tag): extra_opts['norm_seed'] = int( cp.get_opt_tag(section, 'norm_seed', tag)) return cls(f0=f0, tau=tau, final_mass=final_mass, final_spin=final_spin, rdfreq=rdfreq, damping_time=damping_time, **extra_opts)