Source code for so3g.hk.getdata

"""Interface for querying and loading contiguous vectors from
Housekeeping frame streams.  We want to expose a sisock-compatible
API, where time ranges are an essential part of every query.

Use HKArchiveScanner to scan a set of G3 files containing Housekeeping
frames.  Use the resulting HKArchive to perform random I/O over a
sisock-style API.

"""
import itertools
import os
import pytz
import yaml
import logging
import pickle
import glob

import numpy as np
import datetime as dt


import so3g
from spt3g import core


hk_logger = logging.getLogger(__name__)
hk_logger.setLevel(logging.INFO)

SPAN_BUFFER_SECONDS = 1.0

def is_sub_seq(full_seq, sub_seq):
    """Return true if sub_seq is a sub-sequence of full_seq.

    """
    if len(sub_seq) == 0:
        return True
    for idx0,x0 in enumerate(full_seq):
        if x0 == sub_seq[0]:
            if is_sub_seq(full_seq[idx0+1:], sub_seq[1:]):
                return True
    return False


_SCHEMA_V1_BLOCK_TYPES = [
    core.G3VectorDouble,
    core.G3VectorInt,
    core.G3VectorString,
    core.G3VectorBool,
]


def _concat_hk_stream(blocks_in):
    """Concatenates an ordered list of compatible HK blocks into a single
    frame.  Each block should be a valid G3TimesampleMap with the same
    keys.

    Returns a single G3TimesampleMap with all fields concatenated.

    """
    blk = core.G3TimesampleMap()
    blk.times = core.G3VectorTime(blocks_in[0].times)
    fields = list(blocks_in[0].keys())
    for f in fields:
        f_short = f.split('.')[-1]
        blk[f] = blocks_in[0][f_short]
    for b in blocks_in[1:]:
        blk.times.extend(b.times)
    for f in fields:
        f_short = f.split('.')[-1]
        for _type in _SCHEMA_V1_BLOCK_TYPES:
            if isinstance(blocks_in[0][f_short], _type):
                break
        else:
            raise RuntimeError('Field "%s" is of unsupported type %s.' %
                               (f_short, type(blocks_in[0][f_short])))
        for b in blocks_in[1:]:
            blk[f].extend(b[f_short])
    return blk


[docs] class HKArchive: """Container for information necessary to determine what data fields are present in a data archive at what times. This object has methods that can determine what fields have data over a given time range, and can group fields that share a timeline (i.e. are co-sampled) over that range. """ field_groups = None def __init__(self, field_groups=None): if field_groups is not None: self.field_groups = list(field_groups) # A translator is used to update frames, on the fly, to the # modern schema assumed here. self.translator = so3g.hk.HKTranslator() def _get_groups(self, fields=None, start=None, end=None, short_match=False): """Helper for get_fields and get_data. Determines which fields, of those listed in fields, are present in the archive between the specified times. Args: fields (list of strings): List of field names. If None, all fields are considered. start (timestamp): Earliest time to consider. If None, consider arbitrarily early times. end (timestamp): Latest time to consider. If None, consider arbitrarily late times. Note that ``start`` and ``end`` form a semi-closed interval that includes start but excludes end. short_match (bool): If True, then a requested field will be considered to match an archive field if its "."-tokenized form is a sub-sequence of the "."-tokenized field in the archive. For example, the archive field "obs.agentX.feeds.therms.c1" may be matched by the requested field "agentX.c1". In the case that multiple archive fields match a requested field, a ValueError is thrown. Returns: List of (group_name, group_fields, fgs). The ``group_name`` is internally generated... probably something like "group0". ``group_fields`` is a list of fields belonging to this group. ``fgs`` is a list of _FieldGroup objects relevant to the fields in this group. Notes: Each entry in the returned list carries information for set of fields that are co-sampled over the requested time interval. Each of the requested fields will thus appear in at most one group. If a requested field is not found, it will not be listed in any group. """ span = so3g.IntervalsDouble() if start is None: start = span.domain[0] if end is None: end = span.domain[1] span.add_interval(start, end) if short_match and (fields is not None): field_seqs = [f.split('.') for f in fields] short_match_map = {} # map from shortened name to full field name. field_map = {} for fg in self.field_groups: both = span * fg.cover if len(both.array()) > 0: for f in fg.fields: full_field = fg.prefix + '.' + f key_field = full_field if fields is not None: # User is interested only in particular fields. if short_match: for seq in field_seqs: if is_sub_seq(full_field.split('.'), seq): key_field = '.'.join(seq) prev_short_match = short_match_map.get(key_field) if prev_short_match not in [None, full_field]: raise ValueError("Multiple matches for soft key: %s [%s, %s]." % (key_field, prev_short_match, full_field)) short_match_map[key_field] = full_field break else: continue elif full_field not in fields: continue if not key_field in field_map: field_map[key_field] = [fg] else: field_map[key_field].append(fg) # Sort each list of field_groups by object id -- all we care # about is whether two fields have the same field group set. [f.sort(key=lambda obj: id(obj)) for f in field_map.values()] # Now group together fields if they have identical # field_group lists (because when they do, they can share a # timeline). keys = list(field_map.keys()) grouped = [] i0, i1 = 0, 1 while i0 < len(keys): while i1 < len(keys) and field_map[keys[i0]] == field_map[keys[i1]]: i1 += 1 group_keys = keys[i0:i1] group_name = 'group%i' % len(grouped) grouped.append((group_name, group_keys, field_map[group_keys[0]])) i0, i1 = i1, i1+1 return grouped
[docs] def get_fields(self, start=None, end=None): """Get list of fields that might have a sample in the time interval [start,end). Returns the pair of dictionaries ``(fields, timelines)``. The ``fields`` dictionary is a map from field name to a block of field information. The ``timelines`` dictionary is a map from timeline name to a block of timeline information. """ grouped = self._get_groups(None, start, end) # Construct the fields and timelines dicts. field_map, timeline_map = {}, {} for (group_name, fields, data) in grouped: timeline_map[group_name] = {'interval': None, 'field': fields} for f in fields: field_map[f] = { 'description': None, 'timeline': group_name, 'type': 'number', 'units': None, } return field_map, timeline_map
[docs] def get_data(self, field=None, start=None, end=None, min_stride=None, raw=False, short_match=False): """Load data from specified field(s) between specified times. Arguments ``field``, ``start``, ``end``, ``short_match`` are as described in _get_groups. Arguments: min_stride (float): Specifies the minimum sample spacing, in seconds. Ignored in this implementation. raw (bool): If true, return G3 blocks instead of numpy arrays. Returns: Pair of dictionaries, (data, timelines). The ``data`` dictionary is a simple map from field name to a numpy array of readings. The ``timelines`` dictionary is a map from field group name to a dictionary of timeline information, which has entries: - ``'t'``: numpy array of timestamps - ``'fields'``: list of fields belonging to this group. - ``'finalized_until'``: in cases where the data are still in flux, this field provides a timestamp that may be taken as the earliest time that needs to be requeried. This is part of the interface in order to support data streams that are being updated in real time. If user requested raw=True, then return value is a list of tuples of the form (group_name, block) where block is a single G3TimesampleMap carrying all the data for that co-sampled group. """ grouped = self._get_groups(field, start, end, short_match=short_match) hk_logger.debug('get_data: _get_groups returns %i groups.' % len(grouped)) # Pass through the metadata. Collect information on what # field groups are present in what frames of what files; sort # that info by file and offset so we make a single monotonic # pass through the frames. group_info = { # group_name: {'types': [dtype, ...], # 'fields': [(full_name, short_name), ...], # 'count': n}, # ... } files = { # filename: { # offset: [(block_index, group_name, output_offset), ...]], # ... # }, # ... } def check_overlap(time_range): # Note the time_range is inclusive on both ends. return ((start is None or start <= time_range[1]) and (end is None or end > time_range[0])) for group_name, fields, fgrps in grouped: # This is a group of co-sampled fields. The fields share # a sample count and a frame-index map. all_frame_refs = [] for fg in fgrps: all_frame_refs.extend( [(b['time_range'], b['count'], b['filename'], b['byte_offset'], b['block_index']) for b in fg.index_info]) all_frame_refs.sort() vector_offset = 0 for time_range, n, filename, byte_offset, block_index in all_frame_refs: if not check_overlap(time_range): continue if not filename in files: files[filename] = {} if byte_offset not in files[filename]: files[filename][byte_offset] = [] files[filename][byte_offset].append((block_index, group_name, vector_offset)) vector_offset += n group_info[group_name] = { 'count': vector_offset, 'fields': [(f, f.split('.')[-1]) for f in fields], 'types': [], } # Pass through the data. Should read the files in order, # jumping monotonically through the needed frames. The data # type of output arrays is determined from whatever # np.array(G3Object) returns on the first block. Note strings # ('U') have to be handled differently because we can't know # the maximum string length from the first block. data = {} timelines = {} for filename, file_map in sorted(files.items()): hk_logger.debug('get_data: reading %s' % filename) reader = so3g.G3IndexedReader(filename) for byte_offset, frame_info in sorted(file_map.items()): # Seek and decode. hk_logger.debug('get_data: seeking to %i for %i block extractions' % (byte_offset, len(frame_info))) reader.Seek(byte_offset) frames = reader.Process(None) assert(len(frames) == 1) frames = self.translator(frames[0]) frame = frames[0] # Now expand all blocks. for block_index, group_name, offset in frame_info: block = frame['blocks'][block_index] gi = group_info[group_name] if raw: # Short-circuit if in raw mode, just collect all blocks. if group_name not in data: data[group_name] = {} for field, f_short in gi['fields']: data[group_name] = [] data[group_name].append(block) continue if group_name not in timelines: # This block is init that happens only once per group. timelines[group_name] = { 't_g3': np.zeros(gi['count'], dtype=np.int64), 'fields': [f for f,s in gi['fields']], } hk_logger.debug('get_data: creating group "%s" with %i fields' % (group_name, len(gi['fields']))) # Determine data type of each field and create output arrays. for field, f_short in gi['fields']: dtype = np.array(block[f_short]).dtype gi['types'].append(dtype) if dtype.char == 'U': data[field] = [] else: data[field] = np.empty(gi['count'], dtype) hk_logger.debug('get_data: field "%s" has type %s' % ( f_short, dtype)) # Copy in block data. n = len(block.times) # Note this is in G3 time units for now... fixed later. timelines[group_name]['t_g3'][offset:offset+n] = block.times for (field, f_short), dtype in zip(gi['fields'], gi['types']): if dtype.char == 'U': data[field].append((offset, list(map(str, block[f_short])))) else: # This is a relatively quick copy because # of buffer pass-through from G3... don't # hit the RHS with np.array! data[field][offset:offset+n] = block[f_short] if raw: return [(group_name, _concat_hk_stream(data[group_name])) for group_name, _, _ in grouped] # Finalize string fields. for group_name, fields, fgrps in grouped: gi = group_info[group_name] for (field, f_short), dtype in zip(gi['fields'], gi['types']): if dtype.char == 'U': data[field] = np.array(list(itertools.chain( *[x for i, x in sorted(data[field])]))) assert(len(data[field]) == gi['count']) # Scale out time units. for timeline in timelines.values(): timeline['t'] = timeline.pop('t_g3') / core.G3Units.seconds # Restrict to only the requested time range. if start is not None or end is not None: for timeline in timelines.values(): i0, i1 = 0, len(timeline['t']) if start is not None: i0 = np.searchsorted(timeline['t'], start) if end is not None: i1 = np.searchsorted(timeline['t'], end) sl = slice(i0, i1) timeline['t'] = timeline['t'][sl] for k in timeline['fields']: data[k] = data[k][sl] # Mark last time for timeline in timelines.values(): if len(timeline['t']): timeline['finalized_until'] = timeline['t'][-1] else: timeline['finalized_until'] = start if start is not None else 0. return (data, timelines)
[docs] def simple(self, fields=None, start=None, end=None, min_stride=None, raw=False, short_match=True): """Load data from specified field(s) between specified times, and unpack the data for ease of use. Use get_data if you want to preserve the co-sampling structure. Arguments ``field``, ``start``, ``end``, ``short_match`` are as described in _get_groups. However, ``fields`` can be a single string rather than a list of strings. Note that ``short_match`` defaults to True (which is not the case for getdata).x Returns: List of pairs of numpy arrays (t,y) corresponding to each field in the ``fields`` list. If ``fields`` is a string, a simple pair (t,y) is returned. ``t`` and ``y`` are numpy arrays of equal length containing the timestamps and field readings, respectively. In cases where two fields are co-sampled, the time vector will be the same object. In cases where there are no data for the requested field in the time range, a pair of length 0 float arrays is returned. """ unpack = isinstance(fields, str) if unpack: fields = [fields] data, timelines = self.get_data(fields, start, end, min_stride, raw, short_match) output = {} fields_not_found = [f for f in fields] for timeline in timelines.values(): # Make the array here, so that the same object is returned # for all fields in this group. _t = np.array(timeline['t']) for f in timeline['fields']: output[f] = (_t, np.array(data[f])) fields_not_found.remove(f) nothing = np.zeros((0,)) for f in fields_not_found: output[f] = (nothing, nothing) output = [output[f] for f in fields] if unpack: output = output[0] return output
class _HKProvider: def __init__(self, prov_id, prefix): self.prov_id = prov_id self.prefix = prefix self.blocks = {} @classmethod def from_g3(cls, element): prov_id = element['prov_id'].value prefix = element['description'].value return cls(prov_id, prefix)
[docs] class HKArchiveScanner: """Consumes SO Housekeeping streams and creates a record of what fields cover what time ranges. This can run as a G3Pipeline module, but will not be able to record stream indexing information in that case. If it's populated through the process_file method, then index information (in the sense of filenames and byte offsets) will be stored. After processing frames, calling .finalize() will return an HKArchive that can be used to load data more efficiently. """ def __init__(self, pre_proc_dir=None, pre_proc_mode=None): self.session_id = None self.providers = {} self.field_groups = [] self.frame_info = [] self.counter = -1 self.translator = so3g.hk.HKTranslator() self.pre_proc_dir = pre_proc_dir self.pre_proc_mode = pre_proc_mode def __call__(self, *args, **kw): return self.Process(*args, **kw)
[docs] def Process(self, f, index_info=None): """Processes a frame. Only Housekeeping frames will be examined; other frames will simply be counted. All frames are passed through unmodified. The index_info will be stored along with a description of the frame's data; see the .process_file function. """ self.counter += 1 if index_info is None: index_info = {'counter': self.counter} f = self.translator(f) assert(len(f) == 1) f = f[0] if f.type == core.G3FrameType.EndProcessing: return [f] if f.type != core.G3FrameType.Housekeeping: return [f] vers = f.get('hkagg_version', 0) assert(vers == 2) if f['hkagg_type'] == so3g.HKFrameType.session: session_id = f['session_id'] if self.session_id is not None: if self.session_id != session_id: self.flush() # Note that this sets self.session_id=None. if self.session_id is None: core.log_info('New HK Session id = %i, timestamp = %i' % (session_id, f['start_time']), unit='HKScanner') self.session_id = session_id elif f['hkagg_type'] == so3g.HKFrameType.status: # If a provider has disappeared, flush its information into a # FieldGroup. prov_cands = [_HKProvider.from_g3(p) for p in f['providers']] to_flush = list(self.providers.keys()) # prov_ids... for p in prov_cands: if p.prov_id in to_flush: to_flush.remove(p.prov_id) # no, don't. else: self.providers[p.prov_id] = p for prov_id in to_flush: self.flush([prov_id]) elif f['hkagg_type'] == so3g.HKFrameType.data: # Data frame -- merge info for this provider. prov = self.providers[f['prov_id']] representatives = prov.blocks.keys() for bidx, (bname, b) in enumerate(zip(f['block_names'], f['blocks'])): assert(isinstance(b, core.G3TimesampleMap)) if bname not in prov.blocks: prov.blocks[bname] = {'fields': list(b.keys()), 'start': b.times[0].time / core.G3Units.seconds, 'index_info': []} # To ensure that the last sample is actually included # in the semi-open intervals we use to track frames, # the "end" time has to be after the final sample. prov.blocks[bname]['end'] = b.times[-1].time / core.G3Units.seconds + SPAN_BUFFER_SECONDS ii = {'block_index': bidx, 'time_range': (b.times[0].time / core.G3Units.seconds, b.times[-1].time / core.G3Units.seconds), 'count': len(b.times)} ii.update(index_info) prov.blocks[bname]['index_info'].append(ii) else: core.log_warn('Weird hkagg_type: %i' % f['hkagg_type'], unit='HKScanner') return [f]
[docs] def flush(self, provs=None): """Convert any cached provider information into _FieldGroup information. Delete the provider information. This will be called automatically during frame processing if a provider session ends. Once frame processing has completed, this function should be called, with no arguments, to flush any remaining provider sessions. Args: provs (list or None): list of provider identifiers to flush. If None, flush all and also reset the self.session_id. Returns: None """ if provs is None: provs = list(self.providers.keys()) self.session_id = None for p in provs: prov = self.providers.pop(p) blocks = prov.blocks for k, info in blocks.items(): fg = _FieldGroup(prov.prefix, info['fields'], info['start'], info['end'], info['index_info']) self.field_groups.append(fg)
[docs] def finalize(self): """Finalize the processing by calling flush(), then return an HKArchive with all the _FieldGroup information from this scanning session. """ self.flush() return HKArchive(self.field_groups)
[docs] def process_file(self, filename, flush_after=True): """Process the file specified by ``filename`` using a G3IndexedReader. Each frame from the file is passed to self.Process, with the optional index_info argument set to a dictionary containing the filename and byte_offset of the frame. Internal data grouping will be somewhat cleaner if the multiple files from a single aggregator "session" are passed to this function in acquisition order. In that case, call with flush_after=False. """ reader = so3g.G3IndexedReader(filename) while True: info = {'filename': filename, 'byte_offset': reader.Tell()} frames = reader.Process(None) assert len(frames) <= 1 if len(frames) == 0: break self.Process(frames[0], info) # Calling flush() here protects us against the odd case that # we process files from a single session in non-consecutive # order. In that case the start' and 'end' times will get # messed up because we can't tell the stream has been # re-initialized. if flush_after: self.flush()
[docs] def process_file_with_cache(self, filename): """Processes file specified by ``filename`` using the process_file method above. If self.pre_proc_dir is specified (not None), it will load pickled HKArchiveScanner objects and concatenates with self instead of re-processing each frame, if the corresponding file exists. If the pkl file does not exist, it processes it and saves the result (in the pre_proc_dir) so it can be used in the future. If self.pre_proc_dir is not specified, this becomes equivalent to process_file. """ if self.pre_proc_dir is None: self.process_file(filename) return folder = os.path.basename(filename)[:5] path = os.path.join( self.pre_proc_dir, folder, os.path.basename(filename).replace(".g3",'.pkl') ) if os.path.exists(path): with open(path, 'rb') as pkfl: hksc = pickle.load(pkfl) else: hksc = HKArchiveScanner() hksc.process_file(filename) # Make dirs if needed if not os.path.exists( os.path.dirname(path) ): os.makedirs( os.path.dirname(path) ) if self.pre_proc_mode is not None: os.chmod( os.path.dirname(path), self.pre_proc_mode ) # Save pkl file with open(path, 'wb') as pkfl: pickle.dump(hksc, pkfl) if self.pre_proc_mode is not None: os.chmod( path, self.pre_proc_mode ) self.field_groups += hksc.field_groups self.counter += hksc.counter
class _FieldGroup: """Container object for look-up information associated with a group of fields that share a timeline (i.e. a group of fields that are co-sampled over some requested time range). Attributes: prefix (str): Not used. fields (list of str): The field names. cover (IntervalsDouble): The time range (as unix timestamps) over which these fields have data. This is expressed as an IntervalsDouble to enable the use of Intervals arithmetic. The range is created from the "start" and "end" arguments of the constructor. index_info (list of dict): Information that the consumer will use to locate and load the data efficiently. The entries in the list represent time-ordered frames. See Notes. Notes: Each entry of index_info is a dict providing information about a single frame and block where data can be found. The fields are: - 'filename' (str): The file in which the frame is located. - 'byte_offset' (int): The offset within the file at which to find the frame. - 'block_index' (int): The index of the block, within the corresponding frame, where the data are found. - 'count' (int): the number of samples in this block. - 'time_range' (tuple): (first time, last time), as unix timestamps. Note the last time is the time of the last sample, not some time beyond that. - 'counter' (int): An index providing the order in which the frames were scanned. """ def __init__(self, prefix, fields, start, end, index_info): self.prefix = prefix self.fields = list(fields) self.cover = so3g.IntervalsDouble().add_interval(start, end) self.index_info = index_info def __repr__(self): try: return '_FieldGroup("%s", %i fields, %i segments)' % ( self.prefix, len(self.fields), len(self.index_info)) except: return '_FieldGroup(<bad internal state!>)'
[docs] def to_timestamp(some_time, str_format=None): """Convert the argument to a unix timestamp. Args: some_time: If a datetime, it is converted to UTC timezone and then to a unix timestamp. If int or float, the value is returned unprocessed. If str, a date will be extracted based on a few trial date format strings. str_format: a format string (for strptime) to try, instead of the default(s). Returns: float: Unix timestamp corresponding to some_time. """ if type(some_time) == dt.datetime: return some_time.astimezone(dt.timezone.utc).timestamp() if type(some_time) == int or type(some_time) == float: return some_time if type(some_time) == str: if str_format is not None: return to_timestamp(pytz.utc.localize( dt.datetime.strptime(some_time, str_format ))) str_options = ['%Y-%m-%d', '%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M:%S.%f'] for option in str_options: try: return to_timestamp(pytz.utc.localize( dt.datetime.strptime(some_time, option ))) except: continue raise ValueError('Could not process string into date object, options are: {}'.format(str_options)) raise ValueError('Type of date / time indication is invalid, accepts datetime, int, float, and string')
[docs] def load_range(start, stop, fields=None, alias=None, data_dir=None, config=None, pre_proc_dir=None, pre_proc_mode=None, folder_patterns=None, strict=True): """Args: start: Earliest time to search for data (see note on time formats). stop: Latest time to search for data (see note on time formats). fields: Fields to return, if None, returns all fields. alias: If not None, must be a list of strings providing exactly one value for each entry in fields. data_dir: directory where all the ctime folders are. If None, tries to use $OCS_DATA_DIR. config: filename of a .yaml file for loading data_dir / fields / alias pre_proc_dir: Place to store pickled HKArchiveScanners for g3 files to speed up loading pre_proc_mode: Permissions (passed to os.chmod) to be used on dirs and pkl files in the pre_proc_dir. No chmod if None. folder_patterns: List of patterns to search for in folders. If None, default pattern is ['{folder}', 'hk_{folder}_*']. If not None, usage for .g3 folder: ['{folder}'], and example usage for HK books: ['hk_{folder}_lat'] where {folder} will be replaced with the first 5 digits of the unix timestamp when looping through files. strict: If False, log and skip missing fields rather than raising a KeyError. Returns: Dictionary with structure:: { alias[i] : (time[i], data[i]) } It will be masked to only have data between start and stop. Notes: The "start" and "stop" argument accept a variety of formats, including datetime objects, unix timestamps, and strings (see to_timestamp function). In the case of datetime objects, you should set tzinfo=dt.timezone.utc explicitly if the system is not set to UTC time. Example usage:: fields = [ 'observatory.HAN.feeds.temperatures.Channel 1 T', 'observatory.HAN.feeds.temperatures.Channel 2 T', ] alias = [ 'HAN 1', 'HAN 2', ] folder_patterns = ['hk_{folder}_satp3'] start = dt.datetime(2020,2,19,18,48) stop = dt.datetime(2020,2,22) data = load_range(start, stop, fields=fields, alias=alias, folder_patterns=folder_patterns) plt.figure() for name in alias: plt.plot( data[name][0], data[name][1]) """ if config is not None: if not (data_dir is None and fields is None and alias is None): hk_logger.warning('''load_range has a config file - data_dir, fields, and alias are ignored''') with open(config, 'r') as f: setup = yaml.load(f, Loader=yaml.FullLoader) if 'data_dir' not in setup.keys(): raise ValueError('load_range config file requires data_dir entry') data_dir = setup['data_dir'] if 'field_list' not in setup.keys(): raise ValueError('load_range config file requires field_list entry') fields = [] alias = [] for k in setup['field_list']: fields.append( setup['field_list'][k]) alias.append( k ) if data_dir is None and 'OCS_DATA_DIR' not in os.environ.keys(): raise ValueError('if $OCS_DATA_DIR is not defined a data directory must be passed to getdata') if data_dir is None: data_dir = os.environ['OCS_DATA_DIR'] hk_logger.debug('Loading data from {}'.format(data_dir)) start_ctime = to_timestamp(start) - 3600 stop_ctime = to_timestamp(stop) + 3600 hksc = HKArchiveScanner(pre_proc_dir=pre_proc_dir) if folder_patterns is None: folder_patterns = ['{folder}', 'hk_{folder}_*'] for folder in range( int(start_ctime/1e5), int(stop_ctime/1e5)+1): bases = [] for pattern in folder_patterns: extended_pattern = pattern.format(folder=folder) base = glob.glob(os.path.join(data_dir, extended_pattern)) bases.extend(base) if len(bases) > 1: bases.sort base = bases[0] hk_logger.warn(f"Multiple base folders were found for {folder}. The first one, alphabetically, is selected: {base}") elif len(bases) == 1: base = bases[0] elif len(bases) == 0: hk_logger.debug(f"No base folder found for {folder}, skipping") continue for file in sorted(os.listdir(base)): if file.endswith('.yaml'): continue try: t = int(file[:-3]) except: hk_logger.debug('{} does not have the right format, skipping'.format(file)) continue if t >= start_ctime-3600 and t <=stop_ctime+3600: hk_logger.debug('Processing {}'.format(base+'/'+file)) hksc.process_file_with_cache( base+'/'+file) cat = hksc.finalize() start_ctime = to_timestamp(start) stop_ctime = to_timestamp(stop) all_fields,_ = cat.get_fields() if fields is None: fields = all_fields if alias is not None: if len(alias) != len(fields): hk_logger.error('if provided, alias needs to be the length of fields') else: alias = fields # Single pass load. keepers = [] for name, field in zip(alias, fields): if field not in all_fields: hk_logger.debug('`{}` not in available fields, skipping'.format(field)) continue keepers.append((name, field)) data = cat.simple([f for n, f in keepers], start=start_ctime, end=stop_ctime) data = {name: data[i] for i, (name, field) in enumerate(keepers)} return data
if __name__ == '__main__': import argparse parser = argparse.ArgumentParser( usage='This demo can be used to plot data from SO HK files.') parser.add_argument('--field') parser.add_argument('files', nargs='+', help= "SO Housekeeping files to load.") args = parser.parse_args() # Scan the file set. hkas = HKArchiveScanner() for filename in args.files: print(f'Scanning {filename}...') hkas.process_file(filename) print(' ... finalizing.') cat = hkas.finalize() # Get list of fields, timelines, spanning all times: fields, timelines = cat.get_fields() # Show for field_name in sorted(fields): group_name = fields[field_name]['timeline'] print(field_name, timelines[group_name]['interval']) if args.field is None: full_name = list(fields.keys())[0] print('Pretending interest in:', full_name) else: full_name = args.field print('User requests:', full_name) # Identify the shortest form of this field that also works. f_toks = full_name.split('.') field_name = full_name for i in range(1, 2**len(f_toks)): short_name = '.'.join([t for b,t in enumerate(f_toks) if (i >> b) & 1]) try: fields, timelines = cat.get_data([short_name], short_match=True) except Exception: continue if len(short_name) < len(field_name): field_name = short_name print(field_name) print('Name shortened to:', field_name) # This is the awkward way, which preserves co-sampled # relationships (and is thus annoying to decode in simple cases). fields, timelines = cat.get_data([field_name], short_match=True) x0 = list(timelines.values())[0]['t'] y0 = fields[field_name] # This is the easy way, which just gives you one timeline per # requested field. x1, y1 = cat.simple(field_name) assert np.all(np.array(x0) == x1) and np.all(np.array(y0) == y1) import pylab as pl pl.plot(x1, y1) pl.title(field_name) pl.show()