Source code for so3g.hk.translator

"""Backwards compatibility for older SO HK schemas.

"""

import so3g
import so3g.hk
from spt3g import core


[docs] class HKTranslator: """Translates SO Housekeeping frames from schema versions {v0, v1} to schema version 2. Passes v2 (or newer) frames through, unmodified. This can be used in a G3Pipeline to condition archival HK streams for processing by v2-compatible code. (Note that code that works with the short-lived v1 schema should also work on a v2 stream, unless it explicitly rejects based on hkagg_version.) Version 1/2 are not a strict superset of version 0, but the main structural features eliminated in v1 (field prefixes) was not really used. """ def __init__(self, target_version=2, future_tolerant=True): """Arguments: target_version (int): 0, 1, or 2. Version to which to translate the stream. The code is not able to downgrade a stream. See future_tolerant parameter. future_tolerant (bool): Determines the behavior of the translator should it encounter a frame with hkagg_version higher than target_version. If future_tolerant is True, the frame will be passed through unmodified. Otherwise, a ValueError will be raised. """ self.stats = {'n_hk': 0, 'n_other': 0, 'versions': {}} self.target_version = target_version self.future_tolerant = future_tolerant
[docs] def Process(self, f): """Translates one frame to the target schema. Irrelevant frames are passed through unmodified. Args: f: a G3Frame Returns: A list containing only the translated frame. G3Pipeline compatibility would permit us to return a single frame here, instead of a length-1 list. But we also sometimes call Process outside of a G3Pipeline, where a consistent output type is desirable. Returning lists is most future-compatible; consumers that want to assume length-1 should assert it to be true. """ if f.type == core.G3FrameType.EndProcessing: core.log_info(str(self.stats)) return [f] if f.type != core.G3FrameType.Housekeeping: self.stats['n_other'] += 1 return [f] # It is an HK frame. orig_version = f.get('hkagg_version', 0) self.stats['n_hk'] += 1 self.stats['versions'][orig_version] = self.stats['versions'].get(orig_version, 0) + 1 if orig_version > self.target_version and not self.future_tolerant: raise ValueError( ('Translator to v%i encountered v%i, but future_tolerant=False.') % (self.TARGET_VERSION, orig_version)) if orig_version >= self.target_version: return [f] # Always update the version, even if that's our only change... if 'hkagg_version' in f: if 'hkagg_version_orig' not in f: f['hkagg_version_orig'] = orig_version del f['hkagg_version'] f['hkagg_version'] = self.target_version # No difference in Session/Status for v0, v1, v2. if f.get('hkagg_type') != so3g.HKFrameType.data: return [f] if self.target_version == 0: return [f] if orig_version == 0: # Pop the data blocks out of the frame. orig_blocks = f.pop('blocks') f['blocks'] = core.G3VectorFrameObject() # Now process the data blocks. for block in orig_blocks: new_block = core.G3TimesampleMap() new_block.times = so3g.hk.util.get_g3_time(block.t) for k in block.data.keys(): v = block.data[k] new_block[k] = core.G3VectorDouble(v) f['blocks'].append(new_block) if self.target_version == 1: return [f] if orig_version <= 1: # Add 'block_names'. Since we don't want to start # caching Block Stream information, just compute a good # block name based on the alphabetically first field in # the block. block_names = [] for block in f['blocks']: field_names = list(sorted(block.keys())) block_names.append('block_for_%s' % field_names[0]) assert(len(block_names[-1]) < 256) # What have you done. orig_block_names = [] f['block_names'] = core.G3VectorString(block_names) return [f]
def __call__(self, *args, **kwargs): return self.Process(*args, **kwargs)
if __name__ == '__main__': import argparse parser = argparse.ArgumentParser( usage='This program can be used to convert SO HK Frames to the ' 'latest schema version.') parser.add_argument('--output-file', '-o', default='out.g3') parser.add_argument('--target-version', type=int) parser.add_argument('files', nargs='+', help= "SO Housekeeping files to convert.") args = parser.parse_args() # Run me on a G3File containing a Housekeeping stream. core.set_log_level(core.G3LogLevel.LOG_INFO) translator_args = {} if args.target_version is not None: translator_args['target_version'] = args.target_version print(f'Streaming to {args.output_file}') p = core.G3Pipeline() p.Add(core.G3Reader(args.files)) p.Add(HKTranslator(**translator_args)) p.Add(core.G3Writer(args.output_file)) p.Run()