import numpy
import bisect
import logging
from . import bin_utils
logger = logging.getLogger('pycbc.rate')
def integral_element(mu, pdf):
    '''
    Returns an array of elements of the integrand dP = p(mu) dmu
    for a density p(mu) defined at sample values mu; the samples need
    not be equally spaced. Uses a simple trapezium rule.
    The number of dP elements is (number of mu samples) - 1.
    '''
dmu = mu[1:] - mu[:-1]
bin_mean = (pdf[1:] + pdf[:-1]) / 2.
return dmu * bin_mean
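# Illustrative sketch (not part of the module API): for equally spaced
# samples of a flat density, the trapezium elements sum to the width of
# the sampled domain.
#   mu = numpy.linspace(0., 10., 11)
#   integral_element(mu, numpy.ones_like(mu)).sum()  # -> 10.0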
def normalize_pdf(mu, pofmu):
"""
Takes a function pofmu defined at rate sample values mu and
normalizes it to be a suitable pdf. Both mu and pofmu must be
arrays or lists of the same length.
"""
if min(pofmu) < 0:
raise ValueError("Probabilities cannot be negative, don't ask me to "
"normalize a function with negative values!")
if min(mu) < 0:
raise ValueError("Rates cannot be negative, don't ask me to "
"normalize a function over a negative domain!")
dp = integral_element(mu, pofmu)
return mu, pofmu/sum(dp)
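# Illustrative sketch: normalize an unnormalized Gaussian bump so that its
# trapezium-rule integral over the sampled range is 1. Note the function
# returns a (mu, pdf) pair.
#   mu = numpy.linspace(0., 10., 1001)
#   _, pofmu = normalize_pdf(mu, numpy.exp(-(mu - 5.) ** 2 / 2.))
#   integral_element(mu, pofmu).sum()  # -> 1.0 to numerical precision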
def compute_upper_limit(mu_in, post, alpha=0.9):
"""
Returns the upper limit mu_high of confidence level alpha for a
posterior distribution post on the given parameter mu.
The posterior need not be normalized.
"""
if 0 < alpha < 1:
dp = integral_element(mu_in, post)
high_idx = bisect.bisect_left(dp.cumsum() / dp.sum(), alpha)
        # since post is non-negative, dp.cumsum() / dp.sum() is a
        # non-decreasing sequence ending at 1, so for alpha in (0,1)
        # bisect_left always returns an index in the range of mu
mu_high = mu_in[high_idx]
elif alpha == 1:
mu_high = numpy.max(mu_in[post > 0])
else:
raise ValueError("Confidence level must be in (0,1].")
return mu_high
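# Illustrative sketch, assuming a unimodal posterior: for a Gaussian
# posterior centred on mu = 3 with unit width, the 90% upper limit lies
# about 1.28 standard deviations above the mean.
#   mu = numpy.linspace(0., 10., 1001)
#   post = numpy.exp(-(mu - 3.) ** 2 / 2.)
#   compute_upper_limit(mu, post, alpha=0.9)  # -> roughly 4.28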
def compute_lower_limit(mu_in, post, alpha=0.9):
"""
Returns the lower limit mu_low of confidence level alpha for a
posterior distribution post on the given parameter mu.
The posterior need not be normalized.
"""
if 0 < alpha < 1:
dp = integral_element(mu_in, post)
low_idx = bisect.bisect_right(dp.cumsum() / dp.sum(), 1 - alpha)
        # since post is non-negative, dp.cumsum() / dp.sum() is a
        # non-decreasing sequence ending at 1, so for 1 - alpha in (0,1)
        # bisect_right always returns an index in the range of mu
mu_low = mu_in[low_idx]
elif alpha == 1:
mu_low = numpy.min(mu_in[post > 0])
else:
raise ValueError("Confidence level must be in (0,1].")
return mu_low
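# Illustrative sketch: the corresponding 90% lower limit for the same
# Gaussian posterior lies about 1.28 standard deviations below the mean.
#   mu = numpy.linspace(0., 10., 1001)
#   post = numpy.exp(-(mu - 3.) ** 2 / 2.)
#   compute_lower_limit(mu, post, alpha=0.9)  # -> roughly 1.72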
def confidence_interval_min_width(mu, post, alpha=0.9):
'''
Returns the minimal-width confidence interval [mu_low, mu_high] of
confidence level alpha for a posterior distribution post on the parameter mu.
'''
if not 0 < alpha < 1:
raise ValueError("Confidence level must be in (0,1).")
# choose a step size for the sliding confidence window
alpha_step = 0.01
# initialize the lower and upper limits
mu_low = numpy.min(mu)
mu_high = numpy.max(mu)
# find the smallest window (by delta-mu) stepping by dalpha
for ai in numpy.arange(0, 1 - alpha, alpha_step):
ml = compute_lower_limit(mu, post, 1 - ai)
mh = compute_upper_limit(mu, post, alpha + ai)
if mh - ml < mu_high - mu_low:
mu_low = ml
mu_high = mh
return mu_low, mu_high
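# Illustrative sketch: for a symmetric unimodal posterior the minimal-width
# interval coincides with the central interval, here 5 +/- 1.645.
#   mu = numpy.linspace(0., 10., 1001)
#   post = numpy.exp(-(mu - 5.) ** 2 / 2.)
#   confidence_interval_min_width(mu, post, alpha=0.9)
#   # -> roughly (3.36, 6.64)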
def hpd_coverage(mu, pdf, thresh):
    '''
    Integrates a pdf over mu, taking only bins where the mean pdf value
    over the bin is above a given threshold. This gives the coverage of
    the HPD interval for the given threshold.
    '''
dp = integral_element(mu, pdf)
bin_mean = (pdf[1:] + pdf[:-1]) / 2.
return dp[bin_mean > thresh].sum()
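# Illustrative sketch: at threshold zero every bin is included, so the
# coverage equals the total integral of the pdf.
#   mu = numpy.linspace(0., 10., 1001)
#   _, pofmu = normalize_pdf(mu, numpy.exp(-(mu - 5.) ** 2 / 2.))
#   hpd_coverage(mu, pofmu, 0.)  # -> 1.0 to numerical precision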
def hpd_threshold(mu_in, post, alpha, tol):
'''
For a PDF post over samples mu_in, find a density
threshold such that the region having higher density
has coverage of at least alpha, and less than alpha
plus a given tolerance.
'''
    # normalize_pdf returns a (mu, pdf) pair; keep only the normalized pdf
    _, norm_post = normalize_pdf(mu_in, post)
    # initialize bisection search
    p_minus = 0.0
    p_plus = max(norm_post)
    while abs(hpd_coverage(mu_in, norm_post, p_minus) -
              hpd_coverage(mu_in, norm_post, p_plus)) >= tol:
        p_test = (p_minus + p_plus) / 2.
        if hpd_coverage(mu_in, norm_post, p_test) >= alpha:
# test value was too low or just right
p_minus = p_test
else:
# test value was too high
p_plus = p_test
# p_minus never goes above the required threshold and p_plus never goes below
# thus on exiting p_minus is at or below the required threshold and the
# difference in coverage is within tolerance
return p_minus
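# Illustrative sketch: for a unit-width Gaussian posterior the 90% HPD
# region is mean +/- 1.645 sigma; the normalized density at its boundary
# is exp(-1.645 ** 2 / 2.) / numpy.sqrt(2. * numpy.pi), about 0.103.
#   mu = numpy.linspace(0., 10., 1001)
#   post = numpy.exp(-(mu - 5.) ** 2 / 2.)
#   hpd_threshold(mu, post, alpha=0.9, tol=1e-3)  # -> roughly 0.10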
def hpd_credible_interval(mu_in, post, alpha=0.9, tolerance=1e-3):
'''
Returns the minimum and maximum rate values of the HPD
(Highest Posterior Density) credible interval for a posterior
post defined at the sample values mu_in. Samples need not be
uniformly spaced and posterior need not be normalized.
Will not return a correct credible interval if the posterior
is multimodal and the correct interval is not contiguous;
in this case will over-cover by including the whole range from
minimum to maximum mu.
'''
if alpha == 1:
nonzero_samples = mu_in[post > 0]
mu_low = numpy.min(nonzero_samples)
mu_high = numpy.max(nonzero_samples)
    elif 0 < alpha < 1:
        # determine the highest PDF for which the region with
        # higher density has sufficient coverage
        pthresh = hpd_threshold(mu_in, post, alpha, tol=tolerance)
        samples_over_threshold = mu_in[post > pthresh]
        mu_low = numpy.min(samples_over_threshold)
        mu_high = numpy.max(samples_over_threshold)
    else:
        raise ValueError("Credible level must be in (0,1].")
    return mu_low, mu_high
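# Illustrative sketch: for a unimodal posterior the HPD interval agrees
# with the minimal-width interval found above.
#   mu = numpy.linspace(0., 10., 1001)
#   post = numpy.exp(-(mu - 5.) ** 2 / 2.)
#   hpd_credible_interval(mu, post, alpha=0.9)  # -> roughly (3.36, 6.64)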
# Following functions are for the old pylal volume vs mass calculations
# These were replaced by 'imr_utils' functions now contained in sensitivity.py
# and bin_utils.py
def integrate_efficiency(dbins, eff, err=0, logbins=False):
    '''
    Integrate an efficiency function over the distance bins dbins to get
    a sensitive volume, propagating the efficiency errors err into a
    volume error. If logbins is True, the efficiencies are treated as
    binned uniformly in log distance.
    '''
    if logbins:
        logd = numpy.log(dbins)
        dlogd = logd[1:] - logd[:-1]
        # use log midpoint of bins
        dreps = numpy.exp((logd[1:] + logd[:-1]) / 2.)
vol = numpy.sum(4.*numpy.pi * dreps**3. * eff * dlogd)
# propagate errors in eff to errors in v
verr = numpy.sqrt(
numpy.sum((4.*numpy.pi * dreps**3. * err * dlogd)**2.)
)
else:
dd = dbins[1:] - dbins[:-1]
dreps = (dbins[1:] + dbins[:-1]) / 2.
vol = numpy.sum(4. * numpy.pi * dreps**2. * eff * dd)
# propagate errors
verr = numpy.sqrt(numpy.sum((4.*numpy.pi * dreps**2. * err * dd)**2.))
return vol, verr
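# Illustrative sketch: with unit efficiency in every bin, the sum reduces
# to a midpoint-rule estimate of the volume of a sphere whose radius is
# the outermost bin edge.
#   dbins = numpy.linspace(0., 100., 51)
#   vol, verr = integrate_efficiency(dbins, numpy.ones(len(dbins) - 1))
#   # vol -> roughly 4. / 3. * numpy.pi * 100. ** 3; verr -> 0.0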
def compute_efficiency(f_dist, m_dist, dbins):
    '''
    Compute the efficiency as a function of distance for the given sets
    of found and missed injection distances. Note that injections that
    do not fall into any distance bin are silently discarded.
    '''
efficiency = numpy.zeros(len(dbins) - 1)
error = numpy.zeros(len(dbins) - 1)
for j, dlow in enumerate(dbins[:-1]):
dhigh = dbins[j + 1]
        found = numpy.sum((dlow <= f_dist) & (f_dist < dhigh))
        missed = numpy.sum((dlow <= m_dist) & (m_dist < dhigh))
if found+missed == 0:
# avoid divide by 0 in empty bins
missed = 1.
efficiency[j] = float(found) / (found + missed)
error[j] = numpy.sqrt(efficiency[j] * (1 - efficiency[j]) /
(found + missed))
return efficiency, error
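# Illustrative sketch with made-up distances: two distance bins with
# three found and one missed injection.
#   f_dist = numpy.array([10., 20., 30.])
#   m_dist = numpy.array([35.])
#   dbins = numpy.array([0., 25., 50.])
#   eff, err = compute_efficiency(f_dist, m_dist, dbins)
#   # eff -> [1.0, 0.5]; err -> [0.0, ~0.354]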
def mean_efficiency_volume(found, missed, dbins):
    '''
    Compute the detection efficiency per distance bin and the resulting
    sensitive volume, with errors, for the given sets of found and missed
    injections (objects with a distance attribute).
    '''
if len(found) == 0:
# no efficiency here
return numpy.zeros(len(dbins) - 1), numpy.zeros(len(dbins) - 1), 0, 0
# only need distances
f_dist = numpy.array([l.distance for l in found])
m_dist = numpy.array([l.distance for l in missed])
# compute the efficiency and its variance
eff, err = compute_efficiency(f_dist, m_dist, dbins)
vol, verr = integrate_efficiency(dbins, eff, err)
return eff, err, vol, verr
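# Illustrative sketch: any objects exposing a distance attribute will do;
# the namedtuple below is a hypothetical stand-in for sim_inspiral rows.
#   from collections import namedtuple
#   Inj = namedtuple('Inj', ['distance'])
#   found = [Inj(10.), Inj(20.), Inj(30.)]
#   missed = [Inj(35.)]
#   dbins = numpy.array([0., 25., 50.])
#   eff, err, vol, verr = mean_efficiency_volume(found, missed, dbins)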
def filter_injections_by_mass(injs, mbins, bin_num, bin_type, bin_num2=None):
'''
For a given set of injections (sim_inspiral rows), return the subset
of injections that fall within the given mass range.
'''
if bin_type == "Mass1_Mass2":
m1bins = numpy.concatenate((mbins.lower()[0],
numpy.array([mbins.upper()[0][-1]])))
m1lo = m1bins[bin_num]
m1hi = m1bins[bin_num + 1]
m2bins = numpy.concatenate((mbins.lower()[1],
numpy.array([mbins.upper()[1][-1]])))
m2lo = m2bins[bin_num2]
m2hi = m2bins[bin_num2 + 1]
newinjs = [l for l in injs if
((m1lo <= l.mass1 < m1hi and m2lo <= l.mass2 < m2hi) or
(m1lo <= l.mass2 < m1hi and m2lo <= l.mass1 < m2hi))]
return newinjs
mbins = numpy.concatenate((mbins.lower()[0],
numpy.array([mbins.upper()[0][-1]])))
mlow = mbins[bin_num]
mhigh = mbins[bin_num + 1]
if bin_type == "Chirp_Mass":
newinjs = [l for l in injs if (mlow <= l.mchirp < mhigh)]
elif bin_type == "Total_Mass":
newinjs = [l for l in injs if (mlow <= l.mass1 + l.mass2 < mhigh)]
elif bin_type == "Component_Mass":
# here it is assumed that m2 is fixed
newinjs = [l for l in injs if (mlow <= l.mass1 < mhigh)]
elif bin_type == "BNS_BBH":
if bin_num in [0, 2]:
# BNS/BBH case
newinjs = [l for l in injs if
(mlow <= l.mass1 < mhigh and mlow <= l.mass2 < mhigh)]
else:
# NSBH
newinjs = [l for l in injs if (mbins[0] <= l.mass1 < mbins[1] and
mbins[2] <= l.mass2 < mbins[3])]
# BHNS
newinjs += [l for l in injs if (mbins[0] <= l.mass2 < mbins[1] and
mbins[2] <= l.mass1 < mbins[3])]
return newinjs
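# Illustrative sketch, assuming the NDBins/LinearBins constructors in
# bin_utils and injections with mass1/mass2 attributes: select the
# injections falling in the (0, 0) cell of a 2 x 2 Mass1_Mass2 binning.
#   mbins = bin_utils.NDBins((bin_utils.LinearBins(1., 3., 2),
#                             bin_utils.LinearBins(1., 3., 2)))
#   cell = filter_injections_by_mass(injs, mbins, 0, "Mass1_Mass2",
#                                    bin_num2=0)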
def compute_volume_vs_mass(found, missed, mass_bins, bin_type, dbins=None):
"""
Compute the average luminosity an experiment was sensitive to
Assumes that luminosity is uniformly distributed in space.
Input is the sets of found and missed injections.
"""
# mean and std estimate for luminosity
volArray = bin_utils.BinnedArray(mass_bins)
vol2Array = bin_utils.BinnedArray(mass_bins)
# found/missed stats
foundArray = bin_utils.BinnedArray(mass_bins)
missedArray = bin_utils.BinnedArray(mass_bins)
# compute the mean luminosity in each mass bin
effvmass = []
errvmass = []
# 2D case first
if bin_type == "Mass1_Mass2":
for j, mc1 in enumerate(mass_bins.centres()[0]):
for k, mc2 in enumerate(mass_bins.centres()[1]):
newfound = filter_injections_by_mass(
found, mass_bins, j, bin_type, k)
newmissed = filter_injections_by_mass(
missed, mass_bins, j, bin_type, k)
foundArray[(mc1, mc2)] = len(newfound)
missedArray[(mc1, mc2)] = len(newmissed)
# compute the volume using this injection set
meaneff, efferr, meanvol, volerr = mean_efficiency_volume(
newfound, newmissed, dbins)
effvmass.append(meaneff)
errvmass.append(efferr)
volArray[(mc1, mc2)] = meanvol
vol2Array[(mc1, mc2)] = volerr
return volArray, vol2Array, foundArray, missedArray, effvmass, errvmass
for j, mc in enumerate(mass_bins.centres()[0]):
# filter out injections not in this mass bin
newfound = filter_injections_by_mass(found, mass_bins, j, bin_type)
newmissed = filter_injections_by_mass(missed, mass_bins, j, bin_type)
foundArray[(mc, )] = len(newfound)
missedArray[(mc, )] = len(newmissed)
# compute the volume using this injection set
meaneff, efferr, meanvol, volerr = mean_efficiency_volume(
newfound, newmissed, dbins)
effvmass.append(meaneff)
errvmass.append(efferr)
volArray[(mc, )] = meanvol
vol2Array[(mc, )] = volerr
return volArray, vol2Array, foundArray, missedArray, effvmass, errvmass
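# Illustrative sketch, assuming the same hypothetical inputs as above:
# combine the pieces to get sensitive volume as a function of mass.
#   dbins = numpy.linspace(0., 400., 41)
#   vols, volerrs, nfound, nmissed, effs, errs = compute_volume_vs_mass(
#       found, missed, mbins, "Mass1_Mass2", dbins=dbins)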