Source code for pycbc.inference.io.epsie

# Copyright (C) 2019  Collin Capano
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""This module provides IO classes for epsie samplers.
"""


import numpy
from pickle import UnpicklingError
from epsie import load_state

from .base_sampler import BaseSamplerFile
from .base_mcmc import MCMCMetadataIO
from .base_multitemper import (CommonMultiTemperedMetadataIO,
                               write_samples,
                               read_raw_samples)


[docs]class EpsieFile(MCMCMetadataIO, CommonMultiTemperedMetadataIO, BaseSamplerFile): """Class to handle IO for Epsie's parallel-tempered sampler.""" name = 'epsie_file' @property def nchains(self): """Alias for nwalkers.""" return self.nwalkers @property def betas(self): """The betas that were used.""" return self[self.sampler_group]['betas'][()] @property def swap_interval(self): """The interval that temperature swaps occurred at.""" return self[self.sampler_group].attrs['swap_interval'] @swap_interval.setter def swap_interval(self, swap_interval): """Stores the swap interval to the sampler group's attrs.""" self[self.sampler_group].attrs['swap_interval'] = swap_interval @property def seed(self): """The sampler's seed.""" # convert seed from str back to int (see setter below for reason) return int(self[self.sampler_group].attrs['seed']) @seed.setter def seed(self, seed): """Store the sampler's seed.""" # epsie uses the numpy's new random generators, which use long integers # for seeds. hdf5 doesn't know how to handle long integers, so we'll # store it as a string self[self.sampler_group].attrs['seed'] = str(seed)
[docs] def write_sampler_metadata(self, sampler): """Adds writing seed and betas to MultiTemperedMCMCIO. """ super(EpsieFile, self).write_sampler_metadata(sampler) self.seed = sampler.seed self.write_data("betas", sampler.betas, path=self.sampler_group)
[docs] def thin(self, thin_interval): """Thins the samples on disk to the given thinning interval. Also thins the acceptance ratio and the temperature data, both of which are stored in the ``sampler_info`` group. """ # We'll need to know what the new interval to thin by will be # so we can properly thin the acceptance ratio and temperatures swaps. # We need to do this before calling the base thin, as we need to know # what the current thinned by is. new_interval = thin_interval // self.thinned_by # now thin the samples super(EpsieFile, self).thin(thin_interval) # thin the acceptance ratio self._thin_data(self.sampler_group, ['acceptance_ratio'], new_interval) # thin the temperature swaps; since these may not happen every # iteration, the thin interval we use for these is different ts_group = '/'.join([self.sampler_group, 'temperature_swaps']) ts_thin_interval = new_interval // self.swap_interval if ts_thin_interval > 1: self._thin_data(ts_group, ['swap_index'], ts_thin_interval) self._thin_data(ts_group, ['acceptance_ratio'], ts_thin_interval)
[docs] def write_samples(self, samples, **kwargs): r"""Writes samples to the given file. Calls :py:func:`base_multitemper.write_samples`. See that function for details. Parameters ---------- samples : dict The samples to write. Each array in the dictionary should have shape ntemps x nwalkers x niterations. \**kwargs : All other keyword arguments are passed to :py:func:`base_multitemper.write_samples`. """ write_samples(self, samples, **kwargs)
[docs] def read_raw_samples(self, fields, **kwargs): r"""Base function for reading samples. Calls :py:func:`base_multitemper.read_raw_samples`. See that function for details. Parameters ----------- fields : list The list of field names to retrieve. \**kwargs : All other keyword arguments are passed to :py:func:`base_multitemper.read_raw_samples`. Returns ------- dict A dictionary of field name -> numpy array pairs. """ return read_raw_samples(self, fields, **kwargs)
[docs] def write_acceptance_ratio(self, acceptance_ratio, last_iteration=None): """Writes the acceptance ratios to the sampler info group. Parameters ---------- acceptance_ratio : array The acceptance ratios to write. Should have shape ``ntemps x nchains x niterations``. """ # we'll use the write_samples machinery to write the acceptance ratios self.write_samples({'acceptance_ratio': acceptance_ratio}, last_iteration=last_iteration, samples_group=self.sampler_group)
[docs] def read_acceptance_ratio(self, temps=None, chains=None): """Reads the acceptance ratios. Ratios larger than 1 are set back to 1 before returning. Parameters ----------- temps : (list of) int, optional The temperature index (or a list of indices) to retrieve. If None, acceptance ratios from all temperatures and all chains will be retrieved. chains : (list of) int, optional The chain index (or a list of indices) to retrieve. If None, ratios from all chains will be obtained. Returns ------- array Array of acceptance ratios with shape (requested temps, requested chains, niterations). """ group = self.sampler_group + '/acceptance_ratio' if chains is None: wmask = numpy.ones(self.nchains, dtype=bool) else: wmask = numpy.zeros(self.nchains, dtype=bool) wmask[chains] = True if temps is None: tmask = numpy.ones(self.ntemps, dtype=bool) else: tmask = numpy.zeros(self.ntemps, dtype=bool) tmask[temps] = True all_ratios = self[group][:] # make sure values > 1 are set back to 1 all_ratios[all_ratios > 1] = 1. return all_ratios[numpy.ix_(tmask, wmask)]
[docs] def read_acceptance_rate(self, temps=None, chains=None): """Reads the acceptance rate. This calls :py:func:`read_acceptance_ratio`, then averages the ratios over all iterations to get the average rate. Parameters ----------- temps : (list of) int, optional The temperature index (or a list of indices) to retrieve. If None, acceptance rates from all temperatures and all chains will be retrieved. chains : (list of) int, optional The chain index (or a list of indices) to retrieve. If None, rates from all chains will be obtained. Returns ------- array Array of acceptance ratios with shape (requested temps, requested chains). """ all_ratios = self.read_acceptance_ratio(temps, chains) # average over the number of iterations all_ratios = all_ratios.mean(axis=-1) return all_ratios
[docs] def read_acceptance_fraction(self, temps=None, walkers=None): """Alias for :py:func:`read_acceptance_rate`. """ return self.read_acceptance_rate(temps=temps, chains=walkers)
[docs] def write_temperature_data(self, swap_index, acceptance_ratio, swap_interval, last_iteration): """Writes temperature swaps and acceptance ratios. Parameters ---------- swap_index : array The indices indicating which temperatures were swapped. Should have shape ``ntemps x nchains x (niterations/swap_interval)``. acceptance_ratio : array The array of acceptance ratios between temperatures. Should have shape ``(ntemps-1) x nchains x (niterations/swap_interval)``. arrays. swap_interval : int The number of iterations between temperature swaps. last_iteration : int The iteration of the last sample. """ self.swap_interval = swap_interval group = '/'.join([self.sampler_group, 'temperature_swaps']) # we'll use the write_samples machinery to write the acceptance ratios; # if temperature swaps didn't happen every iteration, then a smaller # thinning interval than what is used for the samples should be used thin_by = self.thinned_by // swap_interval # we'll also tell the write samples that the last "iteration" is the # last iteration / the swap interval, to get the spacing correct last_iteration = last_iteration // swap_interval # we need to write the two arrays separately, since they have different # dimensions in temperature self.write_samples({'swap_index': swap_index}, last_iteration=last_iteration, samples_group=group, thin_by=thin_by) self.write_samples({'acceptance_ratio': acceptance_ratio}, last_iteration=last_iteration, samples_group=group, thin_by=thin_by)
[docs] def validate(self): """Adds attemp to load checkpoint to validation test.""" valid = super(EpsieFile, self).validate() # try to load the checkpoint if valid: try: load_state(self, self.sampler_group) except (KeyError, UnpicklingError): # will get this if the state wasn't written, or it was # corrupted for some reason valid = False return valid
@staticmethod def _get_optional_args(args, opts, err_on_missing=False, **kwargs): # need this to make sure options called "walkers" are renamed to # "chains" parsed = BaseSamplerFile._get_optional_args( args, opts, err_on_missing=err_on_missing, **kwargs) try: chains = parsed.pop('walkers') parsed['chains'] = chains except KeyError: pass return parsed