# Copyright (C) 2015 Ian Harry, Tito Dal Canton
# 2022 Shichao Wu
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This module contains extensions for use with argparse.
"""
import copy
import argparse
from collections import defaultdict
class DictWithDefaultReturn(defaultdict):
    """A ``defaultdict`` whose truth value also takes the default into account.

    A plain dictionary is falsy when empty. This container is truthy when
    either an explicitly stored value or the value produced by
    ``default_factory`` is not ``None``. Testing its truth value never
    modifies the dictionary.
    """
    # Set to True on an instance by MultiDetOptionAction once a single value
    # applying to every key has been supplied.
    default_set = False
    # Set to True on an instance by MultiDetOptionAction once at least one
    # key-specific value has been supplied.
    ifo_set = False

    def __bool__(self):
        """Return True if any stored or default value is not ``None``."""
        # True if any values are explicitly set.
        if any(entry is not None for entry in self.values()):
            return True
        # Otherwise true only if a default value was set. The factory is
        # called directly, instead of looking up a dummy key, so that no
        # entry is inserted and a missing factory does not raise KeyError.
        factory = self.default_factory
        return factory is not None and factory() is not None

    # Python 2 and 3 have different conventions for boolean method
    __nonzero__ = __bool__
class MultiDetOptionAction(argparse.Action):
    """argparse action collecting per-detector values into a dictionary.

    Each value given on the command line is either ``IFO:VALUE``, which
    stores ``VALUE`` under the key ``IFO``, or a bare ``VALUE``, which
    becomes the default returned for every key. The two forms cannot be
    combined. The result stored on the namespace is a
    `DictWithDefaultReturn`.
    """
    # Initialise the same as the standard 'append' action
    def __init__(self,
                 option_strings,
                 dest,
                 nargs='+',
                 const=None,
                 default=None,
                 type=None,
                 choices=None,
                 required=False,
                 help=None,
                 metavar=None):
        """Set up the action.

        The parameters are those of ``argparse.Action``. ``type`` is kept
        as ``self.internal_type`` (``str`` when not given) and applied in
        ``__call__`` after the ``IFO:`` prefix has been split off; argparse
        itself is told to use ``str``. ``default`` becomes the value
        returned for any key of the default dictionary.

        Raises
        ------
        ValueError
            If ``nargs`` is 0, or if ``const`` is given while ``nargs`` is
            not ``argparse.OPTIONAL``.
        """
        self.internal_type = type if type is not None else str
        new_default = DictWithDefaultReturn(lambda: default)
        if nargs == 0:
            raise ValueError('nargs for append actions must be > 0; if arg '
                             'strings are not supplying the value to append, '
                             'the append const action may be more appropriate')
        if const is not None and nargs != argparse.OPTIONAL:
            raise ValueError('nargs must be %r to supply const'
                             % argparse.OPTIONAL)
        super().__init__(
            option_strings=option_strings,
            dest=dest,
            nargs=nargs,
            const=const,
            default=new_default,
            type=str,
            choices=choices,
            required=required,
            help=help,
            metavar=metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        """Parse ``values`` and store the resulting dictionary on ``namespace``.

        Raises
        ------
        ValueError
            If a value contains more than one ':', if more than one bare
            value is supplied, or if bare and ``IFO:VALUE`` forms are mixed.
        """
        # Again this is modified from the standard argparse 'append' action
        # argparse passes a bare string rather than a list when nargs is
        # None or '?'; wrap it so it is not processed character by character.
        if isinstance(values, str):
            values = [values]
        err_msg = "Issue with option: %s \n" % (self.dest,)
        err_msg += "Received value: %s \n" % (' '.join(values),)
        if getattr(namespace, self.dest, None) is None:
            # Give the fallback container a factory so that missing keys
            # yield None, matching the default built in __init__.
            setattr(namespace, self.dest,
                    DictWithDefaultReturn(lambda: None))
        # Work on a copy so the object currently on the namespace (possibly
        # the shared default) is not modified in place.
        # NOTE(review): default_set / ifo_set are instance attributes and may
        # not survive copy.copy -- confirm whether the mixing checks below are
        # meant to hold across repeated occurrences of the option.
        items = copy.copy(getattr(namespace, self.dest))
        for value in values:
            parts = value.split(':')
            if len(parts) == 2:
                # "Normal" case, all ifos supplied independently as "H1:VALUE"
                if items.default_set:
                    err_msg += "If you are supplying a value for all ifos, you "
                    err_msg += "cannot also supply values for specific ifos."
                    raise ValueError(err_msg)
                items[parts[0]] = self.internal_type(parts[1])
                items.ifo_set = True
            elif len(parts) == 1:
                # OR supply only one value and use this for all ifos
                if items.default_set:
                    err_msg += "If you are supplying a value for all ifos, you "
                    err_msg += "must only supply one value."
                    raise ValueError(err_msg)
                # Can't use a global and ifo specific options
                if items.ifo_set:
                    err_msg += "If you are supplying a value for all ifos, you "
                    err_msg += "cannot also supply values for specific ifos."
                    raise ValueError(err_msg)
                new_default = self.internal_type(parts[0])
                # Bind the value as a default argument so the factory does
                # not depend on the loop variable.
                items.default_factory = lambda _value=new_default: _value
                items.default_set = True
            else:
                err_msg += "The character ':' is used to deliminate the "
                err_msg += "ifo and the value. Please do not use it more than "
                err_msg += "once."
                raise ValueError(err_msg)
        setattr(namespace, self.dest, items)
class MultiDetOptionActionSpecial(MultiDetOptionAction):
    """
    This class is an extension of the MultiDetOptionAction class to handle
    cases where the : is already a special character. For example the channel
    name is something like H1:CHANNEL_NAME. Here the channel name *must*
    be provided uniquely for each ifo. The dictionary key is set to H1 and the
    value to H1:CHANNEL_NAME for this example.
    """
    def __call__(self, parser, namespace, values, option_string=None):
        """Parse ``values`` and store an ifo-keyed dictionary on ``namespace``.

        Raises
        ------
        ValueError
            If a value does not split into two or three ':'-separated parts,
            or if an ifo is given more than once.
        """
        # Again this is modified from the standard argparse 'append' action
        # argparse passes a bare string rather than a list when nargs is
        # None or '?'; wrap it so it is not processed character by character.
        if isinstance(values, str):
            values = [values]
        err_msg = "Issue with option: %s \n" % (self.dest,)
        err_msg += "Received value: %s \n" % (' '.join(values),)
        if getattr(namespace, self.dest, None) is None:
            setattr(namespace, self.dest, {})
        items = copy.copy(getattr(namespace, self.dest))
        for value in values:
            value_split = value.split(':')
            n_parts = len(value_split)
            if n_parts not in (2, 3):
                err_msg += "The character ':' is used to deliminate the "
                err_msg += "ifo and the value. It must appear exactly "
                err_msg += "once."
                raise ValueError(err_msg)
            ifo = value_split[0]
            if ifo in items:
                err_msg += "Multiple values supplied for ifo %s.\n" % (ifo,)
                err_msg += "Already have %s." % (items[ifo])
                raise ValueError(err_msg)
            if n_parts == 2:
                # "Normal" case, all ifos supplied independently as "H1:VALUE"
                # The full string, including the ifo prefix, is the value.
                items[ifo] = value
            else:
                # This is an unadvertised feature. It is used for cases where
                # I want to pretend H1 data is actually L1 (or similar). So if
                # I supply --channel-name H1:L1:LDAS-STRAIN I can use L1 data
                # and pretend it is H1 internally.
                items[ifo] = ':'.join(value_split[1:3])
        setattr(namespace, self.dest, items)
class MultiDetMultiColonOptionAction(MultiDetOptionAction):
    """A special case of `MultiDetOptionAction` which allows one to use
    arguments containing colons, such as `V1:FOOBAR:1`. The first colon is
    assumed to be the separator between the detector and the argument.
    All subsequent colons are kept as part of the argument. Unlike
    `MultiDetOptionAction`, all arguments must be prefixed by the
    corresponding detector.
    """
    def __call__(self, parser, namespace, values, option_string=None):
        """Parse ``values`` and store a detector-keyed dictionary.

        Raises
        ------
        ValueError
            If a value contains no ':' or a detector is given more than once.
        """
        # argparse passes a bare string rather than a list when nargs is
        # None or '?'; wrap it so it is not processed character by character.
        if isinstance(values, str):
            values = [values]
        err_msg = ('Issue with option: {}\n'
                   'Received value: {}\n').format(self.dest, ' '.join(values))
        if getattr(namespace, self.dest, None) is None:
            setattr(namespace, self.dest, {})
        items = copy.copy(getattr(namespace, self.dest))
        for value in values:
            detector, separator, argument = value.partition(':')
            if not separator:
                err_msg += ("Each argument must contain at least one ':' "
                            "character")
                raise ValueError(err_msg)
            if detector in items:
                # Format only the new fragment: err_msg already contains the
                # user's input, which may itself include '{' or '}'.
                err_msg += ('Multiple values supplied for detector {},\n'
                            'already have {}.').format(detector,
                                                       items[detector])
                raise ValueError(err_msg)
            items[detector] = self.internal_type(argument)
        setattr(namespace, self.dest, items)
class MultiDetOptionAppendAction(MultiDetOptionAction):
    """Variant of `MultiDetOptionAction` that collects a list per ifo.

    Every value must be of the form ``IFO:VALUE``. Values for the same ifo
    are gathered, in order, into a list stored under that ifo.
    """
    def __call__(self, parser, namespace, values, option_string=None):
        """Parse ``values`` and store an ifo-keyed dictionary of lists.

        Raises
        ------
        ValueError
            If a value does not contain exactly one ':'.
        """
        # Again this is modified from the standard argparse 'append' action
        # argparse passes a bare string rather than a list when nargs is
        # None or '?'; wrap it so it is not processed character by character.
        if isinstance(values, str):
            values = [values]
        if getattr(namespace, self.dest, None) is None:
            setattr(namespace, self.dest, {})
        items = copy.copy(getattr(namespace, self.dest))
        for value in values:
            parts = value.split(':')
            if len(parts) != 2:
                err_msg = "Issue with option: %s \n" % (self.dest,)
                err_msg += "Received value: %s \n" % (' '.join(values),)
                err_msg += "The character ':' is used to distinguish the "
                err_msg += "ifo and the value. It must be given exactly once "
                err_msg += "for all entries"
                raise ValueError(err_msg)
            # "Normal" case, all ifos supplied independently as "H1:VALUE"
            ifo, raw_value = parts
            if ifo in items:
                # Build a new list rather than appending in place: the copy
                # above is shallow, so the existing list is still shared with
                # the dictionary that was on the namespace.
                items[ifo] = items[ifo] + [self.internal_type(raw_value)]
            else:
                items[ifo] = [self.internal_type(raw_value)]
        setattr(namespace, self.dest, items)
class DictOptionAction(argparse.Action):
    """argparse action collecting ``PARAM:VALUE`` pairs into a dictionary.

    Each value supplied on the command line must contain exactly one ':';
    the text before it is the key and the text after it, converted with the
    requested ``type``, is the value. Supplying the single value ``{}``
    adds no entries.
    """
    # Initialise the same as the standard 'append' action
    def __init__(self,
                 option_strings,
                 dest,
                 nargs='+',
                 const=None,
                 default=None,
                 type=None,
                 choices=None,
                 required=False,
                 help=None,
                 metavar=None):
        """Set up the action.

        The parameters are those of ``argparse.Action``. ``type`` is kept
        as ``self.internal_type`` (``str`` when not given) and applied in
        ``__call__`` after the key has been split off; argparse itself is
        told to use ``str``. The default stored by argparse is a
        `DictWithDefaultReturn` returning ``default`` for any key.

        Raises
        ------
        ValueError
            If ``nargs`` is 0, or if ``const`` is given while ``nargs`` is
            not ``argparse.OPTIONAL``.
        """
        self.internal_type = type if type is not None else str
        new_default = DictWithDefaultReturn(lambda: default)
        if nargs == 0:
            raise ValueError('nargs for append actions must be > 0; if arg '
                             'strings are not supplying the value to append, '
                             'the append const action may be more appropriate')
        if const is not None and nargs != argparse.OPTIONAL:
            raise ValueError('nargs must be %r to supply const'
                             % argparse.OPTIONAL)
        super().__init__(
            option_strings=option_strings,
            dest=dest,
            nargs=nargs,
            const=const,
            default=new_default,
            type=str,
            choices=choices,
            required=required,
            help=help,
            metavar=metavar)

    def __call__(self, parser, namespace, values, option_string=None):
        """Parse ``values`` and store the resulting dictionary on ``namespace``.

        Raises
        ------
        ValueError
            If a value does not contain exactly one ':'.
        """
        # Again this is modified from the standard argparse 'append' action
        # argparse passes a bare string rather than a list when nargs is
        # None or '?'; wrap it so it is not processed character by character.
        if isinstance(values, str):
            values = [values]
        err_msg = "Issue with option: %s \n" % (self.dest,)
        err_msg += "Received value: %s \n" % (' '.join(values),)
        if getattr(namespace, self.dest, None) is None:
            setattr(namespace, self.dest, {})
        items = copy.copy(getattr(namespace, self.dest))
        # A lone '{}' stands for "no entries"; this does not depend on the
        # individual value, so it is tested once rather than per iteration.
        if values != ['{}']:
            for value in values:
                parts = value.split(':')
                if len(parts) != 2:
                    err_msg += "The character ':' is used to distinguish the "
                    err_msg += "parameter name and the value. Please do not "
                    err_msg += "use it more than or less than once."
                    raise ValueError(err_msg)
                # "Normal" case, all extra arguments supplied independently
                # as "param:VALUE"
                items[parts[0]] = self.internal_type(parts[1])
        setattr(namespace, self.dest, items)
class MultiDetDictOptionAction(DictOptionAction):
    """A special case of `DictOptionAction` which allows arguments carrying
    the detector (channel) name, such as `DETECTOR:PARAM:VALUE`: the text
    before the first colon is the detector, the text between the colons is
    the parameter name and the text after the second colon is the value.
    Alternatively, as for `DictOptionAction`, arguments may omit the detector
    (`PARAM:VALUE`), in which case the value is applied to every detector
    listed in the ``instruments`` attribute of the namespace.

    The stored result is a dictionary mapping each detector to a dictionary
    of its parameters; it replaces whatever was previously stored.
    """
    def __call__(self, parser, namespace, values, option_string=None):
        """Parse ``values`` and store a per-detector parameter dictionary.

        Raises
        ------
        ValueError
            If a value has neither one nor two ':' characters, or if the
            same parameter is supplied more than once for a detector.
        """
        # Again this is modified from the standard argparse 'append' action
        # argparse passes a bare string rather than a list when nargs is
        # None or '?'; wrap it so it is not processed character by character.
        if isinstance(values, str):
            values = [values]
        err_msg = ('Issue with option: {}\n'
                   'Received value: {}\n').format(self.dest, ' '.join(values))
        detector_args = {}
        # A lone '{}' stands for "no entries".
        if values != ['{}']:
            for value in values:
                n_colons = value.count(':')
                if n_colons == 2:
                    detector, param, val = value.split(':')
                    detectors = [detector]
                elif n_colons == 1:
                    param, val = value.split(':')
                    # Apply the value to every detector; this relies on the
                    # namespace already holding an `instruments` attribute.
                    detectors = getattr(namespace, 'instruments')
                else:
                    err_msg += ("Use format `DETECTOR:PARAM:VALUE` for each "
                                "detector, or use `PARAM:VALUE` for all.")
                    raise ValueError(err_msg)
                for detector in detectors:
                    params = detector_args.setdefault(detector, {})
                    if param in params:
                        # Format only the new fragment: err_msg already holds
                        # the user's input, which may contain '{' or '}'.
                        err_msg += ("Multiple values supplied for the same "
                                    "parameter {} under detector {},\n"
                                    "already have {}.").format(
                                        param, detector, params[param])
                        raise ValueError(err_msg)
                    params[param] = self.internal_type(val)
        setattr(namespace, self.dest, detector_args)
def required_opts(opt, parser, opt_list, required_by=None):
    """Report every option in ``opt_list`` that has not been given.

    Parameters
    ----------
    opt : object
        Result of option parsing
    parser : object
        Parser instance; its ``error`` method is called once for each
        option that is absent from ``opt`` or set to None.
    opt_list : list of strings
        Option names as written on the command line (e.g. ``--my-option``).
    required_by : string, optional
        the option that requires these options (if applicable)
    """
    for option_name in opt_list:
        # "--my-option" -> "my_option"
        attribute = option_name[2:].replace('-', '_')
        if getattr(opt, attribute, None) is not None:
            continue
        message = "%s is missing " % option_name
        if required_by is not None:
            message += ", required by %s" % required_by
        parser.error(message)
def required_opts_multi_ifo(opt, parser, ifo, opt_list, required_by=None):
    """Check that all the opts are defined for the given ifo

    Parameters
    ----------
    opt : object
        Result of option parsing
    parser : object
        Parser instance; its ``error`` method is called once for each
        option that has no value for ``ifo``.
    ifo : string
        Key looked up in each option's value.
    opt_list : list of strings
        Option names as written on the command line (e.g. ``--my-option``).
    required_by : string, optional
        the option that requires these options (if applicable)
    """
    for name in opt_list:
        attr = name[2:].replace('-', '_')
        try:
            missing = getattr(opt, attr)[ifo] is None
        except (AttributeError, KeyError, TypeError):
            # The option is absent from `opt`, has no entry for this ifo, or
            # is not a mapping at all (e.g. None); in each case it is
            # reported as missing, consistent with required_opts.
            missing = True
        if missing:
            err_str = "%s is missing " % name
            if required_by is not None:
                err_str += ", required by %s" % required_by
            parser.error(err_str)
def ensure_one_opt(opt, parser, opt_list):
    """Verify that exactly one of the options in ``opt_list`` was given.

    Parameters
    ----------
    opt : object
        Result of option parsing
    parser : object
        Parser instance; its ``error`` method is called when more than one
        of the options, or none of them, is set to a value other than None.
    opt_list : list of strings
        Option names as written on the command line (e.g. ``--my-option``).
    """
    chosen = None
    for option_name in opt_list:
        # "--my-option" -> "my_option"
        attribute = option_name[2:].replace('-', '_')
        if getattr(opt, attribute, None) is None:
            continue
        if chosen is not None:
            parser.error("%s and %s are mutually exculsive"
                         % (chosen, option_name))
        else:
            chosen = option_name
    if chosen is None:
        parser.error("you must supply one of the following %s"
                     % (', '.join(opt_list)))
def ensure_one_opt_multi_ifo(opt, parser, ifo, opt_list):
    """ Check that one and only one in the opt_list is defined in opt for ifo

    Parameters
    ----------
    opt : object
        Result of option parsing
    parser : object
        Parser instance; its ``error`` method is called when more than one
        of the options, or none of them, has a value for ``ifo``.
    ifo : string
        Key looked up in each option's value.
    opt_list : list of strings
        Option names as written on the command line (e.g. ``--my-option``).
    """
    the_one = None
    for name in opt_list:
        attr = name[2:].replace('-', '_')
        try:
            supplied = getattr(opt, attr)[ifo] is not None
        except (AttributeError, KeyError, TypeError):
            # The option is absent from `opt`, has no entry for this ifo, or
            # is not a mapping at all (e.g. None): treat it as not supplied,
            # consistent with ensure_one_opt.
            supplied = False
        if not supplied:
            continue
        if the_one is None:
            the_one = name
        else:
            parser.error("%s and %s are mutually exculsive"
                         % (the_one, name))
    if the_one is None:
        parser.error("you must supply one of the following %s"
                     % (', '.join(opt_list)))
def copy_opts_for_single_ifo(opt, ifo):
    """
    Build a single-detector namespace from a multi-detector one.

    A deep copy of ``opt`` is made. On the copy, every attribute that is a
    `DictWithDefaultReturn`, or a dictionary containing ``ifo`` as a key, is
    replaced by its entry for ``ifo``. The copy is returned so it can be
    passed to functions expecting output from the single-detector interface;
    ``opt`` itself is left untouched.
    """
    single = copy.deepcopy(opt)
    for name, value in vars(single).items():
        if isinstance(value, DictWithDefaultReturn):
            per_ifo = True
        else:
            per_ifo = isinstance(value, dict) and ifo in value
        if per_ifo:
            setattr(single, name, value[ifo])
    return single
def convert_to_process_params_dict(opt):
    """
    Flatten multi-detector options into a dictionary of command line options.

    A deep copy of ``opt`` is made. On the copy, every attribute that is a
    `DictWithDefaultReturn` is replaced by a list of ``KEY:VALUE`` strings,
    one per stored value that is not None (list values contribute one string
    per element). The attribute dictionary of the copy is returned, intended
    for the register_to_process_params ligolw function.
    """
    params = copy.deepcopy(opt)
    for name, value in vars(params).items():
        if not isinstance(value, DictWithDefaultReturn):
            continue
        flattened = []
        for key, entry in value.items():
            # Treat a scalar entry as a one-element list.
            entries = entry if isinstance(entry, list) else [entry]
            flattened.extend(':'.join([key, str(item)])
                             for item in entries if item is not None)
        setattr(params, name, flattened)
    return vars(params)
def _positive_type(s, dtype=None):
    """
    Ensure argument is positive and convert type to dtype

    This is for the functions below to wrap to avoid code duplication.

    Parameters
    ----------
    s : object
        Value to convert, typically the string given on the command line.
    dtype : callable
        Conversion applied to ``s``; must be supplied.

    Returns
    -------
    The converted value.

    Raises
    ------
    argparse.ArgumentTypeError
        If the conversion raises ValueError or the result is not greater
        than zero (which includes NaN).
    """
    assert dtype is not None
    err_msg = f"Input must be a positive {dtype}, not {s}"
    try:
        value = dtype(s)
    except ValueError as err:
        raise argparse.ArgumentTypeError(err_msg) from err
    # Written as "not value > 0" rather than "value <= 0" so that NaN, for
    # which every ordering comparison is False, is rejected too.
    if not value > 0:
        raise argparse.ArgumentTypeError(err_msg)
    return value
def _nonnegative_type(s, dtype=None):
    """
    Ensure argument is positive or zero and convert type to dtype

    This is for the functions below to wrap to avoid code duplication.

    Parameters
    ----------
    s : object
        Value to convert, typically the string given on the command line.
    dtype : callable
        Conversion applied to ``s``; must be supplied.

    Returns
    -------
    The converted value.

    Raises
    ------
    argparse.ArgumentTypeError
        If the conversion raises ValueError or the result is not greater
        than or equal to zero (which includes NaN).
    """
    assert dtype is not None
    err_msg = f"Input must be either a positive or zero {dtype}, not {s}"
    try:
        value = dtype(s)
    except ValueError as err:
        raise argparse.ArgumentTypeError(err_msg) from err
    # Written as "not value >= 0" rather than "value < 0" so that NaN, for
    # which every ordering comparison is False, is rejected too.
    if not value >= 0:
        raise argparse.ArgumentTypeError(err_msg)
    return value
def positive_float(s):
    """
    Convert ``s`` to a float, requiring it to be strictly positive.

    Meant to be passed as ``type`` to an argparse argument; conversion and
    validation are delegated to `_positive_type`.
    """
    value = _positive_type(s, float)
    return value
def nonnegative_float(s):
    """
    Convert ``s`` to a float, requiring it to be positive or zero.

    Meant to be passed as ``type`` to an argparse argument; conversion and
    validation are delegated to `_nonnegative_type`.
    """
    value = _nonnegative_type(s, float)
    return value
def positive_int(s):
    """
    Convert ``s`` to an int, requiring it to be strictly positive.

    Meant to be passed as ``type`` to an argparse argument; conversion and
    validation are delegated to `_positive_type`.
    """
    value = _positive_type(s, int)
    return value
def nonnegative_int(s):
    """
    Convert ``s`` to an int, requiring it to be positive or zero.

    Meant to be passed as ``type`` to an argparse argument; conversion and
    validation are delegated to `_nonnegative_type`.
    """
    value = _nonnegative_type(s, int)
    return value