Source code for pycbc.distributions.joint

# Copyright (C)  2017 Collin Capano, Christopher M. Biwer, Alex Nitz
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
""" This module provides classes to describe joint distributions
"""
import logging
import numpy

from pycbc.io.record import FieldArray

logger = logging.getLogger('pycbc.distributions.joint')


[docs] class JointDistribution(object): """ Callable class that calculates the joint distribution built from a set of distributions. Parameters ---------- variable_args : list A list of strings that contain the names of the variable parameters and the order they are expected when the class is called. \*distributions : The rest of the arguments must be instances of distributions describing the individual distributions on the variable parameters. A single distribution may contain multiple parameters. The set of all params across the distributions (retrieved from the distributions' params attribute) must be the same as the set of variable_args provided. \*\*kwargs : Valid keyword arguments include: `constraints` : a list of functions that accept a dict of parameters with the parameter name as the key. If the constraint is satisfied the function should return True, if the constraint is violated, then the function should return False. `n_test_samples` : number of random draws used to fix pdf normalization factor after applying constraints. Attributes ---------- variable_args : tuple The parameters expected when the evaluator is called. distributions : list The distributions for the parameters. constraints : list A list of functions to test if parameter values obey multi-dimensional constraints. Examples -------- An example of creating a joint distribution with constraint that total mass must be below 30. >>> from pycbc.distributions import Uniform, JointDistribution >>> def mtotal_lt_30(params): ... return params["mass1"] + params["mass2"] < 30 >>> mass_lim = (2, 50) >>> uniform_prior = Uniform(mass1=mass_lim, mass2=mass_lim) >>> prior_eval = JointDistribution(["mass1", "mass2"], uniform_prior, ... constraints=[mtotal_lt_30]) >>> print(prior_eval(mass1=20, mass2=1)) """ name = 'joint' def __init__(self, variable_args, *distributions, **kwargs): # store the names of the parameters defined in the distributions self.variable_args = tuple(variable_args) # store the distributions self.distributions = distributions # store the constraints on the parameters defined inside the # distributions list self._constraints = kwargs["constraints"] \ if "constraints" in kwargs.keys() else [] # store kwargs self.kwargs = kwargs # check that all of the supplied parameters are described by the given # distributions distparams = set() for dist in distributions: distparams.update(set(dist.params)) varset = set(self.variable_args) missing_params = distparams - varset if missing_params: raise ValueError("provided variable_args do not include " "parameters %s" %(','.join(missing_params)) + " which are " "required by the provided distributions") extra_params = varset - distparams if extra_params: raise ValueError("variable_args %s " %(','.join(extra_params)) + "are not in any of the provided distributions") # if there are constraints then find the renormalization factor # since a constraint will cut out part of the space # do this by random sampling the full space and find the percent # of samples rejected n_test_samples = kwargs["n_test_samples"] \ if "n_test_samples" in kwargs else int(1e6) if self._constraints: logger.info("Renormalizing distribution for constraints") # draw samples samples = {} for dist in self.distributions: draw = dist.rvs(n_test_samples) for param in dist.params: samples[param] = draw[param] samples = FieldArray.from_kwargs(**samples) # evaluate constraints result = self.within_constraints(samples) # set new scaling factor for prior to be # the fraction of acceptances in random sampling of entire space self._pdf_scale = result.sum() / float(n_test_samples) if self._pdf_scale == 0.0: raise ValueError("None of the random draws for pdf " "renormalization satisfied the constraints. " " You can try increasing the 'n_test_samples' keyword.") else: self._pdf_scale = 1.0 # since Distributions will return logpdf we keep the scale factor # in log scale as well for self.__call__ self._logpdf_scale = numpy.log(self._pdf_scale)
[docs] def apply_boundary_conditions(self, **params): """Applies each distributions' boundary conditions to the given list of parameters, returning a new list with the conditions applied. Parameters ---------- **params : Keyword arguments should give the parameters to apply the conditions to. Returns ------- dict A dictionary of the parameters after each distribution's `apply_boundary_conditions` function has been applied. """ for dist in self.distributions: params.update(dist.apply_boundary_conditions(**params)) return params
@staticmethod def _return_atomic(params): """Determines if an array or atomic value should be returned given a set of input params. Parameters ---------- params : dict, numpy.record, array, or FieldArray The input to evaluate. Returns ------- bool : Whether or not functions run on the parameters should be returned as atomic types or not. """ if isinstance(params, dict): return not any(isinstance(val, numpy.ndarray) for val in params.values()) elif isinstance(params, numpy.record): return True elif isinstance(params, numpy.ndarray): return False params = params.view(type=FieldArray) elif isinstance(params, FieldArray): return False else: raise ValueError("params must be either dict, FieldArray, " "record, or structured array") @staticmethod def _ensure_fieldarray(params): """Ensures the given params are a ``FieldArray``. Parameters ---------- params : dict, FieldArray, numpy.record, or numpy.ndarray If the given object is a dict, it will be converted to a FieldArray. Returns ------- FieldArray The given values as a FieldArray. """ if isinstance(params, dict): return FieldArray.from_kwargs(**params) elif isinstance(params, numpy.record): return FieldArray.from_records(tuple(params), names=params.dtype.names) elif isinstance(params, numpy.ndarray): return params.view(type=FieldArray) elif isinstance(params, FieldArray): return params else: raise ValueError("params must be either dict, FieldArray, " "record, or structured array")
[docs] def within_constraints(self, params): """Evaluates whether the given parameters satisfy the constraints. Parameters ---------- params : dict, FieldArray, numpy.record, or numpy.ndarray The parameter values to evaluate. Returns ------- (array of) bool : If params was an array, or if params a dictionary and one or more of the parameters are arrays, will return an array of booleans. Otherwise, a boolean. """ params = self._ensure_fieldarray(params) return_atomic = self._return_atomic(params) # convert params to a field array if it isn't one result = numpy.ones(params.shape, dtype=bool) for constraint in self._constraints: result &= constraint(params) if return_atomic: result = result.item() return result
[docs] def contains(self, params): """Evaluates whether the given parameters satisfy the boundary conditions, boundaries, and constraints. This method is different from `within_constraints`, that method only check the constraints. Parameters ---------- params : dict, FieldArray, numpy.record, or numpy.ndarray The parameter values to evaluate. Returns ------- (array of) bool : If params was an array, or if params a dictionary and one or more of the parameters are arrays, will return an array of booleans. Otherwise, a boolean. """ params = self.apply_boundary_conditions(**params) result = True for dist in self.distributions: param_names = dist.params vlen = len(params[param_names[0]]) contain_array = numpy.ones(vlen, dtype=bool) # note: enable `__contains__` in `pycbc.distributions.bounded` # to handle array-like input, it doesn't work now. for i in range(vlen): data = {pname: params[pname][i] for pname in param_names} contain_array[i] = data in dist result &= numpy.array(contain_array) result &= self.within_constraints(params) return result
def __call__(self, **params): """Evaluate joint distribution for parameters. """ return_atomic = self._return_atomic(params) # check if statisfies constraints if len(self._constraints) != 0: parray = self._ensure_fieldarray(params) isin = self.within_constraints(parray) if not isin.any(): if return_atomic: out = -numpy.inf else: out = numpy.full(parray.shape, -numpy.inf) return out # evaluate # note: this step may fail if arrays of values were provided, as # not all distributions are vectorized currently logps = numpy.array([d(**params) for d in self.distributions]) logp = logps.sum(axis=0) if len(self._constraints) != 0: logp += numpy.log(isin.astype(float)) if return_atomic: logp = logp.item() return logp - self._logpdf_scale
[docs] def rvs(self, size=1): """ Rejection samples the parameter space. """ # create output FieldArray dtype = [(arg, float) for arg in self.variable_args] out = FieldArray(size, dtype=dtype) # loop until enough samples accepted remaining = size ndraw = size while remaining: # scratch space for evaluating constraints scratch = FieldArray(ndraw, dtype=dtype) for dist in self.distributions: # drawing samples from the distributions is generally faster # then evaluating constrants, so we'll always draw the full # size, even if that gives us more points than we need draw = dist.rvs(size=ndraw) for param in dist.params: scratch[param] = draw[param] # apply any constraints keep = self.within_constraints(scratch) nkeep = keep.sum() kmin = size - remaining kmax = min(nkeep, remaining) out[kmin:kmin+kmax] = scratch[keep][:kmax] remaining = max(0, remaining - nkeep) # to try to speed up next go around, we'll increase the draw # size by the fraction of values that were kept, but cap at 1e6 ndraw = int(min(1e6, ndraw * numpy.ceil(ndraw / (nkeep + 1.)))) return out
@property def well_reflected(self): """ Get list of which parameters are well reflected """ reflect = [] bounds = self.bounds for param in bounds: if bounds[param].reflected == 'well': reflect.append(param) return reflect @property def cyclic(self): """ Get list of which parameters are cyclic """ cyclic = [] bounds = self.bounds for param in bounds: if bounds[param].cyclic: cyclic.append(param) return cyclic @property def bounds(self): """ Get the dict of boundaries """ bnds = {} for dist in self.distributions: if hasattr(dist, 'bounds'): bnds.update(dist.bounds) return bnds
[docs] def cdfinv(self, **original): """ Apply the inverse cdf to the array of values [0, 1]. Every variable parameter must be given as a keyword argument. """ updated = {} for dist in self.distributions: updated.update(dist.cdfinv(**original)) return updated