Source code for so3g.hk.scanner

import so3g
from spt3g import core
import numpy as np

from so3g import hk

class HKScanner:
    """Module that scans and reports on HK archive contents and compliance.

    Instances are callable and are intended to be added to a
    ``G3Pipeline``; every frame passed in is returned unmodified.

    Attributes:
      session_id: The id of the HK session currently being processed,
        or None if no session frame has been seen since the last
        report.
      providers (dict): Per-provider bookkeeping, keyed by prov_id.
        Entries are created when a provider first appears in a status
        frame and updated by subsequent status and data frames.
      stats (dict): A nested dictionary of statistics that are updated
        as frames are processed by the module.  Elements:

        - ``n_hk`` (int): The number of HK frames encountered.
        - ``n_other`` (int): The number of non-HK frames encountered.
        - ``n_session`` (int): The number of distinct HK sessions
          processed.
        - ``concerns`` (dict): The number of warning (key
          ``n_warning``) and error (key ``n_error``) events
          encountered.  The detail for such events is logged to
          ``spt3g.core.log_warn`` / ``log_error``.
        - ``versions`` (dict): The number of frames (value)
          encountered that have a given hkagg_version (key).

    """

    def __init__(self):
        self.session_id = None
        self.providers = {}
        self.stats = {
            'n_hk': 0,
            'n_other': 0,
            'n_session': 0,
            'concerns': {
                'n_error': 0,
                'n_warning': 0
            },
            'versions': {},
        }

    def _warn(self, message):
        """Log a warning under the HKScanner unit and count it."""
        core.log_warn(message, unit='HKScanner')
        self.stats['concerns']['n_warning'] += 1

    def _error(self, message):
        """Log an error under the HKScanner unit and count it."""
        core.log_error(message, unit='HKScanner')
        self.stats['concerns']['n_error'] += 1

    def report_and_reset(self):
        """Log the accumulated stats and provider info, then clear
        ``session_id``.

        Note that only ``session_id`` is reset; ``stats`` and
        ``providers`` keep accumulating across sessions.

        """
        # session_id is still None if no session frame was seen (e.g. a
        # file with no HK frames), so it must not be formatted with %i.
        core.log_info('Report for session_id %s:\n' % (self.session_id,) +
                      str(self.stats) + '\n' +
                      str(self.providers) + '\nEnd report.',
                      unit='HKScanner')
        self.session_id = None

    def __call__(self, f):
        """Processes a frame.  Only Housekeeping frames will be examined;
        other frames will simply be counted.  All frames are passed
        through unmodified.

        Args:
          f: The frame to process.

        Returns:
          The input frame; wrapped in a single-element list for
          EndProcessing and Housekeeping frames, bare for other frame
          types.

        """
        if f.type == core.G3FrameType.EndProcessing:
            self.report_and_reset()
            return [f]

        if f.type != core.G3FrameType.Housekeeping:
            self.stats['n_other'] += 1
            return f

        self.stats['n_hk'] += 1
        vers = f.get('hkagg_version', 0)
        versions = self.stats['versions']
        versions[vers] = versions.get(vers, 0) + 1

        hkagg_type = f['hkagg_type']
        if hkagg_type == so3g.HKFrameType.session:
            self._scan_session(f)
        elif hkagg_type == so3g.HKFrameType.status:
            self._scan_status(f)
        elif hkagg_type == so3g.HKFrameType.data:
            self._scan_data(f, vers)
        else:
            self._warn('Weird hkagg_type: %i' % hkagg_type)

        return [f]

    def _scan_session(self, f):
        """Handle a session frame: report on the previous session if the
        session id changed, then register the new session."""
        session_id = f['session_id']
        if self.session_id is not None and self.session_id != session_id:
            self.report_and_reset()  # note this does clear self.session_id.
        if self.session_id is None:
            core.log_info('New HK Session id = %i, timestamp = %i' %
                          (session_id, f['start_time']), unit='HKScanner')
            self.session_id = session_id
            self.stats['n_session'] += 1

    def _scan_status(self, f):
        """Handle a status frame: update which providers are active and
        register any provider seen for the first time."""
        now_prov_id = [p['prov_id'].value for p in f['providers']]

        # Have any providers disappeared?
        for prov_id, info in self.providers.items():
            if prov_id not in now_prov_id:
                info['active'] = False

        # New (or returning) providers?
        for prov_id in now_prov_id:
            info = self.providers.get(prov_id)
            if info is None:
                self.providers[prov_id] = {
                    'active': True,  # Currently active (during processing).
                    'n_active': 1,  # Number of times this provider id became active.
                    'n_frames': 0,  # Number of data frames.
                    'timestamp_init': f['timestamp'],  # Timestamp of provider appearance
                    'timestamp_data': None,  # Timestamp of most recent data frame.
                    'ticks': 0,  # Total number of timestamps in all blocks.
                    'span': None,  # (earliest_time, latest_time)
                    'block_streams_map': {},  # Map from field name to block name.
                }
            elif not info['active']:
                self._warn('prov_id %i came back to life.' % prov_id)
                info['n_active'] += 1
                info['active'] = True

    def _scan_data(self, f, vers):
        """Handle a data frame: check timestamps, sample counts and the
        field-to-block mapping for the frame's provider.

        Args:
          f: The data frame.
          vers: The frame's hkagg_version (0 if the frame has none).

        """
        prov_id = f['prov_id']
        info = self.providers.get(prov_id)
        if info is None:
            # A data frame for a provider that no status frame announced
            # is a compliance problem; record it rather than crashing
            # with a KeyError.
            self._error('Data frame refers to prov_id %s, which has not '
                        'appeared in any status frame.' % (prov_id,))
            return

        info['n_frames'] += 1
        t_this = f['timestamp']
        t_prev = info['timestamp_data']
        if t_prev is None:
            # First data frame for this provider: compare against the
            # timestamp at which the provider was registered.
            t_ref = info['timestamp_init']
            if t_this < t_ref:
                self._warn('data timestamp (%.1f) precedes provider '
                           'timestamp by %f seconds.'
                           % (t_this, t_ref - t_this))
        elif t_this <= t_prev:
            self._warn('data frame timestamps are not strictly ordered.')
        info['timestamp_data'] = t_this  # update

        blocks = f['blocks']

        # Accessors for a block's time vector and (field, data) pairs;
        # the block layout depends on the schema version.
        if vers == 0:
            block_timef = lambda block: block.t
            block_itemf = lambda block: [(k, block.data[k])
                                         for k in block.data.keys()]
        elif vers >= 1:
            block_timef = lambda block: np.array(
                [t.time / core.G3Units.seconds for t in block.times])
            block_itemf = lambda block: [(k, block[k]) for k in block.keys()]

        # How to name a block.  Before v2 there is no explicit name, so
        # the alphabetically first field name is used instead.
        if vers == 0:
            block_name = lambda block_idx: sorted(blocks[block_idx].data.keys())[0]
        elif vers == 1:
            block_name = lambda block_idx: sorted(blocks[block_idx].keys())[0]
        elif vers >= 2:
            block_names = f.get('block_names', [])
            if len(block_names) != len(blocks):
                # This is a schema error in its own right.
                self._error('Frame does not have "block_names" entry, '
                            'or it is not the same length as "blocks".')
                # Fall back on v1 strategy.
                block_name = lambda block_idx: sorted(blocks[block_idx].keys())[0]
            else:
                block_name = lambda block_idx: block_names[block_idx]

        t_check = []  # First timestamp of each non-empty block.
        for block_idx, b in enumerate(blocks):
            times = block_timef(b)
            if len(times):
                if info['span'] is None:
                    info['span'] = times[0], times[-1]
                else:
                    t0, t1 = info['span']
                    info['span'] = min(times[0], t0), max(times[-1], t1)
                t_check.append(times[0])
            info['ticks'] += len(times)
            bname = block_name(block_idx)
            for k, v in block_itemf(b):
                if len(v) != len(times):
                    self._error('Field "%s" has %i samples but .t has %i samples.' %
                                (k, len(v), len(times)))
                # Make sure field has a block_stream registered.
                if k not in info['block_streams_map']:
                    info['block_streams_map'][k] = bname
                if info['block_streams_map'][k] != bname:
                    self._error('Field "%s" appeared in block_name %s '
                                'and later in block_name %s.' %
                                (k, info['block_streams_map'][k], bname))

        # The frame timestamp should be within a minute of the earliest
        # block start time.
        if len(t_check) and abs(min(t_check) - t_this) > 60:
            self._warn('data frame timestamp (%.1f) does not correspond to '
                       'data timestamp vectors (%s) .' % (t_this, t_check))
if __name__ == '__main__':
    # Command-line entry point: scan each given G3 file containing a
    # Housekeeping stream, optionally translating frames to a target
    # schema version before they reach the scanner.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument('--translate', action='store_true')
    cli.add_argument('--target-version', type=int, default=2)
    cli.add_argument('files', nargs='+')
    options = cli.parse_args()

    # The scanner's report is emitted at level LOG_INFO, so make sure
    # that level is displayed.
    core.set_log_level(core.G3LogLevel.LOG_INFO)

    # Each file gets its own pipeline and its own scanner instance.
    for filename in options.files:
        pipeline = core.G3Pipeline()
        pipeline.Add(core.G3Reader(filename))
        if options.translate:
            translator = hk.HKTranslator(target_version=options.target_version)
            pipeline.Add(translator)
        pipeline.Add(HKScanner())
        pipeline.Run()