"""
Tools for maximum likelihood fits to single trigger statistic values
For some set of values above a threshold, e.g. trigger SNRs, the functions
in this module perform maximum likelihood fits with 1-sigma uncertainties
to various simple functional forms of PDF, all normalized to 1.
You can also obtain the fitted function and its (inverse) CDF and perform
a Kolmogorov-Smirnov test.
Usage:
# call the fit function directly if the threshold is known
alpha, sigma_alpha = fit_exponential(snrs, 5.5)
# apply a threshold explicitly
alpha, sigma_alpha = fit_above_thresh('exponential', snrs, thresh=6.25)
# let the code work out the threshold from the smallest value via the default thresh=None
alpha, sigma_alpha = fit_above_thresh('exponential', snrs)
# or only fit the largest N values, i.e. tail fitting
thresh = tail_threshold(snrs, N=500)
alpha, sigma_alpha = fit_above_thresh('exponential', snrs, thresh)
# obtain the fitted function directly
xvals = numpy.xrange(5.5, 10.5, 20)
exponential_fit = expfit(xvals, alpha, thresh)
# or access function by name
exponential_fit_1 = fit_fn('exponential', xvals, alpha, thresh)
# Use weighting factors to e.g. take decimation into account
alpha, sigma_alpha = fit_above_thresh('exponential', snrs, weights=weights)
# get the KS test statistic and p-value - see scipy.stats.kstest
ks_stat, ks_pval = KS_test('exponential', snrs, alpha, thresh)
"""
# Copyright T. Dent 2015 (thomas.dent@aei.mpg.de)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
import logging
import numpy
from scipy.stats import kstest
logger = logging.getLogger('pycbc.events.trigger_fits')
[docs]
def exponential_fitalpha(vals, thresh, w):
"""
Maximum likelihood estimator for the fit factor for
an exponential decrease model
"""
return 1. / (numpy.average(vals, weights=w) - thresh)
[docs]
def rayleigh_fitalpha(vals, thresh, w):
"""
Maximum likelihood estimator for the fit factor for
a Rayleigh distribution of events
"""
return 2. / (numpy.average(vals ** 2., weights=w) - thresh ** 2.)
[docs]
def power_fitalpha(vals, thresh, w):
"""
Maximum likelihood estimator for the fit factor for
a power law model
"""
return numpy.average(numpy.log(vals/thresh), weights=w) ** -1. + 1.
fitalpha_dict = {
'exponential' : exponential_fitalpha,
'rayleigh' : rayleigh_fitalpha,
'power' : power_fitalpha
}
# measurement standard deviation = (-d^2 log L/d alpha^2)^(-1/2)
fitstd_dict = {
'exponential' : lambda weights, alpha : alpha / sum(weights) ** 0.5,
'rayleigh' : lambda weights, alpha : alpha / sum(weights) ** 0.5,
'power' : lambda weights, alpha : (alpha - 1.) / sum(weights) ** 0.5
}
[docs]
def fit_above_thresh(distr, vals, thresh=None, weights=None):
"""
Maximum likelihood fit for the coefficient alpha
Fitting a distribution of discrete values above a given threshold.
Exponential p(x) = alpha exp(-alpha (x-x_t))
Rayleigh p(x) = alpha x exp(-alpha (x**2-x_t**2)/2)
Power p(x) = ((alpha-1)/x_t) (x/x_t)**-alpha
Values below threshold will be discarded.
If no threshold is specified the minimum sample value will be used.
Parameters
----------
distr : {'exponential', 'rayleigh', 'power'}
Name of distribution
vals : sequence of floats
Values to fit
thresh : float
Threshold to apply before fitting; if None, use min(vals)
weights: sequence of floats
Weighting factors to use for the values when fitting.
Default=None - all the same
Returns
-------
alpha : float
Fitted value
sigma_alpha : float
Standard error in fitted value
"""
vals = numpy.array(vals)
if thresh is None:
thresh = min(vals)
above_thresh = numpy.ones_like(vals, dtype=bool)
else:
above_thresh = vals >= thresh
if numpy.count_nonzero(above_thresh) == 0:
# Nothing is above threshold - warn and return -1
logger.warning("No values are above the threshold, %.2f, "
"maximum is %.2f.", thresh, vals.max())
return -1., -1.
vals = vals[above_thresh]
# Set up the weights
if weights is not None:
weights = numpy.array(weights)
w = weights[above_thresh]
else:
w = numpy.ones_like(vals)
alpha = fitalpha_dict[distr](vals, thresh, w)
return alpha, fitstd_dict[distr](w, alpha)
# Variables:
# x: the trigger stat value(s) at which to evaluate the function
# a: slope parameter of the fit
# t: lower threshold stat value
fitfn_dict = {
'exponential' : lambda x, a, t : a * numpy.exp(-a * (x - t)),
'rayleigh' : lambda x, a, t : (a * x * \
numpy.exp(-a * (x ** 2 - t ** 2) / 2.)),
'power' : lambda x, a, t : (a - 1.) * x ** (-a) * t ** (a - 1.)
}
[docs]
def fit_fn(distr, xvals, alpha, thresh):
"""
The fitted function normalized to 1 above threshold
To normalize to a given total count multiply by the count.
Parameters
----------
xvals : sequence of floats
Values where the function is to be evaluated
alpha : float
The fitted parameter
thresh : float
Threshold value applied to fitted values
Returns
-------
fit : array of floats
Fitted function at the requested xvals
"""
xvals = numpy.array(xvals)
fit = fitfn_dict[distr](xvals, alpha, thresh)
# set fitted values below threshold to 0
numpy.putmask(fit, xvals < thresh, 0.)
return fit
cum_fndict = {
'exponential' : lambda x, alpha, t : numpy.exp(-alpha * (x - t)),
'rayleigh' : lambda x, alpha, t : numpy.exp(-alpha * (x ** 2. - t ** 2.) / 2.),
'power' : lambda x, alpha, t : x ** (1. - alpha) * t ** (alpha - 1.)
}
[docs]
def cum_fit(distr, xvals, alpha, thresh):
"""
Integral of the fitted function above a given value (reverse CDF)
The fitted function is normalized to 1 above threshold
Parameters
----------
xvals : sequence of floats
Values where the function is to be evaluated
alpha : float
The fitted parameter
thresh : float
Threshold value applied to fitted values
Returns
-------
cum_fit : array of floats
Reverse CDF of fitted function at the requested xvals
"""
xvals = numpy.array(xvals)
cum_fit = cum_fndict[distr](xvals, alpha, thresh)
# set fitted values below threshold to 0
numpy.putmask(cum_fit, xvals < thresh, 0.)
return cum_fit
[docs]
def tail_threshold(vals, N=1000):
"""Determine a threshold above which there are N louder values"""
vals = numpy.array(vals)
if len(vals) < N:
raise RuntimeError('Not enough input values to determine threshold')
vals.sort()
return min(vals[-N:])
[docs]
def KS_test(distr, vals, alpha, thresh=None):
"""
Perform Kolmogorov-Smirnov test for fitted distribution
Compare the given set of discrete values above a given threshold to the
fitted distribution function.
If no threshold is specified, the minimum sample value will be used.
Returns the KS test statistic and its p-value: lower p means less
probable under the hypothesis of a perfect fit
Parameters
----------
distr : {'exponential', 'rayleigh', 'power'}
Name of distribution
vals : sequence of floats
Values to compare to fit
alpha : float
Fitted distribution parameter
thresh : float
Threshold to apply before fitting; if None, use min(vals)
Returns
-------
D : float
KS test statistic
p-value : float
p-value, assumed to be two-tailed
"""
vals = numpy.array(vals)
if thresh is None:
thresh = min(vals)
else:
vals = vals[vals >= thresh]
def cdf_fn(x):
return 1 - cum_fndict[distr](x, alpha, thresh)
return kstest(vals, cdf_fn)
[docs]
def which_bin(par, minpar, maxpar, nbins, log=False):
"""
Helper function
Returns bin index where a parameter value belongs (from 0 through nbins-1)
when dividing the range between minpar and maxpar equally into bins.
Parameters
----------
par : float
Parameter value being binned
minpar : float
Minimum parameter value
maxpar : float
Maximum parameter value
nbins : int
Number of bins to use
log : boolean
If True, use log spaced bins
Returns
-------
binind : int
Bin index
"""
assert (par >= minpar and par <= maxpar)
if log:
par, minpar, maxpar = numpy.log(par), numpy.log(minpar), numpy.log(maxpar)
# par lies some fraction of the way between min and max
if minpar != maxpar:
frac = float(par - minpar) / float(maxpar - minpar)
else:
# if they are equal there is only one size 0 bin
# must be in that bin
frac = 0
# binind then lies between 0 and nbins - 1
binind = int(frac * nbins)
# corner case
if par == maxpar:
binind = nbins - 1
return binind