# Copyright (C) 2019 Collin Capano, Sumit Kumar, Prayush Kumar
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
"""
This modules provides classes and functions for using the dynesty sampler
packages for parameter estimation.
"""
import logging
import time
import numpy
import dynesty, dynesty.dynesty, dynesty.nestedsamplers
from pycbc.pool import choose_pool
from dynesty import utils as dyfunc
from pycbc.inference.io import (DynestyFile, validate_checkpoint_files,
loadfile)
from .base import (BaseSampler, setup_output)
from .base_mcmc import get_optional_arg_from_config
from .base_cube import setup_calls
from .. import models
#
# =============================================================================
#
# Samplers
#
# =============================================================================
#
class DynestySampler(BaseSampler):
"""This class is used to construct an Dynesty sampler from the dynesty
package.
Parameters
----------
model : model
A model from ``pycbc.inference.models``.
nlive : int
Number of live points to use in sampler.
pool : function with map, Optional
A provider of a map function that allows a function call to be run
over multiple sets of arguments and possibly maps them to
cores/nodes/etc.
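
    Examples
    --------
    A minimal usage sketch (illustrative only; assumes ``model`` is an
    already constructed instance of a ``pycbc.inference.models`` model)::

        sampler = DynestySampler(model, nlive=1000)
        sampler.run()
        nested_samples = sampler.samples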
"""
name = "dynesty"
_io = DynestyFile
def __init__(self, model, nlive, nprocesses=1,
checkpoint_time_interval=None, maxcall=None,
loglikelihood_function=None, use_mpi=False,
no_save_state=False,
run_kwds=None,
extra_kwds=None,
internal_kwds=None,
**kwargs):
self.model = model
self.no_save_state = no_save_state
log_likelihood_call, prior_call = setup_calls(
model,
loglikelihood_function=loglikelihood_function,
copy_prior=True)
# Set up the pool
self.pool = choose_pool(mpi=use_mpi, processes=nprocesses)
self.maxcall = maxcall
self.checkpoint_time_interval = checkpoint_time_interval
self.run_kwds = {} if run_kwds is None else run_kwds
self.extra_kwds = {} if extra_kwds is None else extra_kwds
self.internal_kwds = {} if internal_kwds is None else internal_kwds
self.nlive = nlive
self.names = model.sampling_params
self.ndim = len(model.sampling_params)
self.checkpoint_file = None
# Enable checkpointing if checkpoint_time_interval is set in config
# file in sampler section
if self.checkpoint_time_interval:
self.run_with_checkpoint = True
if self.maxcall is None:
self.maxcall = 5000 * self.pool.size
logging.info("Checkpointing enabled, will verify every %s calls"
" and try to checkpoint every %s seconds",
self.maxcall, self.checkpoint_time_interval)
else:
self.run_with_checkpoint = False
# Check for cyclic boundaries
periodic = []
cyclic = self.model.prior_distribution.cyclic
for i, param in enumerate(self.variable_params):
if param in cyclic:
logging.info('Param: %s will be cyclic', param)
periodic.append(i)
if len(periodic) == 0:
periodic = None
# Check for reflected boundaries. Dynesty only supports
# reflection on both min and max of boundary.
reflective = []
reflect = self.model.prior_distribution.well_reflected
for i, param in enumerate(self.variable_params):
if param in reflect:
logging.info("Param: %s will be well reflected", param)
reflective.append(i)
if len(reflective) == 0:
reflective = None
        # Optionally swap in the modified rwalk sampler ('rwalk2'); note
        # that this patches dynesty's sampling table globally for the process
        if 'sample' in self.extra_kwds:
            if 'rwalk2' in self.extra_kwds['sample']:
                dynesty.dynesty._SAMPLING["rwalk"] = sample_rwalk_mod
                dynesty.nestedsamplers._SAMPLING["rwalk"] = sample_rwalk_mod
                self.extra_kwds['sample'] = 'rwalk'
if self.nlive < 0:
# Interpret a negative input value for the number of live points
# (which is clearly an invalid input in all senses)
# as the desire to dynamically determine that number
            self._sampler = dynesty.DynamicNestedSampler(log_likelihood_call,
                                                         prior_call, self.ndim,
                                                         pool=self.pool,
                                                         reflective=reflective,
                                                         periodic=periodic,
                                                         **self.extra_kwds)
            self.run_with_checkpoint = False
            logging.info("Checkpointing not currently supported with"
                         " DYNAMIC nested sampler")
        else:
            self._sampler = dynesty.NestedSampler(log_likelihood_call,
                                                  prior_call, self.ndim,
                                                  nlive=self.nlive,
                                                  reflective=reflective,
                                                  periodic=periodic,
                                                  pool=self.pool,
                                                  **self.extra_kwds)
        self._sampler.kwargs.update(self.internal_kwds)
# properties of the internal sampler which should not be pickled
self.no_pickle = ['loglikelihood',
'prior_transform',
'propose_point',
'update_proposal',
'_UPDATE', '_PROPOSE',
'evolve_point', 'use_pool', 'queue_size',
'use_pool_ptform', 'use_pool_logl',
'use_pool_evolve', 'use_pool_update',
'pool', 'M']
def run(self):
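        """Runs the sampler until the nested-sampling stopping condition
        is reached, checkpointing periodically if configured to do so.
        """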
diff_niter = 1
if self.run_with_checkpoint is True:
n_checkpointing = 1
t0 = time.time()
it = self._sampler.it
logging.info('Starting from iteration: %s', it)
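            # Run in batches of at most ``maxcall`` likelihood calls; once a
            # full batch adds no new iterations, the sampler has met its
            # stopping condition and the loop exits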
while diff_niter != 0:
self._sampler.run_nested(maxcall=self.maxcall, **self.run_kwds)
delta_t = time.time() - t0
diff_niter = self._sampler.it - it
logging.info("Checking if we should checkpoint: %.2f s", delta_t)
if delta_t >= self.checkpoint_time_interval:
logging.info('Checkpointing N={}'.format(n_checkpointing))
self.checkpoint()
n_checkpointing += 1
t0 = time.time()
it = self._sampler.it
else:
self._sampler.run_nested(**self.run_kwds)
@property
def io(self):
return self._io
@property
def niterations(self):
return len(tuple(self.samples.values())[0])
@classmethod
def from_config(cls, cp, model, output_file=None, nprocesses=1,
use_mpi=False, loglikelihood_function=None):
"""Loads the sampler from the given config file. Many options are
directly passed to the underlying dynesty sampler, see the official
dynesty documentation for more details on these.
The following options are retrieved in the ``[sampler]`` section:
        * ``name = STR``:
            Required. This must match the sampler's name.
        * ``nlive = INT``:
            Required. The number of live points to use.
* ``maxiter = INT``:
The maximum number of iterations to run.
* ``dlogz = FLOAT``:
The target dlogz stopping condition.
* ``logl_max = FLOAT``:
The maximum logl stopping condition.
* ``n_effective = INT``:
Target effective number of samples stopping condition
* ``sample = STR``:
The method to sample the space. Should be one of 'uniform',
'rwalk', 'rwalk2' (a modified version of rwalk), or 'slice'.
        * ``walks = INT``:
            Used for some of the walk methods. Sets the minimum number of
            steps to take when evolving a point.
* ``maxmcmc = INT``:
Used for some of the walk methods. Sets the maximum number of steps
to take when evolving a point.
        * ``nact = INT``:
            Used for some of the walk methods. Sets the number of
            autocorrelation lengths to advance before terminating evolution
            of a point.
* ``first_update_min_ncall = INT``:
The minimum number of calls before updating the bounding region
for the first time.
        * ``first_update_min_eff = FLOAT``:
            Don't update the bounding region until the sampling efficiency
            drops below this value.
* ``bound = STR``:
The method of bounding of the prior volume.
Should be one of 'single', 'balls', 'cubes', 'multi' or 'none'.
        * ``update_interval = INT``:
            Number of iterations between updates of the bounding regions.
        * ``enlarge = FLOAT``:
            Factor by which to enlarge the bounding region.
* ``bootstrap = INT``:
The number of bootstrap iterations to determine the enlargement
factor.
        * ``maxcall = INT``:
            The maximum number of likelihood calls between checks of whether
            to checkpoint.
        * ``checkpoint_time_interval``:
            Sets the time in seconds between checkpoints.
* ``loglikelihood-function``:
The attribute of the model to use for the loglikelihood. If
not provided, will default to ``loglikelihood``.
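
        For example, a hypothetical ``[sampler]`` section using some of
        these options might look like the following (the values shown are
        illustrative, not recommendations)::

            [sampler]
            name = dynesty
            nlive = 2000
            dlogz = 0.1
            sample = rwalk
            walks = 100
            checkpoint_time_interval = 1800
            maxcall = 10000
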
Parameters
----------
cp : WorkflowConfigParser instance
Config file object to parse.
model : pycbc.inference.model.BaseModel instance
The model to use.
output_file : str, optional
The name of the output file to checkpoint and write results to.
nprocesses : int, optional
The number of parallel processes to use. Default is 1.
use_mpi : bool, optional
Use MPI for parallelization. Default is False.
Returns
-------
DynestySampler :
The sampler instance.
"""
section = "sampler"
# check name
assert cp.get(section, "name") == cls.name, (
"name in section [sampler] must match mine")
# get the number of live points to use
nlive = int(cp.get(section, "nlive"))
loglikelihood_function = \
get_optional_arg_from_config(cp, section, 'loglikelihood-function')
no_save_state = cp.has_option(section, 'no-save-state')
# optional run_nested arguments for dynesty
rargs = {'maxiter': int,
'dlogz': float,
'logl_max': float,
'n_effective': int,
}
# optional arguments for dynesty
cargs = {'bound': str,
'bootstrap': int,
'enlarge': float,
'update_interval': float,
'sample': str,
'first_update_min_ncall': int,
'first_update_min_eff': float,
'walks': int,
}
# optional arguments that must be set internally
internal_args = {
'maxmcmc': int,
'nact': int,
}
extra = {}
run_extra = {}
internal_extra = {}
for args, argt in [(extra, cargs),
(run_extra, rargs),
(internal_extra, internal_args),
]:
for karg in argt:
if cp.has_option(section, karg):
args[karg] = argt[karg](cp.get(section, karg))
        # This argument must be passed to dynesty as a dict
first_update = {}
if 'first_update_min_ncall' in extra:
first_update['min_ncall'] = extra.pop('first_update_min_ncall')
logging.info('First update: min_ncall:%s',
first_update['min_ncall'])
if 'first_update_min_eff' in extra:
first_update['min_eff'] = extra.pop('first_update_min_eff')
logging.info('First update: min_eff:%s', first_update['min_eff'])
extra['first_update'] = first_update
# populate options for checkpointing
checkpoint_time_interval = None
maxcall = None
if cp.has_option(section, 'checkpoint_time_interval'):
ck_time = float(cp.get(section, 'checkpoint_time_interval'))
checkpoint_time_interval = ck_time
if cp.has_option(section, 'maxcall'):
maxcall = int(cp.get(section, 'maxcall'))
obj = cls(model, nlive=nlive, nprocesses=nprocesses,
loglikelihood_function=loglikelihood_function,
checkpoint_time_interval=checkpoint_time_interval,
maxcall=maxcall,
no_save_state=no_save_state,
use_mpi=use_mpi, run_kwds=run_extra,
extra_kwds=extra,
internal_kwds=internal_extra,)
setup_output(obj, output_file, check_nsamples=False)
if not obj.new_checkpoint:
obj.resume_from_checkpoint()
return obj
def checkpoint(self):
"""Checkpoint function for dynesty sampler
"""
# Dynesty has its own __getstate__ which deletes
# random state information and the pool
saved = {}
for key in self.no_pickle:
if hasattr(self._sampler, key):
saved[key] = getattr(self._sampler, key)
setattr(self._sampler, key, None)
for fn in [self.checkpoint_file, self.backup_file]:
with self.io(fn, "a") as fp:
# Write random state
fp.write_random_state()
# Write pickled data
fp.write_pickled_data_into_checkpoint_file(self._sampler)
self.write_results(fn)
# Restore properties that couldn't be pickled if we are continuing
for key in saved:
setattr(self._sampler, key, saved[key])
    def resume_from_checkpoint(self):
        """Reads the previously saved sampler state from the checkpoint
        file and restores it to the internal sampler.
        """
try:
with loadfile(self.checkpoint_file, 'r') as fp:
sampler = fp.read_pickled_data_from_checkpoint_file()
for key in sampler.__dict__:
if key not in self.no_pickle:
value = getattr(sampler, key)
setattr(self._sampler, key, value)
self.set_state_from_file(self.checkpoint_file)
logging.info("Found valid checkpoint file: %s",
self.checkpoint_file)
        except Exception as e:
            logging.warning("Failed to load checkpoint file: %s", e)
def set_state_from_file(self, filename):
"""Sets the state of the sampler back to the instance saved in a file.
"""
with self.io(filename, 'r') as fp:
state = fp.read_random_state()
            # Dynesty handles most randomness through rstate, which is
            # pickled along with the class instance
numpy.random.set_state(state)
    def finalize(self):
        """Finalize the sampler and write the results to the output file.
        """
        logz = self._sampler.results.logz[-1]
        dlogz = self._sampler.results.logzerr[-1]
        logging.info("log Z, dlog Z: %s, %s", logz, dlogz)
if self.no_save_state:
self.write_results(self.checkpoint_file)
else:
self.checkpoint()
logging.info("Validating checkpoint and backup files")
checkpoint_valid = validate_checkpoint_files(
self.checkpoint_file, self.backup_file, check_nsamples=False)
if not checkpoint_valid:
raise IOError("error writing to checkpoint file")
@property
def samples(self):
"""Returns raw nested samples
"""
results = self._sampler.results
samples = results.samples
nest_samp = {}
for i, param in enumerate(self.variable_params):
nest_samp[param] = samples[:, i]
nest_samp['logwt'] = results.logwt
nest_samp['loglikelihood'] = results.logl
return nest_samp
def set_initial_conditions(self, initial_distribution=None,
samples_file=None):
"""Sets up the starting point for the sampler.
Should also set the sampler's random state.
"""
pass
def write_results(self, filename):
"""Writes samples, model stats, acceptance fraction, and random state
to the given file.
        Parameters
        ----------
        filename : str
            The file to write to. The file is opened using the ``io`` class
            in an append state.
"""
with self.io(filename, 'a') as fp:
# Write nested samples
fp.write_raw_samples(self.samples)
# Write logz and dlogz
            logz = self._sampler.results.logz[-1]
            dlogz = self._sampler.results.logzerr[-1]
fp.write_logevidence(logz, dlogz)
@property
def model_stats(self):
pass
    @property
    def logz(self):
        """Returns the log of the Bayesian evidence estimated by the
        dynesty sampler.
        """
        return self._sampler.results.logz[-1]

    @property
    def logz_err(self):
        """Returns the error on the log of the Bayesian evidence estimated
        by the dynesty sampler.
        """
        return self._sampler.results.logzerr[-1]
def sample_rwalk_mod(args):
""" Modified version of dynesty.sampling.sample_rwalk
Adapted from version used in bilby/dynesty
"""
try:
# dynesty <= 1.1
from dynesty.utils import unitcheck, reflect
# Unzipping.
(u, loglstar, axes, scale,
prior_transform, loglikelihood, kwargs) = args
except ImportError:
        # dynesty >= 1.2
from dynesty.utils import unitcheck, apply_reflect as reflect
(u, loglstar, axes, scale,
prior_transform, loglikelihood, _, kwargs) = args
rstate = numpy.random
# Bounds
nonbounded = kwargs.get('nonbounded', None)
periodic = kwargs.get('periodic', None)
reflective = kwargs.get('reflective', None)
# Setup.
n = len(u)
    walks = kwargs.get('walks', 10 * n)  # minimum number of steps
    maxmcmc = kwargs.get('maxmcmc', 2000)  # maximum number of steps
    nact = kwargs.get('nact', 5)  # number of autocorrelation times
old_act = kwargs.get('old_act', walks)
# Initialize internal variables
accept = 0
reject = 0
nfail = 0
act = numpy.inf
u_list = []
v_list = []
logl_list = []
ii = 0
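    # ``act`` starts at infinity, so the chain keeps evolving until enough
    # steps have been taken to estimate the autocorrelation time, after
    # which it runs until the chain is at least ``nact`` ACTs long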
while ii < nact * act:
ii += 1
# Propose a direction on the unit n-sphere.
drhat = rstate.randn(n)
drhat /= numpy.linalg.norm(drhat)
# Scale based on dimensionality.
dr = drhat * rstate.rand() ** (1.0 / n)
# Transform to proposal distribution.
du = numpy.dot(axes, dr)
u_prop = u + scale * du
# Wrap periodic parameters
if periodic is not None:
u_prop[periodic] = numpy.mod(u_prop[periodic], 1)
# Reflect
if reflective is not None:
u_prop[reflective] = reflect(u_prop[reflective])
        # Check unit cube constraints.
        if unitcheck(u_prop, nonbounded):
            pass
        else:
            nfail += 1
# Only start appending to the chain once a single jump is made
if accept > 0:
u_list.append(u_list[-1])
v_list.append(v_list[-1])
logl_list.append(logl_list[-1])
continue
# Check proposed point.
v_prop = prior_transform(numpy.array(u_prop))
logl_prop = loglikelihood(numpy.array(v_prop))
if logl_prop > loglstar:
u = u_prop
v = v_prop
logl = logl_prop
accept += 1
u_list.append(u)
v_list.append(v)
logl_list.append(logl)
else:
reject += 1
# Only start appending to the chain once a single jump is made
if accept > 0:
u_list.append(u_list[-1])
v_list.append(v_list[-1])
logl_list.append(logl_list[-1])
# If we've taken the minimum number of steps, calculate the ACT
if accept + reject > walks:
act = estimate_nmcmc(
accept_ratio=accept / (accept + reject + nfail),
old_act=old_act, maxmcmc=maxmcmc)
# If we've taken too many likelihood evaluations then break
if accept + reject > maxmcmc:
            logging.warning(
                "Hit maximum number of walks %s with accept=%s, reject=%s, "
                "and nfail=%s; try increasing maxmcmc",
                maxmcmc, accept, reject, nfail)
break
# If the act is finite, pick randomly from within the chain
if numpy.isfinite(act) and int(.5 * nact * act) < len(u_list):
idx = numpy.random.randint(int(.5 * nact * act), len(u_list))
u = u_list[idx]
v = v_list[idx]
logl = logl_list[idx]
else:
logging.debug("Unable to find a new point using walk: "
"returning a random point")
u = numpy.random.uniform(size=n)
v = prior_transform(u)
logl = loglikelihood(v)
blob = {'accept': accept, 'reject': reject, 'fail': nfail, 'scale': scale}
kwargs["old_act"] = act
ncall = accept + reject
return u, v, logl, ncall, blob
def estimate_nmcmc(accept_ratio, old_act, maxmcmc, safety=5, tau=None):
"""Estimate autocorrelation length of chain using acceptance fraction
Using ACL = (2/acc) - 1 multiplied by a safety margin. Code adapated from
CPNest:
* https://github.com/johnveitch/cpnest/blob/master/cpnest/sampler.py
* https://github.com/farr/Ensemble.jl
Parameters
----------
accept_ratio: float [0, 1]
Ratio of the number of accepted points to the total number of points
old_act: int
The ACT of the last iteration
maxmcmc: int
The maximum length of the MCMC chain to use
safety: int
A safety factor applied in the calculation
tau: int (optional)
The ACT, if given, otherwise estimated.
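
    Examples
    --------
    A worked example with the default ``safety`` of 5, so that
    ``tau = maxmcmc / safety = 1000``:

    >>> estimate_nmcmc(accept_ratio=0.5, old_act=10, maxmcmc=5000)
    10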
"""
if tau is None:
tau = maxmcmc / safety
if accept_ratio == 0.0:
Nmcmc_exact = (1 + 1 / tau) * old_act
else:
Nmcmc_exact = (
(1. - 1. / tau) * old_act +
(safety / tau) * (2. / accept_ratio - 1.)
)
Nmcmc_exact = float(min(Nmcmc_exact, maxmcmc))
return max(safety, int(Nmcmc_exact))