# Copyright (C) 2018 Collin Capano
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""I/O utilities for pycbc inference
"""
import os
import argparse
import shutil
import textwrap
import numpy
import logging
import h5py as _h5py
from pycbc.io.record import (FieldArray, _numpy_function_lib)
from pycbc import waveform as _waveform
from pycbc.io.hdf import (dump_state, load_state)
from pycbc.inference.option_utils import (ParseLabelArg, ParseParametersArg)
from .emcee import EmceeFile
from .emcee_pt import EmceePTFile
from .ptemcee import PTEmceeFile
from .cpnest import CPNestFile
from .multinest import MultinestFile
from .dynesty import DynestyFile
from .ultranest import UltranestFile
from .snowline import SnowlineFile
from .nessai import NessaiFile
from .posterior import PosteriorFile
from .txt import InferenceTXTFile
# Map each sampler IO class's ``name`` attribute to the class itself. This
# lookup table is used by ``get_file_type`` and ``loadfile`` to pick the
# correct IO class for a given file.
filetypes = {
    EmceeFile.name: EmceeFile,
    EmceePTFile.name: EmceePTFile,
    PTEmceeFile.name: PTEmceeFile,
    CPNestFile.name: CPNestFile,
    MultinestFile.name: MultinestFile,
    DynestyFile.name: DynestyFile,
    PosteriorFile.name: PosteriorFile,
    UltranestFile.name: UltranestFile,
    NessaiFile.name: NessaiFile,
    SnowlineFile.name: SnowlineFile,
}
# epsie has extra dependencies that may not be installed, so only register
# its IO class if the import succeeds
try:
    from .epsie import EpsieFile
    filetypes[EpsieFile.name] = EpsieFile
except ImportError:
    pass
def get_file_type(filename):
    """ Returns I/O object to use for file.

    Parameters
    ----------
    filename : str
        Name of file.

    Returns
    -------
    file_type : {InferenceFile, InferenceTXTFile}
        The type of inference file object to use.

    Raises
    ------
    TypeError
        If the extension of ``filename`` is not recognized.
    """
    hdf_extensions = (".hdf", ".h5", ".bkup", ".checkpoint")
    txt_extensions = (".txt", ".dat", ".csv")
    if filename.endswith(hdf_extensions):
        # the name of the IO class is stored in the file's attrs
        with _h5py.File(filename, 'r') as fp:
            filetype = fp.attrs['filetype']
        try:
            # h5py may return the attr as bytes; convert to str
            filetype = str(filetype.decode())
        except AttributeError:
            pass
        return filetypes[filetype]
    if filename.endswith(txt_extensions):
        return InferenceTXTFile
    raise TypeError("Extension is not supported.")
def loadfile(path, mode=None, filetype=None, **kwargs):
    """Loads the given file using the appropriate InferenceFile class.

    If no ``filetype`` is given, the type is read from the file's ``attrs``;
    in that case the file must already exist, otherwise an IOError is raised.

    Parameters
    ----------
    path : str
        The filename to load.
    mode : str, optional
        What mode to load the file with, e.g., 'w' for write, 'r' for read,
        'a' for append. Default will default to h5py.File's mode, which is
        'a'.
    filetype : str, optional
        Force the file to be loaded with the given class name. This must be
        provided if creating a new file.
    **kwargs :
        All other keyword arguments are passed to the file class's
        constructor.

    Returns
    -------
    filetype instance
        An open file handler to the file. The class used for IO with the
        file is determined by the ``filetype`` keyword (if provided) or the
        ``filetype`` stored in the file (if not provided).
    """
    if filetype is not None:
        fileclass = filetypes[filetype]
    else:
        # no filetype given; look it up from the file itself
        try:
            fileclass = get_file_type(path)
        except IOError:
            # file doesn't exist, so the filetype cannot be determined
            raise IOError("The file appears not to exist. In this case, "
                          "filetype must be provided.")
    return fileclass(path, mode=mode, **kwargs)
#
# =============================================================================
#
# HDF Utilities
#
# =============================================================================
#
def check_integrity(filename):
    """Checks the integrity of an InferenceFile.

    Checks done are:

    * can the file open?
    * do all of the datasets in the samples group have the same shape?
    * can the first and last sample in all of the datasets in the samples
      group be read?

    If any of these checks fail, an IOError is raised.

    Parameters
    ----------
    filename: str
        Name of an InferenceFile to check.

    Raises
    ------
    ValueError
        If the given file does not exist.
    KeyError
        If the samples group does not exist.
    IOError
        If any of the checks fail.
    """
    if not os.path.exists(filename):
        raise ValueError("file {} does not exist".format(filename))
    # a file corrupted badly enough that it cannot be opened will cause
    # an IOError on the next line
    with loadfile(filename, 'r') as fp:
        params = list(fp[fp.samples_group].keys())
        # nothing more to check if no parameters have been written yet
        if not params:
            return
        dpath = fp.samples_group + '/{}'
        # compare every dataset's shape against the first one's
        shapes = [fp[dpath.format(p)].shape for p in params]
        ref_shape = shapes[0]
        if any(shp != ref_shape for shp in shapes):
            raise IOError("not all datasets in the samples group have "
                          "the same shape")
        # make sure the first and last elements of each dataset can be read
        first = tuple(0 for _ in ref_shape)
        last = tuple(-1 for _ in ref_shape)
        for p in params:
            _ = fp[dpath.format(p)][first]
            _ = fp[dpath.format(p)][last]
def validate_checkpoint_files(checkpoint_file, backup_file,
                              check_nsamples=True):
    """Checks if the given checkpoint and/or backup files are valid.

    A file is considered valid if it passes all of the tests run by
    ``check_integrity`` and its own ``validate`` diagnostics. If
    ``check_nsamples`` is True, the backup must additionally contain the
    same number of samples as the checkpoint, otherwise the backup is
    considered invalid.

    If exactly one of the two files is valid, the valid file is copied over
    the invalid one, so that on return the checkpoint and backup are either
    both valid or both invalid.

    Parameters
    ----------
    checkpoint_file : string
        Name of the checkpoint file.
    backup_file : string
        Name of the backup file.
    check_nsamples : bool, optional
        Require the checkpoint and backup to have the same number of
        samples. Default is True.

    Returns
    -------
    checkpoint_valid : bool
        Whether or not the checkpoint (and backup) file may be used for
        loading samples.
    """
    # run the integrity checks on the checkpoint, then the backup
    statuses = []
    for fname in (checkpoint_file, backup_file):
        try:
            check_integrity(fname)
            statuses.append(True)
        except (ValueError, KeyError, IOError):
            statuses.append(False)
    checkpoint_valid, backup_valid = statuses
    # since the files can be opened, run their self diagnostics
    if checkpoint_valid:
        with loadfile(checkpoint_file, 'r') as fp:
            checkpoint_valid = fp.validate()
    if backup_valid:
        with loadfile(backup_file, 'r') as fp:
            backup_valid = fp.validate()
    # check that the checkpoint and backup have the same number of samples;
    # if not, assume the checkpoint has the correct number (this check is
    # not required by nested samplers)
    if check_nsamples and checkpoint_valid and backup_valid:
        counts = []
        for fname in (checkpoint_file, backup_file):
            with loadfile(fname, 'r') as fp:
                first_param = list(fp[fp.samples_group].keys())[0]
                counts.append(fp[fp.samples_group][first_param].size)
        backup_valid = counts[0] == counts[1]
    # sync the two files if only one of them is valid
    if checkpoint_valid and not backup_valid:
        logging.info("Backup invalid; copying checkpoint file")
        shutil.copy(checkpoint_file, backup_file)
        backup_valid = True
    elif backup_valid and not checkpoint_valid:
        logging.info("Checkpoint invalid; copying backup file")
        shutil.copy(backup_file, checkpoint_file)
        checkpoint_valid = True
    return checkpoint_valid
#
# =============================================================================
#
# Command-line Utilities
#
# =============================================================================
#
def get_common_parameters(input_files, collection=None):
    """Gets a list of variable params that are common across all input files.

    If no common parameters are found, a ``ValueError`` is raised.

    Parameters
    ----------
    input_files : list of str
        List of input files to load.
    collection : str, optional
        What group of parameters to load. Can be the name of a list of
        parameters stored in the files' attrs (e.g., "variable_params"), or
        "all". If "all", will load all of the parameters in the files'
        samples group. Default is to load all.

    Returns
    -------
    list :
        List of the parameter names.

    Raises
    ------
    ValueError
        If no parameter is common to all of the input files.
    """
    if collection is None:
        collection = "all"
    parameters = []
    for fn in input_files:
        # bug fix: use a context manager so the file is closed even if
        # reading the parameters raises
        with loadfile(fn, 'r') as fp:
            if collection == 'all':
                ps = fp[fp.samples_group].keys()
            else:
                ps = fp.attrs[collection]
            parameters.append(set(ps))
    parameters = list(set.intersection(*parameters))
    if not parameters:
        raise ValueError("no common parameters found for collection {} in "
                         "files {}".format(collection,
                                           ', '.join(input_files)))
    # if using python 3 to read a file created in python 2, need to convert
    # parameters to strs
    try:
        parameters = [p.decode() for p in parameters]
    except AttributeError:
        pass
    return parameters
class PrintFileParams(argparse.Action):
    """Argparse action that will load input files and print possible parameters
    to screen. Once this is done, the program is forced to exit immediately.
    The behavior is similar to --help, except that the input-file is read.
    .. note::
        The ``input_file`` attribute must be set in the parser namespace before
        this action is called. Otherwise, a ``NoInputFileError`` is raised.
    """
    def __init__(self, skip_args=None, nargs=0, **kwargs):
        # this action takes no values on the command line, only a flag
        if nargs != 0:
            raise ValueError("nargs for this action must be 0")
        super(PrintFileParams, self).__init__(nargs=nargs, **kwargs)
        # arguments to exclude from the file-specific parsers
        self.skip_args = skip_args
    def __call__(self, parser, namespace, values, option_string=None):
        """Prints the parameters, functions, and file-specific options
        available for the input file(s), then exits the program.
        """
        # get the input file(s)
        input_files = namespace.input_file
        if input_files is None:
            # see if we should raise an error; the parser may set
            # ``no_input_file_err`` to suppress it on a first parsing pass
            try:
                raise_err = not parser.no_input_file_err
            except AttributeError:
                raise_err = True
            if raise_err:
                raise NoInputFileError("must provide at least one input file")
            else:
                # just return to stop further processing
                return
        # group the file names by their IO class name, and collect each
        # class's extra-argument parser
        filesbytype = {}
        fileparsers = {}
        for fn in input_files:
            fp = loadfile(fn, 'r')
            try:
                filesbytype[fp.name].append(fn)
            except KeyError:
                filesbytype[fp.name] = [fn]
            # get any extra options
            fileparsers[fp.name], _ = fp.extra_args_parser(
                skip_args=self.skip_args, add_help=False)
            fp.close()
        # now print information about the intersection of all parameters
        parameters = get_common_parameters(input_files, collection='all')
        print("\n"+textwrap.fill("Parameters available with this (these) "
                                 "input file(s):"), end="\n\n")
        print(textwrap.fill(' '.join(sorted(parameters))),
              end="\n\n")
        # information about the pycbc functions
        pfuncs = sorted(FieldArray.functionlib.fget(FieldArray).keys())
        print(textwrap.fill("Available pycbc functions (see "
                            "http://pycbc.org/pycbc/latest/html for "
                            "more details):"), end="\n\n")
        print(textwrap.fill(', '.join(pfuncs)), end="\n\n")
        # numpy funcs: only the ufuncs in the function library
        npfuncs = sorted([name for (name, obj) in _numpy_function_lib.items()
                          if isinstance(obj, numpy.ufunc)])
        print(textwrap.fill("Available numpy functions:"),
              end="\n\n")
        print(textwrap.fill(', '.join(npfuncs)), end="\n\n")
        # misc constants recognized in parameter expressions
        consts = "e euler_gamma inf nan pi"
        print(textwrap.fill("Recognized constants:"),
              end="\n\n")
        print(consts, end="\n\n")
        print(textwrap.fill("Python arthimetic (+ - * / // ** %), "
                            "binary (&, |, etc.), and comparison (>, <, >=, "
                            "etc.) operators may also be used."), end="\n\n")
        # print out the extra arguments that may be used
        outstr = textwrap.fill("The following are additional command-line "
                               "options that may be provided, along with the "
                               "input files that understand them:")
        print("\n"+outstr, end="\n\n")
        for ftype, fparser in fileparsers.items():
            fnames = ', '.join(filesbytype[ftype])
            if fparser is None:
                outstr = textwrap.fill(
                    "File(s) {} use no additional options.".format(fnames))
                print(outstr, end="\n\n")
            else:
                # reuse the file parser's help machinery to print its options
                fparser.usage = fnames
                fparser.print_help()
        # like --help, stop the program after printing
        parser.exit(0)
class ResultsArgumentParser(argparse.ArgumentParser):
    r"""Wraps argument parser, and preloads arguments needed for loading samples
    from a file.
    This parser class should be used by any program that wishes to use the
    standard arguments for loading samples. It provides functionality to parse
    file specific options. These file-specific arguments are not included in
    the standard ``--help`` (since they depend on what input files are given),
    but can be seen by running ``--file-help/-H``. The ``--file-help`` will
    also print off information about what parameters may be used given the
    input files.
    As with the standard ``ArgumentParser``, running this class's
    ``parse_args`` will result in an error if arguments are provided that are
    not recognized by the parser, nor by any of the file-specific arguments.
    For example, ``parse_args`` would work on the command
    ``--input-file results.hdf --walker 0`` if
    ``results.hdf`` was created by a sampler that recognizes a ``--walker``
    argument, but would raise an error if ``results.hdf`` was created by a
    sampler that does not recognize a ``--walker`` argument. The extra
    arguments that are recognized are determined by the sampler IO class's
    ``extra_args_parser``.
    Some arguments may be excluded from the parser using the ``skip_args``
    optional parameter.
    Parameters
    ----------
    skip_args : list of str, optional
        Do not add the given arguments to the parser. Arguments should be
        specified as the option string minus the leading '--'; e.g.,
        ``skip_args=['thin-start']`` would cause the ``thin-start`` argument
        to not be included. May also specify sampler-specific arguments. Note
        that ``input-file``, ``file-help``, and ``parameters`` are always
        added.
    defaultparams : {'variable_params', 'all'}, optional
        If no ``--parameters`` provided, which collection of parameters to
        load. If 'all' will load all parameters in the file's
        ``samples_group``. If 'variable_params' or None (the default) will load
        the variable parameters.
    autoparamlabels : bool, optional
        Passed to ``add_results_option_group``; see that function for details.
    \**kwargs :
        All other keyword arguments are passed to ``argparse.ArgumentParser``.
    """
    def __init__(self, skip_args=None, defaultparams=None,
                 autoparamlabels=True, **kwargs):
        super(ResultsArgumentParser, self).__init__(**kwargs)
        # add attribute to communicate to arguments what to do when there is
        # no input files
        self.no_input_file_err = False
        if skip_args is None:
            skip_args = []
        self.skip_args = skip_args
        if defaultparams is None:
            defaultparams = 'variable_params'
        self.defaultparams = defaultparams
        # add the results option group
        self.add_results_option_group(autoparamlabels=autoparamlabels)
    @property
    def actions(self):
        """Exposes the actions this parser can do as a dictionary.
        The dictionary maps the ``dest`` to actions.
        """
        return {act.dest: act for act in self._actions}
    def _unset_required(self):
        """Convenience function to turn off required arguments for first parse.
        """
        # remember which actions were required so _reset_required can
        # restore them later
        self._required_args = [act for act in self._actions if act.required]
        for act in self._required_args:
            act.required = False
    def _reset_required(self):
        """Convenience function to turn required arguments back on.
        """
        for act in self._required_args:
            act.required = True
    def parse_known_args(self, args=None, namespace=None):
        """Parse args method to handle input-file dependent arguments.

        Parses twice: a first pass (with required arguments disabled and
        input-file errors suppressed) to discover the input files, then a
        second full pass. Afterwards, the ``--parameters`` option is
        populated/expanded and any file-specific arguments are parsed using
        each input file's ``extra_args_parser``.
        """
        # run parse args once to make sure the name space is populated with
        # the input files. We'll turn off raising NoInputFileErrors on this
        # pass
        self.no_input_file_err = True
        self._unset_required()
        opts, extra_opts = super(ResultsArgumentParser, self).parse_known_args(
            args, namespace)
        # now do it again
        self.no_input_file_err = False
        self._reset_required()
        opts, extra_opts = super(ResultsArgumentParser, self).parse_known_args(
            args, opts)
        # populate the parameters option if it wasn't specified
        if opts.parameters is None or opts.parameters == ['*']:
            parameters = get_common_parameters(opts.input_file,
                                               collection=self.defaultparams)
            # now call parse parameters action to re-populate the namespace
            self.actions['parameters'](self, opts, parameters)
        # check if we're being greedy or not
        elif '*' in opts.parameters:
            # remove the * from the parameters and the labels
            opts.parameters = [p for p in opts.parameters if p != '*']
            opts.parameters_labels.pop('*', None)
            # add the rest of the parameters not used
            all_params = get_common_parameters(opts.input_file,
                                               collection=self.defaultparams)
            # extract the used parameters from the parameters option
            used_params = FieldArray.parse_parameters(opts.parameters,
                                                      all_params)
            add_params = set(all_params) - set(used_params)
            # repopulate the name space with the additional parameters
            if add_params:
                opts.parameters += list(add_params)
                # update the labels
                opts.parameters_labels.update({p: p for p in add_params})
        # parse the sampler-specific options and check for any unknowns
        unknown = []
        for fn in opts.input_file:
            # NOTE(review): fp is not closed here; consider a context manager
            fp = loadfile(fn, 'r')
            sampler_parser, _ = fp.extra_args_parser(skip_args=self.skip_args)
            if sampler_parser is not None:
                opts, still_unknown = sampler_parser.parse_known_args(
                    extra_opts, namespace=opts)
                unknown.append(set(still_unknown))
        # the intersection of the unknowns are options not understood by
        # any of the files
        if len(unknown) > 0:
            unknown = set.intersection(*unknown)
        return opts, list(unknown)
    def add_results_option_group(self, autoparamlabels=True):
        """Adds the options used to call pycbc.inference.io.results_from_cli
        function to the parser.
        These are options related to loading the results from a run of
        pycbc_inference, for purposes of plotting and/or creating tables.
        Any argument strings included in the ``skip_args`` attribute will not
        be added.

        Parameters
        ----------
        autoparamlabels : bool, optional
            If True, the ``--parameters`` option will use labels from
            ``waveform.parameters`` if a parameter name is the same as a
            parameter there. Otherwise, will just use whatever label is
            provided. Default is True.

        Returns
        -------
        argparse argument group
            The argument group containing the results-loading options.
        """
        results_reading_group = self.add_argument_group(
            title="Arguments for loading results",
            description="Additional, file-specific arguments may also be "
            "provided, depending on what input-files are given. See "
            "--file-help for details.")
        results_reading_group.add_argument(
            "--input-file", type=str, required=True, nargs="+",
            action=ParseLabelArg, metavar='FILE[:LABEL]',
            help="Path to input HDF file(s). A label may be specified for "
                 "each input file to use for plots when multiple files are "
                 "specified.")
        # advanced help
        results_reading_group.add_argument(
            "-H", "--file-help",
            action=PrintFileParams, skip_args=self.skip_args,
            help="Based on the provided input-file(s), print all available "
                 "parameters that may be retrieved and all possible functions "
                 "on those parameters. Also print available additional "
                 "arguments that may be passed. This option is like an "
                 "advanced --help: if run, the program will just print the "
                 "information to screen, then exit.")
        # choose the parameter-parsing action: with autoparamlabels,
        # recognized parameter names get labels from waveform.parameters
        if autoparamlabels:
            paramparser = ParseParametersArg
            lblhelp = (
                "If LABEL is the same as a parameter in "
                "pycbc.waveform.parameters, the label "
                "property of that parameter will be used (e.g., if LABEL "
                "were 'mchirp' then {} would be used). "
                .format(_waveform.parameters.mchirp.label))
        else:
            paramparser = ParseLabelArg
            lblhelp = ''
        results_reading_group.add_argument(
            "--parameters", type=str, nargs="+", metavar="PARAM[:LABEL]",
            action=paramparser,
            help="Name of parameters to load; default is to load all. The "
                 "parameters can be any of the model params or posterior "
                 "stats (loglikelihood, logprior, etc.) in the input file(s), "
                 "derived parameters from them, or any function of them. If "
                 "multiple files are provided, any parameter common to all "
                 "files may be used. Syntax for functions is python; any math "
                 "functions in the numpy libary may be used. Can optionally "
                 "also specify a LABEL for each parameter. If no LABEL is "
                 "provided, PARAM will used as the LABEL. {}"
                 "To see all possible parameters that may be used with the "
                 "given input file(s), as well as all avaiable functions, "
                 "run --file-help, along with one or more input files. "
                 "If '*' is provided in addition to other parameters names, "
                 "then parameters will be loaded in a greedy fashion; i.e., "
                 "all other parameters that exist in the file(s) that are not "
                 "explicitly mentioned will also be loaded. For example, "
                 "if the input-file(s) contains 'srcmass1', "
                 "'srcmass2', and 'distance', and "
                 "\"'primary_mass(srcmass1, srcmass2):mass1' '*'\", is given "
                 "then 'mass1' and 'distance' will be loaded. Otherwise, "
                 "without the '*', only 'mass1' would be loaded. "
                 "Note that any parameter that is used in a function "
                 "will not automatically be added. Tip: enclose "
                 "arguments in single quotes, or else special characters will "
                 "be interpreted as shell commands. For example, the "
                 "wildcard should be given as either '*' or \\*, otherwise "
                 "bash will expand the * into the names of all the files in "
                 "the current directory."
                 .format(lblhelp))
        results_reading_group.add_argument(
            "--constraint", type=str, nargs="+", metavar="CONSTRAINT[:FILE]",
            help="Apply a constraint to the samples. If a file is provided "
                 "after the constraint, it will only be applied to the given "
                 "file. Otherwise, the constraint will be applied to all "
                 "files. Only one constraint may be applied to a file. "
                 "Samples that violate the constraint will be removed. Syntax "
                 "is python; any parameter or function of parameter can be "
                 "used, similar to the parameters argument. Multiple "
                 "constraints may be combined by using '&' and '|'.")
        return results_reading_group
def results_from_cli(opts, load_samples=True, **kwargs):
    """Loads an inference result file along with any labels associated with
    it from the command line options.

    Parameters
    ----------
    opts : ArgumentParser options
        The options from the command line.
    load_samples : bool, optional
        Load the samples from the file.
    **kwargs :
        Any other keyword arguments are passed to each file's
        ``samples_from_cli`` when reading samples.

    Returns
    -------
    fp_all : (list of) BaseInferenceFile type
        The result file as an hdf file. If more than one input file,
        then it returns a list.
    parameters : list of str
        List of the parameters to use, parsed from the parameters option.
    labels : dict
        Dictionary of labels to associate with the parameters.
    samples_all : (list of) FieldArray(s) or None
        If load_samples, the samples as a FieldArray; otherwise, None.
        If more than one input file, then it returns a list.
    """
    input_files = opts.input_file
    if isinstance(input_files, str):
        input_files = [input_files]
    # build a mapping from file name -> constraint to apply to it
    constraints = {}
    if opts.constraint is not None:
        for constraint in opts.constraint:
            if len(constraint.split(':')) == 2:
                # a file was given after the constraint; apply it there only
                constraint, fn = constraint.split(':')
                constraints[fn] = constraint
            elif len(opts.constraint) > 1:
                # no file provided; ambiguous with multiple constraints
                raise ValueError("must provide a file to apply constraints "
                                 "to if providing more than one constraint")
            else:
                # a single constraint with no file: apply it to every file
                constraints = {fn: constraint for fn in input_files}
    # collect open file handlers and samples from every input file
    fp_all = []
    samples_all = []
    for input_file in input_files:
        logging.info("Reading input file %s", input_file)
        fp = loadfile(input_file, "r")
        samples = None
        if load_samples:
            logging.info("Loading samples")
            samples = fp.samples_from_cli(opts, parameters=opts.parameters,
                                          **kwargs)
            logging.info("Loaded {} samples".format(samples.size))
            if input_file in constraints:
                logging.info("Applying constraints")
                keep = samples[constraints[input_file]]
                samples = samples[keep]
                if samples.size == 0:
                    raise ValueError("No samples remain after constraint {} "
                                     "applied".format(
                                         constraints[input_file]))
                logging.info("{} samples remain".format(samples.size))
        fp_all.append(fp)
        samples_all.append(samples)
    if len(input_files) == 1:
        # a single input file: return bare objects instead of lists
        return fp_all[0], opts.parameters, opts.parameters_labels, \
            samples_all[0]
    return fp_all, opts.parameters, opts.parameters_labels, samples_all
def injections_from_cli(opts):
    """Gets injection parameters from the inference file(s).

    If the opts have a ``injection_samples_map`` option, the injection
    parameters will be remapped accordingly. See
    :py:func:`pycbc.inference.option_utils.add_injsamples_map_opt` for
    details.

    Parameters
    ----------
    opts : argparser
        Argparser object that has the command-line objects to parse.

    Returns
    -------
    FieldArray
        Array of the injection parameters from all of the input files given
        by ``opts.input_file``.
    """
    # see if a mapping was provided; entries are "injparam:sampleparam"
    if hasattr(opts, 'injection_samples_map') and opts.injection_samples_map:
        param_map = [opt.split(':') for opt in opts.injection_samples_map]
    else:
        param_map = []
    input_files = opts.input_file
    if isinstance(input_files, str):
        input_files = [input_files]
    injections = None
    # loop over all input files getting the injection files
    for input_file in input_files:
        # bug fix: close the file once the injections are read, rather than
        # leaking the handle (the FieldArray operations below work on the
        # in-memory array, not the file)
        with loadfile(input_file, 'r') as fp:
            these_injs = fp.read_injections()
        # apply mapping if it was provided
        if param_map:
            mapvals = {sp: these_injs[ip] for ip, sp in param_map}
            # if any of the new parameters are the same as the old, just
            # overwrite the values
            common_params = set(these_injs.fieldnames) & set(mapvals.keys())
            for param in common_params:
                these_injs[param] = mapvals.pop(param)
            # add the rest as new fields
            ps = list(mapvals.keys())
            these_injs = these_injs.add_fields([mapvals[p] for p in ps],
                                               names=ps)
        # accumulate across files
        if injections is None:
            injections = these_injs
        else:
            injections = injections.append(these_injs)
    return injections