# Copyright (C) 2014 Andrew Miller, Alex Nitz, Tito Dal Canton, Christopher M. Biwer
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This module contains functions for reading data from frame files or caches
"""
import logging
import warnings
import os.path
import glob
import time
import math
import re
from urllib.parse import urlparse
import numpy
import lalframe
import lal
from gwdatafind import find_urls as find_frame_urls
import pycbc
from pycbc.types import TimeSeries, zeros
logger = logging.getLogger('pycbc.frame.frame')
# map LAL series types to corresponding functions and Numpy types
_fr_type_map = {
lal.S_TYPE_CODE: [
lalframe.FrStreamReadREAL4TimeSeries, numpy.float32,
lal.CreateREAL4TimeSeries,
lalframe.FrStreamGetREAL4TimeSeriesMetadata,
lal.CreateREAL4Sequence,
lalframe.FrameAddREAL4TimeSeriesProcData
],
lal.D_TYPE_CODE: [
lalframe.FrStreamReadREAL8TimeSeries, numpy.float64,
lal.CreateREAL8TimeSeries,
lalframe.FrStreamGetREAL8TimeSeriesMetadata,
lal.CreateREAL8Sequence,
lalframe.FrameAddREAL8TimeSeriesProcData
],
lal.C_TYPE_CODE: [
lalframe.FrStreamReadCOMPLEX8TimeSeries, numpy.complex64,
lal.CreateCOMPLEX8TimeSeries,
lalframe.FrStreamGetCOMPLEX8TimeSeriesMetadata,
lal.CreateCOMPLEX8Sequence,
lalframe.FrameAddCOMPLEX8TimeSeriesProcData
],
lal.Z_TYPE_CODE: [
lalframe.FrStreamReadCOMPLEX16TimeSeries, numpy.complex128,
lal.CreateCOMPLEX16TimeSeries,
lalframe.FrStreamGetCOMPLEX16TimeSeriesMetadata,
lal.CreateCOMPLEX16Sequence,
lalframe.FrameAddCOMPLEX16TimeSeriesProcData
],
lal.U4_TYPE_CODE: [
lalframe.FrStreamReadUINT4TimeSeries, numpy.uint32,
lal.CreateUINT4TimeSeries,
lalframe.FrStreamGetUINT4TimeSeriesMetadata,
lal.CreateUINT4Sequence,
lalframe.FrameAddUINT4TimeSeriesProcData
],
lal.I4_TYPE_CODE: [
lalframe.FrStreamReadINT4TimeSeries, numpy.int32,
lal.CreateINT4TimeSeries,
lalframe.FrStreamGetINT4TimeSeriesMetadata,
lal.CreateINT4Sequence,
lalframe.FrameAddINT4TimeSeriesProcData
],
}
def _read_channel(channel, stream, start, duration):
""" Get channel using lalframe """
channel_type = lalframe.FrStreamGetTimeSeriesType(channel, stream)
read_func = _fr_type_map[channel_type][0]
d_type = _fr_type_map[channel_type][1]
data = read_func(stream, channel, start, duration, 0)
return TimeSeries(data.data.data, delta_t=data.deltaT, epoch=start,
dtype=d_type)
def _is_gwf(file_path):
"""Test if a file is a frame file by checking if its contents begins with
the magic string 'IGWD'."""
try:
with open(file_path, 'rb') as f:
if f.read(4) == b'IGWD':
return True
except IOError:
pass
return False
def locations_to_cache(locations, latest=False):
""" Return a cumulative cache file build from the list of locations
Parameters
----------
locations : list
A list of strings containing files, globs, or cache files used to
build a combined lal cache file object.
latest : {False, bool}, optional
Only return a cache with the most recent frame in the locations.
If False, all results are returned.
Returns
-------
cache : lal.Cache
A cumulative lal cache object containing the files derived from the
list of locations.
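Examples
--------
A minimal sketch, assuming frame files and a LAL cache file actually
exist at these (hypothetical) locations:
>>> cache = locations_to_cache(['/data/frames/H-*.gwf',
...                             '/data/caches/H1.lcf'])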
"""
cum_cache = lal.Cache()
for source in locations:
flist = glob.glob(source)
if latest:
def relaxed_getctime(fn):
# when building a cache from a directory of temporary
# low-latency frames, files might disappear between
# the glob() and getctime() calls
try:
return os.path.getctime(fn)
except OSError:
return 0
if not flist:
raise ValueError('no frame or cache files found in ' + source)
flist = [max(flist, key=relaxed_getctime)]
for file_path in flist:
dir_name, file_name = os.path.split(file_path)
_, file_extension = os.path.splitext(file_name)
if file_extension in [".lcf", ".cache"]:
cache = lal.CacheImport(file_path)
elif file_extension == ".gwf" or _is_gwf(file_path):
cache = lalframe.FrOpen(str(dir_name), str(file_name)).cache
else:
raise TypeError("Invalid location name")
cum_cache = lal.CacheMerge(cum_cache, cache)
return cum_cache
def read_frame(location, channels, start_time=None,
end_time=None, duration=None, check_integrity=False,
sieve=None):
"""Read time series from frame data.
Using the `location`, which can either be a frame file ".gwf" or a
frame cache ".gwf", read in the data for the given channel(s) and output
as a TimeSeries or list of TimeSeries.
Parameters
----------
location : string
A source of gravitational wave frames. Either a frame filename
(can include pattern), a list of frame files, or frame cache file.
channels : string or list of strings
Either a string that contains the channel name or a list of channel
name strings.
start_time : {None, LIGOTimeGPS}, optional
The gps start time of the time series. Defaults to reading from the
beginning of the available frame(s).
end_time : {None, LIGOTimeGPS}, optional
The gps end time of the time series. Defaults to the end of the frame.
Note, this argument is incompatible with `duration`.
duration : {None, float}, optional
The amount of data to read in seconds. Note, this argument is
incompatible with `end_time`.
check_integrity : {False, bool}, optional
Test the frame files for internal integrity.
sieve : string, optional
Selects only frames where the frame URL matches the regular
expression sieve
Returns
-------
Frame Data: TimeSeries or list of TimeSeries
A TimeSeries or a list of TimeSeries, corresponding to the data from
the frame file/cache for a given channel or channels.
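Examples
--------
A minimal sketch, assuming the (hypothetical) frame file
`H-TEST-1000000000-16.gwf` exists and contains `H1:TEST-CHANNEL`:
>>> ts = read_frame('H-TEST-1000000000-16.gwf', 'H1:TEST-CHANNEL',
...                 start_time=1000000000, duration=16)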
"""
if end_time and duration:
raise ValueError("end time and duration are mutually exclusive")
if type(location) is list:
locations = location
else:
locations = [location]
cum_cache = locations_to_cache(locations)
if sieve:
logger.info("Using frames that match regexp: %s", sieve)
lal.CacheSieve(cum_cache, 0, 0, None, None, sieve)
if start_time is not None and end_time is not None:
# Before sieving, check if this is sane. Otherwise it will fail later.
if (int(math.ceil(end_time)) - int(start_time)) <= 0:
raise ValueError("Negative or null duration")
lal.CacheSieve(cum_cache, int(start_time), int(math.ceil(end_time)),
None, None, None)
stream = lalframe.FrStreamCacheOpen(cum_cache)
stream.mode = lalframe.FR_STREAM_VERBOSE_MODE
if check_integrity:
stream.mode = (stream.mode | lalframe.FR_STREAM_CHECKSUM_MODE)
lalframe.FrStreamSetMode(stream, stream.mode)
# determine duration of data
if type(channels) is list:
first_channel = channels[0]
else:
first_channel = channels
data_length = lalframe.FrStreamGetVectorLength(first_channel, stream)
channel_type = lalframe.FrStreamGetTimeSeriesType(first_channel, stream)
create_series_func = _fr_type_map[channel_type][2]
get_series_metadata_func = _fr_type_map[channel_type][3]
series = create_series_func(first_channel, stream.epoch, 0, 0,
lal.ADCCountUnit, 0)
get_series_metadata_func(series, stream)
data_duration = (data_length + 0.5) * series.deltaT
if start_time is None:
start_time = stream.epoch * 1  # multiplication makes a copy of the epoch
if end_time is None:
end_time = start_time + data_duration
if type(start_time) is not lal.LIGOTimeGPS:
start_time = lal.LIGOTimeGPS(start_time)
if type(end_time) is not lal.LIGOTimeGPS:
end_time = lal.LIGOTimeGPS(end_time)
if duration is None:
duration = float(end_time - start_time)
else:
duration = float(duration)
# lalframe behaves dangerously with invalid duration so catch it here
if duration <= 0:
raise ValueError("Negative or null duration")
#if duration > data_duration:
# raise ValueError("Requested duration longer than available data")
if type(channels) is list:
all_data = []
for channel in channels:
channel_data = _read_channel(channel, stream, start_time, duration)
lalframe.FrStreamSeek(stream, start_time)
all_data.append(channel_data)
return all_data
else:
return _read_channel(channels, stream, start_time, duration)
def frame_paths(
frame_type, start_time, end_time, server=None, url_type='file', site=None
):
"""Return the paths to a span of frame files.
Parameters
----------
frame_type : string
The string representation of the frame type (ex. 'H1_ER_C00_L1').
start_time : int
The start time that we need the frames to span.
end_time : int
The end time that we need the frames to span.
server : {None, SERVER:PORT string}, optional
Optional string to specify the datafind server to use. By default an
attempt is made to use a local datafind server.
url_type : string
Returns only frame URLs with a particular scheme or head such
as "file" or "https". Default is "file", which queries locally
stored frames. Option can be disabled if set to None.
site : string, optional
One-letter string specifying which site you want data from (H, L, V,
etc). If not given, the site is assumed to be the first letter of
`frame_type`, which is usually (but not always) a safe assumption.
Returns
-------
paths : list of paths
The list of paths to the frame files.
Examples
--------
>>> paths = frame_paths('H1_LDAS_C02_L2', 968995968, 968995968+2048)
"""
if site is None:
# this case is tolerated for backward compatibility
site = frame_type[0]
warnings.warn(
f'Guessing site {site} from frame type {frame_type}',
DeprecationWarning
)
cache = find_frame_urls(site, frame_type, start_time, end_time,
urltype=url_type, host=server)
return [urlparse(entry).path for entry in cache]
def get_site_from_type_or_channel(frame_type, channels):
"""Determine the site for querying gwdatafind (H, L, V, etc) based on
substrings of the frame type and channel(s).
The type should begin with S: or SN:, in which case S is taken as the
site. Otherwise, the same is done with the channel (with the first
channel if more than one are given). If that also fails, the site is
taken to be the first letter of the frame type, which is usually
(but not always) a correct assumption.
Parameters
----------
frame_type : string
The frame type, ideally prefixed by the site indicator.
channels : string or list of strings
The channel name or names.
Returns
-------
site : string
The site letter.
frame_type : string
The frame type with the site prefix (if any) removed.
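Examples
--------
With a (hypothetical) site-prefixed type, the prefix is split off;
otherwise the site is taken from the channel name:
>>> get_site_from_type_or_channel('V1:HoftOnline', 'V1:Hrec_hoft_16384Hz')
('V', 'HoftOnline')
>>> get_site_from_type_or_channel('HoftOnline', 'V1:Hrec_hoft_16384Hz')
('V', 'HoftOnline')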
"""
site_re = r'^([^:])[^:]?:'
m = re.match(site_re, frame_type)
if m:
return m.group(1), frame_type[m.end():]
chan = channels
if isinstance(chan, list):
chan = channels[0]
m = re.match(site_re, chan)
if m:
return m.group(1), frame_type
warnings.warn(
f'Guessing site {frame_type[0]} from frame type {frame_type}',
DeprecationWarning
)
return frame_type[0], frame_type
def query_and_read_frame(frame_type, channels, start_time, end_time,
sieve=None, check_integrity=False):
"""Read time series from frame data.
Query for the location of physical frames matching the frame type. Return
a time series containing the channel between the given start and end times.
Parameters
----------
frame_type : string
The type of frame file that we are looking for. The string should begin
with S: or SN:, in which case S is taken as the site to query. If this
is not the case, the site will be guessed from the channel name or from
the type in a different way, which may not work.
channels : string or list of strings
Either a string that contains the channel name or a list of channel
name strings.
start_time : LIGOTimeGPS or int
The gps start time of the time series.
end_time : LIGOTimeGPS or int
The gps end time of the time series.
sieve : string, optional
Selects only frames where the frame URL matches the regular
expression sieve
check_integrity : boolean
Do an expensive checksum of the file before returning.
Returns
-------
Frame Data: TimeSeries or list of TimeSeries
A TimeSeries or a list of TimeSeries, corresponding to the data from
the frame file/cache for a given channel or channels.
Examples
--------
>>> ts = query_and_read_frame('H1_LDAS_C02_L2', 'H1:LDAS-STRAIN',
...                           968995968, 968995968+2048)
"""
site, frame_type = get_site_from_type_or_channel(frame_type, channels)
# Allows compatibility with our standard tools
# We may want to move this into a higher-level frame-fetching tool
if frame_type in ['LOSC_STRAIN', 'GWOSC_STRAIN']:
from pycbc.frame.gwosc import read_strain_gwosc
if not isinstance(channels, list):
channels = [channels]
data = [read_strain_gwosc(c[:2], start_time, end_time)
for c in channels]
return data if len(data) > 1 else data[0]
if frame_type in ['LOSC', 'GWOSC']:
from pycbc.frame.gwosc import read_frame_gwosc
return read_frame_gwosc(channels, start_time, end_time)
logger.info('Querying datafind server')
paths = frame_paths(
frame_type,
int(start_time),
int(numpy.ceil(end_time)),
site=site
)
logger.info('Found frame file paths: %s', ' '.join(paths))
return read_frame(
paths,
channels,
start_time=start_time,
end_time=end_time,
sieve=sieve,
check_integrity=check_integrity
)
def write_frame(location, channels, timeseries):
"""Write a list of time series to a single frame file.
Parameters
----------
location : string
A frame filename.
channels : string or list of strings
Either a string that contains the channel name or a list of channel
name strings.
timeseries: TimeSeries
A TimeSeries or list of TimeSeries, corresponding to the data to be
written to the frame file for a given channel.
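Examples
--------
A minimal sketch writing one second of zeros to a (hypothetical) file:
>>> from pycbc.types import TimeSeries, zeros
>>> ts = TimeSeries(zeros(16384), delta_t=1.0/16384, epoch=1000000000)
>>> write_frame('H-TEST-1000000000-1.gwf', 'H1:TEST-CHANNEL', ts)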
"""
# check if a single channel or a list of channels
if not (isinstance(channels, list) and isinstance(timeseries, list)):
channels = [channels]
timeseries = [timeseries]
# check that timeseries have the same start and end time
gps_start_times = {series.start_time for series in timeseries}
gps_end_times = {series.end_time for series in timeseries}
if len(gps_start_times) != 1 or len(gps_end_times) != 1:
raise ValueError("Start and end times of TimeSeries must be identical.")
# check that start, end time, and duration are integers
gps_start_time = gps_start_times.pop()
gps_end_time = gps_end_times.pop()
duration = int(gps_end_time - gps_start_time)
if gps_start_time % 1 or gps_end_time % 1:
raise ValueError("Start and end times of TimeSeries must be integer seconds.")
# create frame
frame = lalframe.FrameNew(epoch=gps_start_time, duration=duration,
project='', run=1, frnum=1,
detectorFlags=lal.LALDETECTORTYPE_ABSENT)
for i, tseries in enumerate(timeseries):
# get data type
for seriestype in _fr_type_map:
if _fr_type_map[seriestype][1] == tseries.dtype:
create_series_func = _fr_type_map[seriestype][2]
create_sequence_func = _fr_type_map[seriestype][4]
add_series_func = _fr_type_map[seriestype][5]
break
else:
# no break: the dtype does not correspond to any known frame type
raise TypeError("Unsupported dtype {0} for frame data".format(tseries.dtype))
# add time series to frame
series = create_series_func(channels[i], tseries.start_time,
0, tseries.delta_t, lal.ADCCountUnit,
len(tseries.numpy()))
series.data = create_sequence_func(len(tseries.numpy()))
series.data.data = tseries.numpy()
add_series_func(frame, series)
# write frame
lalframe.FrameWrite(frame, location)
class DataBuffer(object):
"""A linear buffer that acts as a FILO for reading in frame data
"""
def __init__(self, frame_src,
channel_name,
start_time,
max_buffer=2048,
force_update_cache=True,
increment_update_cache=None,
dtype=numpy.float64):
""" Create a rolling buffer of frame data
Parameters
----------
frame_src: str or list of strings
Strings that indicate where to read files from. This can be a
list of frame files, a glob, etc.
channel_name: str
Name of the channel to read from the frame files
start_time: int
GPS time to start reading from.
max_buffer: {int, 2048}, Optional
Length of the buffer in seconds
force_update_cache: {bool, True}, Optional
Re-check the filesystem for frame files on every attempt to
read more data.
increment_update_cache: {str, None}, Optional
Pattern to look for frame files in a GPS-dependent directory. This
is an alternative to the forced update of the frame cache, and
attempts to predict the next frame file name without probing the
filesystem.
dtype: {dtype, numpy.float64}, Optional
Data type to use for the internal buffer
"""
self.frame_src = frame_src
self.channel_name = channel_name
self.read_pos = start_time
self.force_update_cache = force_update_cache
self.increment_update_cache = increment_update_cache
self.detector = channel_name.split(':')[0]
self.update_cache()
self.channel_type, self.raw_sample_rate = self._retrieve_metadata(self.stream, self.channel_name)
raw_size = self.raw_sample_rate * max_buffer
self.raw_buffer = TimeSeries(zeros(raw_size, dtype=dtype),
copy=False,
epoch=start_time - max_buffer,
delta_t=1.0/self.raw_sample_rate)
def update_cache(self):
"""Reset the lal cache. This can be used to update the cache if the
result may change due to more files being added to the filesystem,
for example.
"""
cache = locations_to_cache(self.frame_src, latest=True)
stream = lalframe.FrStreamCacheOpen(cache)
self.stream = stream
@staticmethod
def _retrieve_metadata(stream, channel_name):
"""Retrieve basic metadata by reading the first file in the cache
Parameters
----------
stream: lal stream object
Stream containing a channel we want to learn about
channel_name: str
The name of the channel we want to know the dtype and sample rate of
Returns
-------
channel_type: lal type enum
Enum value which indicates the dtype of the channel
sample_rate: int
The sample rate of the data within this channel
"""
lalframe.FrStreamGetVectorLength(channel_name, stream)
channel_type = lalframe.FrStreamGetTimeSeriesType(channel_name, stream)
create_series_func = _fr_type_map[channel_type][2]
get_series_metadata_func = _fr_type_map[channel_type][3]
series = create_series_func(channel_name, stream.epoch, 0, 0,
lal.ADCCountUnit, 0)
get_series_metadata_func(series, stream)
return channel_type, int(1.0/series.deltaT)
def _read_frame(self, blocksize):
"""Try to read the block of data blocksize seconds long
Parameters
----------
blocksize: int
The number of seconds to attempt to read from the channel
Returns
-------
data: TimeSeries
TimeSeries containing `blocksize` seconds of frame data
Raises
------
RuntimeError:
If data cannot be read for any reason
"""
try:
read_func = _fr_type_map[self.channel_type][0]
dtype = _fr_type_map[self.channel_type][1]
data = read_func(self.stream, self.channel_name,
self.read_pos, int(blocksize), 0)
return TimeSeries(data.data.data, delta_t=data.deltaT,
epoch=self.read_pos,
dtype=dtype)
except Exception as exc:
raise RuntimeError('Cannot read {0} frame data'.format(self.channel_name)) from exc
def null_advance(self, blocksize):
"""Advance and insert zeros
Parameters
----------
blocksize: int
The number of seconds to attempt to read from the channel
"""
self.raw_buffer.roll(-int(blocksize * self.raw_sample_rate))
self.read_pos += blocksize
self.raw_buffer.start_time += blocksize
def advance(self, blocksize):
"""Add blocksize seconds more to the buffer, push blocksize seconds
from the beginning.
Parameters
----------
blocksize: int
The number of seconds to attempt to read from the channel
"""
ts = self._read_frame(blocksize)
self.raw_buffer.roll(-len(ts))
self.raw_buffer[-len(ts):] = ts[:]
self.read_pos += blocksize
self.raw_buffer.start_time += blocksize
return ts
def update_cache_by_increment(self, blocksize):
"""Update the internal cache by starting from the first frame
and incrementing.
Guess the next frame file name by incrementing from the first found
one. This allows a pattern to be used for the GPS folder of the file,
indicated by `GPSX`, where X is the number of GPS digits to use.
Parameters
----------
blocksize: int
Number of seconds to increment the next frame file.
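Examples
--------
With the (hypothetical) pattern '/dev/shm/llhoft/H1/GPS5', a frame
starting at GPS time 1234567936 with file prefix 'H-H1_llhoft' and
duration 1 s would be looked for at
'/dev/shm/llhoft/H1/12345/H-H1_llhoft-1234567936-1.gwf'.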
"""
start = float(self.raw_buffer.end_time)
end = float(start + blocksize)
if not hasattr(self, 'dur'):
fname = glob.glob(self.frame_src[0])[0]
fname = os.path.splitext(os.path.basename(fname))[0].split('-')
self.beg = '-'.join([fname[0], fname[1]])
self.ref = int(fname[2])
self.dur = int(fname[3])
fstart = int(self.ref + numpy.floor((start - self.ref) / float(self.dur)) * self.dur)
starts = numpy.arange(fstart, end, self.dur).astype(int)
keys = []
for s in starts:
pattern = self.increment_update_cache
if 'GPS' in pattern:
n = int(pattern[int(pattern.index('GPS') + 3)])
pattern = pattern.replace('GPS%s' % n, str(s)[0:n])
name = f'{pattern}/{self.beg}-{s}-{self.dur}.gwf'
# check that the file actually exists, else abort now
if not os.path.exists(name):
raise RuntimeError('frame file {0} does not exist'.format(name))
keys.append(name)
cache = locations_to_cache(keys)
stream = lalframe.FrStreamCacheOpen(cache)
self.stream = stream
self.channel_type, self.raw_sample_rate = \
self._retrieve_metadata(self.stream, self.channel_name)
def attempt_advance(self, blocksize, timeout=10):
""" Attempt to advance the frame buffer. Retry upon failure, except
if the frame file is beyond the timeout limit.
Parameters
----------
blocksize: int
The number of seconds to attempt to read from the channel
timeout: {int, 10}, Optional
Number of seconds before giving up on reading a frame
Returns
-------
data: TimeSeries
TimeSeries containing `blocksize` seconds of frame data
"""
if self.force_update_cache:
self.update_cache()
while True:
try:
if self.increment_update_cache:
self.update_cache_by_increment(blocksize)
return DataBuffer.advance(self, blocksize)
except RuntimeError:
if pycbc.gps_now() > timeout + self.raw_buffer.end_time:
# The frame is not there and it should be by now,
# so we give up and treat it as zeros
DataBuffer.null_advance(self, blocksize)
return None
# I am too early to give up on this frame,
# so we should try again
time.sleep(0.1)
class StatusBuffer(DataBuffer):
""" Read state vector or DQ information from a frame file """
def __init__(self, frame_src,
channel_name,
start_time,
max_buffer=2048,
valid_mask=3,
force_update_cache=False,
increment_update_cache=None,
valid_on_zero=False):
""" Create a rolling buffer of status data from a frame
Parameters
----------
frame_src: str or list of strings
Strings that indicate where to read files from. This can be a
list of frame files, a glob, etc.
channel_name: str
Name of the channel to read from the frame files
start_time: int
GPS time to start reading from.
max_buffer: {int, 2048}, Optional
Length of the buffer in seconds
valid_mask: {int, HOFT_OK | SCIENCE_INTENT}, Optional
Set of flags that must be on to indicate valid frame data.
valid_on_zero: bool
If True, `valid_mask` is ignored and the status is considered
"good" simply when the channel is zero.
"""
DataBuffer.__init__(self, frame_src, channel_name, start_time,
max_buffer=max_buffer,
force_update_cache=force_update_cache,
increment_update_cache=increment_update_cache,
dtype=numpy.int32)
self.valid_mask = valid_mask
self.valid_on_zero = valid_on_zero
def check_valid(self, values, flag=None):
"""Check if the data contains any non-valid status information
Parameters
----------
values: pycbc.types.Array
Array of status information
flag: str, optional
Override the default valid mask with a user defined mask.
Returns
-------
status: boolean
Returns True if all of the status information is valid,
False if any is not.
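Examples
--------
A sketch of the underlying bitmask logic with the default mask of 3
(bits 0 and 1 both required):
>>> import numpy
>>> int(numpy.bitwise_and(3, 3)) == 3
True
>>> int(numpy.bitwise_and(1, 3)) == 3
False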
"""
if self.valid_on_zero:
valid = values.numpy() == 0
else:
if flag is None:
flag = self.valid_mask
valid = numpy.bitwise_and(values.numpy(), flag) == flag
return bool(numpy.all(valid))
def is_extent_valid(self, start_time, duration, flag=None):
"""Check if the duration contains any non-valid frames
Parameters
----------
start_time: int
Beginning of the duration to check in gps seconds
duration: int
Number of seconds after the start_time to check
flag: str, optional
Override the default valid mask with a user defined mask.
Returns
-------
status: boolean
Returns True if all of the status information is valid,
False if any is not.
"""
sr = self.raw_buffer.sample_rate
s = int((start_time - self.raw_buffer.start_time) * sr)
e = s + int(duration * sr) + 1
data = self.raw_buffer[s:e]
return self.check_valid(data, flag=flag)
def indices_of_flag(self, start_time, duration, times, padding=0):
""" Return the indices of the times lying in the flagged region
Parameters
----------
start_time: int
Beginning time to request for
duration: int
Number of seconds to check.
times: array of floats
Times to check for lying within the flagged region
padding: float
Number of seconds to add around flag inactive times to be considered
inactive as well.
Returns
-------
indices: numpy.ndarray
Array of indices marking the location of triggers within valid
time.
"""
from pycbc.events.veto import indices_outside_times
sr = self.raw_buffer.sample_rate
s = int((start_time - self.raw_buffer.start_time - padding) * sr) - 1
e = s + int((duration + padding) * sr) + 1
data = self.raw_buffer[s:e]
stamps = data.sample_times.numpy()
if self.valid_on_zero:
invalid = data.numpy() != 0
else:
invalid = numpy.bitwise_and(data.numpy(), self.valid_mask) \
!= self.valid_mask
starts = stamps[invalid] - padding
ends = starts + 1.0 / sr + padding * 2.0
idx = indices_outside_times(times, starts, ends)
return idx
def advance(self, blocksize):
""" Add blocksize seconds more to the buffer, push blocksize seconds
from the beginning.
Parameters
----------
blocksize: int
The number of seconds to attempt to read from the channel
Returns
-------
status: boolean
Returns True if all of the status information is valid,
False if any is not.
"""
try:
if self.increment_update_cache:
self.update_cache_by_increment(blocksize)
ts = DataBuffer.advance(self, blocksize)
return self.check_valid(ts)
except RuntimeError:
self.null_advance(blocksize)
return False
class iDQBuffer(object):
""" Read iDQ timeseries from a frame file """
def __init__(self, frame_src,
idq_channel_name,
idq_status_channel_name,
idq_threshold,
start_time,
max_buffer=512,
force_update_cache=False,
increment_update_cache=None):
"""
Parameters
----------
frame_src: str or list of strings
Strings that indicate where to read files from. This can be a
list of frame files, a glob, etc.
idq_channel_name: str
Name of the channel to read the iDQ statistic from
idq_status_channel_name: str
Name of the channel to read the iDQ status from
idq_threshold: float
Threshold below which the iDQ channel triggers a veto
start_time: int
GPS time to start reading from.
max_buffer: {int, 512}, Optional
Length of the buffer in seconds
force_update_cache: {bool, False}, Optional
Re-check the filesystem for frame files on every attempt to
read more data.
increment_update_cache: {str, None}, Optional
Pattern to look for frame files in a GPS-dependent directory. This
is an alternative to the forced update of the frame cache, and
attempts to predict the next frame file name without probing the
filesystem.
"""
self.threshold = idq_threshold
self.idq = DataBuffer(frame_src, idq_channel_name, start_time,
max_buffer=max_buffer,
force_update_cache=force_update_cache,
increment_update_cache=increment_update_cache)
self.idq_state = DataBuffer(frame_src, idq_status_channel_name, start_time,
max_buffer=max_buffer,
force_update_cache=force_update_cache,
increment_update_cache=increment_update_cache)
def flag_at_times(self, start_time, duration, times, padding=0):
""" Check whether the idq flag was on at given times
Parameters
----------
start_time: int
Beginning time to request for
duration: int
Number of seconds to check.
times: array of floats
Times to check for an active flag
padding: float
Amount of time in seconds to flag around samples
below the iDQ FAP threshold
Returns
-------
flag_state: numpy.ndarray
Boolean array of whether flag was on at given times
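Examples
--------
A sketch, assuming `idq_buf` is an iDQBuffer whose buffers already
cover the requested span:
>>> flagged = idq_buf.flag_at_times(1000000000, 16,
...                                 numpy.array([1000000001.5]))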
"""
from pycbc.events.veto import indices_within_times
# convert start and end times to buffer indices
sr = self.idq.raw_buffer.sample_rate
s = int((start_time - self.idq.raw_buffer.start_time - padding) * sr) - 1
e = s + int((duration + padding) * sr) + 1
# find samples when iDQ FAP is below threshold and state is valid
idq_fap = self.idq.raw_buffer[s:e]
low_fap = idq_fap.numpy() <= self.threshold
idq_valid = self.idq_state.raw_buffer[s:e]
idq_valid = idq_valid.numpy().astype(bool)
valid_low_fap = numpy.logical_and(idq_valid, low_fap)
# find times corresponding to the valid low FAP samples
glitch_idx = numpy.flatnonzero(valid_low_fap)
stamps = idq_fap.sample_times.numpy()
glitch_times = stamps[glitch_idx]
# construct start and end times of flag segments
starts = glitch_times - padding
ends = starts + 1.0 / sr + padding * 2.0
# check if times were flagged
idx = indices_within_times(times, starts, ends)
flagged_bool = numpy.zeros(len(times), dtype=bool)
flagged_bool[idx] = True
return flagged_bool
def advance(self, blocksize):
""" Add blocksize seconds more to the buffer, push blocksize seconds
from the beginning.
Parameters
----------
blocksize: int
The number of seconds to attempt to read
Returns
-------
status: boolean
Returns True if the advance is successful,
False if not.
"""
idq_ts = self.idq.attempt_advance(blocksize)
idq_state_ts = self.idq_state.attempt_advance(blocksize)
return (idq_ts is not None) and (idq_state_ts is not None)
def null_advance(self, blocksize):
"""Advance and insert zeros
Parameters
----------
blocksize: int
The number of seconds to advance the buffers
"""
self.idq.null_advance(blocksize)
self.idq_state.null_advance(blocksize)
__all__ = [
'locations_to_cache',
'read_frame',
'query_and_read_frame',
'frame_paths',
'write_frame',
'DataBuffer',
'StatusBuffer',
'iDQBuffer'
]