# Source code for so3g.hk.tree

"""HKTree: representation of an HK data archive's structure as a tree
of attributes.

"""

from so3g.hk import getdata
import time
import os
import yaml
import logging


# Module-level logger, named after this module; used for the archive scan
# progress messages and the alias warnings emitted by HKTree.
logger = logging.getLogger(__name__)


class HKRef(object):
    """Node in an HKTree attribute tree.

    Because public child attributes are generated dynamically,
    important functionality is hidden in "private members".  The
    ``_load`` and ``_clear`` methods are documented below, but be
    aware also of the following attributes:

    - ``_data``: The loaded data, as a tuple of arrays (t, val).
    - ``_t``: alias for _data[0].
    - ``_val``: alias for _data[1].
    - ``_private``: A dict of information for managing the reference,
      including the full field name, the root tree object, the list
      of child refs.

    Args:
      name (str): Full field name of this node.
      parent: The root tree object.  It must be callable as
        ``parent(list_of_refs, **kw)``; that is how ``_load`` fetches
        data.
      terminal (bool): True if this node is an end point (a field),
        False if it is an intermediate node that only holds children.

    """
    # Class-level default, shadowed by an instance attribute once data
    # have been loaded into a terminal node.
    _data = None

    def __init__(self, name, parent, terminal):
        super().__init__()
        self._private = {
            'name': name,
            'parent': parent,
            'children': [],
            'terminal': terminal,
            'alias': None,
        }

    @property
    def _t(self):
        """The first element of ``_data``, or None if nothing is loaded."""
        if self._data is not None:
            return self._data[0]
        return None

    @property
    def _val(self):
        """The second element of ``_data``, or None if nothing is loaded."""
        # This must test ``_data``, not ``data``: a public attribute name
        # would be routed through __getattr__ and yield a (truthy)
        # HKDeadend, making the guard useless.
        if self._data is not None:
            return self._data[1]
        return None

    def __getattr__(self, k):
        # For non-private, invalid attribute names ... return an
        # HKDeadend so user scripts don't immediately fail in cases
        # where a field is not present during this time interval.
        if k.startswith('_'):
            raise AttributeError(k)
        return HKDeadend(self._private['name'] + '.' + k,
                         self._private['parent'], False)

    def __repr__(self):
        name = self._private['name']
        # Non-terminal nodes are marked with a trailing ellipsis.
        post = ' ...' if not self._private['terminal'] else ''
        data = ' [loaded]' if self._data is not None else ''
        return f'<{self.__class__.__name__}:{name}{post}{data}>'

    def _load(self, **kw):
        """Load the data for this node and all child nodes.  The data
        are saved, in each terminal node, in the ``_data`` attribute
        as a (times, values) tuple.

        Returns:
          All the data that was loaded, as a dictionary where the key
          is the full field name and the value is the (times, values)
          tuple.

        """
        return self._private['parent']([self], **kw)

    def _clear(self):
        """Discard any loaded data for this node and all child nodes."""
        if self._private['terminal']:
            self._data = None
        else:
            for child in self._private['children']:
                child._clear()
class HKDeadend(HKRef):
    """Placeholder HKRef, returned when a non-private attribute name is
    not present in the tree.

    """


class HKAliasRefs(HKRef):
    """HKRef whose ``_load`` asks the root tree for alias-keyed results."""

    def _load(self, **kw):
        """Load data through the parent tree with ``use_aliases=True``;
        any other keyword arguments are passed along unchanged.

        """
        root = self._private['parent']
        return root([self], use_aliases=True, **kw)
class HKTree:
    """Attribute tree giving access to the fields of an HK data archive.

    Field names are split on '.' and exposed as nested attributes
    (HKRef objects); aliased fields are additionally exposed through
    the ``_aliases`` attribute.

    """

    def __init__(self, start=None, stop=None, config=None,
                 data_dir=None, pre_proc_dir=None, aliases=None,
                 skip=None):
        """Scan an HK archive, between two times, and create an attribute
        tree representing the HK data.

        Args:
          start (time): Earliest time to include (defaults to 1 day
            ago).
          stop (time): Latest time to include (defaults to 1 day after
            start).
          config (str): Filename of a config file (yaml).  Alternately
            a dict can be passed in directly.
          data_dir (str): The root directory for the HK files.
          pre_proc_dir (str): Directory to use to store/retrieve
            first-pass scanning data (see HKArchiveScanner).
          aliases (dict): Map from alias name to full field name.
            This setting does not override aliases from the config
            file but instead will extend them.
          skip (list of str): Tokens to suppress when turning feed
            names (e.g. "observatory.X.feeds.Y") into tree components
            (e.g. X.Y).  Defaults to ``['observatory', 'feeds']``.

        Raises:
          KeyError: If no data_dir is given (directly or through the
            config) and the OCS_DATA_DIR environment variable is not
            set.

        Notes:
          Initialization of the tree requires a "first pass" scan of
          the HK data archive.  The time range you specify is thus
          very important in limiting the amount of IO activity.

          The arguments passed are closely related to the load_ranges
          function in this module; see that docstring.  Config files
          that work with load_ranges should also work here.

        """
        # Parse time args
        now = time.time()
        if start is None:
            start = now - 86400
        else:
            start = getdata.to_timestamp(start)
        if stop is None:
            stop = start + 86400
        else:
            stop = getdata.to_timestamp(stop)

        # Work on private copies so that neither the caller's containers
        # nor a shared default are ever stored in (or modified through)
        # this object.
        aliases = {} if aliases is None else dict(aliases)
        skip = ['observatory', 'feeds'] if skip is None else list(skip)

        # Use config dict / file?
        if isinstance(config, str):
            # Close the file promptly rather than leaking the handle.
            with open(config, 'rb') as fin:
                config = yaml.safe_load(fin)
        if config is not None:
            if data_dir is None:
                data_dir = config.get('data_dir')
            if pre_proc_dir is None:
                pre_proc_dir = config.get('pre_proc_dir')
            if config.get('field_list'):
                # Aliases passed as arguments take precedence over the
                # config file entries.
                for k, v in config['field_list'].items():
                    aliases.setdefault(k, v)
            if config.get('skip_tokens'):
                skip = skip + list(config['skip_tokens'])

        # Final default substitutions.
        if data_dir is None:
            data_dir = os.environ['OCS_DATA_DIR']

        # Walk the files -- same approach as load_ranges
        logger.debug('Scanning %s (pre_proc=%s)', data_dir, pre_proc_dir)
        hksc = getdata.HKArchiveScanner(pre_proc_dir=pre_proc_dir)

        # Sub-directories are named by int(timestamp / 1e5).
        for folder in range(int(start / 1e5), int(stop / 1e5) + 1):
            base = os.path.join(data_dir, str(folder))
            logger.debug(' ... checking %s', base)
            if not os.path.exists(base):
                continue

            for filename in sorted(os.listdir(base)):
                logger.debug(' ... ... processing %s', filename)
                try:
                    # Strip the last 3 characters (presumably a ".g3"
                    # extension -- confirm) to get the file timestamp.
                    t = int(filename[:-3])
                except ValueError:
                    logger.warning(' ... ... filename does not lead with '
                                   'timestamp, skipping: %s', filename)
                    continue
                # Allow an hour of slop on either side of the range.
                if start - 3600 <= t <= stop + 3600:
                    hksc.process_file_with_cache(os.path.join(base, filename))

        self._private = {
            'skip': skip,
            'children': [],
        }
        self._private['arc'] = hksc.finalize()
        self._private['fields'] = self._private['arc'].get_fields()

        # Prepare alias reverse map...
        rev_aliases = {v: k for k, v in aliases.items()}
        self._aliases = HKAliasRefs('', self, False)

        # Build attribute tree.
        for k in self._private['fields'][0].keys():
            target = self._find(k, create_missing=True)
            if k in rev_aliases:
                self._add_alias(rev_aliases[k], target)

    def _find(self, name, create_missing=False):
        """Find the HKRef in the attribute tree associated with a full
        field name.  If create_missing is False, this will return None
        if the name is not found in the tree; if create_missing is
        True, any missing HKRef will be created and the terminal node
        returned.

        """
        tokens = name.split('.')
        target = self
        # Full dotted name accumulated so far, skipped tokens included.
        path = ''
        for i, token in enumerate(tokens):
            end_point = (i == len(tokens) - 1)
            path += token
            if token not in self._private['skip']:
                # Attribute names can't contain '-'.
                attr = token.replace('-', '_')
                # __getattr__ hands back an HKDeadend for unknown public
                # names, so a deadend counts as "missing" here.
                if not hasattr(target, attr) or \
                   isinstance(getattr(target, attr), HKDeadend):
                    if not create_missing:
                        return None
                    new_node = HKRef(path, self, terminal=end_point)
                    target._private['children'].append(new_node)
                    setattr(target, attr, new_node)
                target = getattr(target, attr)
            path += '.'
        return target

    def _add_alias(self, alias, full_ref, _strip_prefix=None):
        """Add a field to the set of aliases.

        Args:
          alias (str): The alias key.
          full_ref (str or HKRef): The full name of the field or an
            HKRef for the field.

        Raises:
          ValueError: If full_ref is a string that does not match any
            node in the tree.

        Notes:
          If the full_ref is a non-terminal HKRef, then all children
          will be added in.  The alias for each will be constructed by
          combining the provided alias string and the attribute names
          of each child node, joined with '_'.

        """
        if isinstance(full_ref, str):
            target = self._find(full_ref)
            if target is None:
                raise ValueError(f'No node found for "{full_ref}"')
        else:
            target = full_ref

        # Handle non-terminal nodes
        name = target._private['name']
        if not target._private['terminal']:
            # Remember the top-level node's name, so descendants can
            # build their alias suffix relative to it.
            if _strip_prefix is None:
                _strip_prefix = name
            for child in target._private['children']:
                self._add_alias(alias, child, _strip_prefix=_strip_prefix)
            return

        if target._private['alias'] is not None:
            logger.warning('Field %s is already aliased as %s, '
                           'ignoring new alias.',
                           name, target._private['alias'])
            return

        if _strip_prefix:
            assert name.startswith(_strip_prefix)
            # The +1 drops the '.' separating prefix and remainder.
            alias = alias + '_' + name[len(_strip_prefix)+1:]

        target._private['alias'] = alias
        # Make the alias usable as an attribute name.
        clean_alias = alias.replace('.', '_').replace('-', '_')
        setattr(self._aliases, clean_alias, target)
        self._aliases._private['children'].append(target)

    def __getattr__(self, k):
        # Unknown public names yield an HKDeadend rather than raising;
        # private names raise as usual.
        if k.startswith('_'):
            raise AttributeError(k)
        return HKDeadend(k, self, False)

    def _load(self, **kw):
        """Call _load() on all child attributes and return a giant
        data dictionary (see HKRef._load).

        """
        return self(self._private['children'], **kw)

    def _clear(self):
        """Drop all loaded data."""
        for c in self._private['children']:
            c._clear()

    def __call__(self, fields, use_aliases=False, **kw):
        """Load data for the given refs and store it on each terminal
        node's ``_data`` attribute.

        Args:
          fields (HKRef or iterable): A single HKRef, or an (optionally
            nested) iterable of HKRefs.  Non-terminal refs are expanded
            to all terminal nodes below them.
          use_aliases (bool): If True, key the returned dictionary by
            alias for every field that has one (others keep their full
            field name).
          **kw: Accepted, but not used by this method.
            NOTE(review): these are not forwarded to the archive's
            ``simple`` call -- confirm whether that is intended.

        Returns:
          dict mapping key name (full field name or alias) to the data
          returned by the archive for that field.

        """
        def get_targets(fs):
            # A single ref: it is either a leaf, or we descend into its
            # children.
            if isinstance(fs, HKRef):
                if fs._private['terminal']:
                    return [fs]
                return get_targets(fs._private['children'])
            # Otherwise treat fs as an iterable of refs and flatten it.
            found = []
            for f in fs:
                found.extend(get_targets(f))
            return found

        field_targets = get_targets(fields)

        # Tuples (field_name, key_name)
        names = []
        for ref in field_targets:
            full_name = ref._private['name']
            key = full_name
            if use_aliases and ref._private['alias'] is not None:
                key = ref._private['alias']
            names.append((full_name, key))

        data = self._private['arc'].simple([n[0] for n in names])

        # Populate the endpoints.
        for ref, vects in zip(field_targets, data):
            ref._data = vects

        return {k[1]: d for k, d in zip(names, data)}