Source code for so3g.hk.session

import so3g
from spt3g import core
import time
import os
import binascii


[docs] class HKSessionHelper: def __init__(self, session_id=None, start_time=None, hkagg_version=None, description='No description provided.'): """Helper class to produce G3Frame templates for creating streams of generic HK data. Arguments: session_id: an integer session ID for the HK session. If not provided (recommended) then it will be generated based on the PID, the start_time, and the description string. start_time (float): a timestamp to use for the HK session. hkagg_version (int): schema version code, which will be written into every frame. Defaults to zero, for backwards compatibility. Make sure this is set correctly. description (str): a description of the agent generating the stream. """ if start_time is None: start_time = time.time() self.start_time = start_time self.description = description self.provs = {} self.next_prov_id = 0 if session_id is None: session_id = self._generate_session_id(start_time, description) self.session_id = session_id if hkagg_version is None: hkagg_version = 0 self.hkagg_version = hkagg_version @staticmethod def _generate_session_id(timestamp=None, description=''): if timestamp is None: timestamp = time.time() if description is None: description = '?' # Bit-combine some unique stuff. It's useful if this sorts in # time, so lead off with the timestamp. elements = [(int(timestamp), 32), (os.getpid(), 14), (binascii.crc32(bytes(description, 'utf8')), 14)] session_id = 0 for i, b in elements: session_id = (session_id << b) | (i % (1 << b)) return session_id
[docs] def add_provider(self, description=None): """Register a provider and return the unique prov_id. (Remember to write a status frame before starting to write data for this provider.) Args: description (str): The name to use for the provider. When later retrieving the data, this will act as a prefix for all the data fields. Returns: prov_id (int). """ assert(description is not None) # Need a provider name! prov_id = self.next_prov_id self.next_prov_id += 1 self.provs[prov_id] = {'description': description} return prov_id
[docs] def remove_provider(self, prov_id): """Drops a provider from the active list, so it will not be listed in the status frame. Args: prov_id (int): The provider ID returned by add_provider. """ del self.provs[prov_id]
""" Frame generators. """
[docs] def session_frame(self): """ Return the Session frame. No additional information needs to be added to this frame. This frame initializes the HK stream (so it should be the first frame of each HK file; it should precede all other HK frames in a network source). """ f = core.G3Frame() f.type = core.G3FrameType.Housekeeping f['hkagg_type'] = so3g.HKFrameType.session f['hkagg_version'] = self.hkagg_version f['session_id'] = self.session_id f['start_time'] = self.start_time f['description'] = self.description return f
[docs] def status_frame(self, timestamp=None): """ Return a Status frame template. Before processing, the session manager should update with information about what providers are currently connected. """ if timestamp is None: timestamp = time.time() f = core.G3Frame() f.type = core.G3FrameType.Housekeeping f['hkagg_type'] = so3g.HKFrameType.status f['hkagg_version'] = self.hkagg_version f['session_id'] = self.session_id f['timestamp'] = timestamp provs = core.G3VectorFrameObject() for prov_id in sorted(self.provs.keys()): prov = core.G3MapFrameObject() prov['prov_id'] = core.G3Int(prov_id) prov['description'] = core.G3String( self.provs[prov_id]['description']) provs.append(prov) f['providers'] = provs return f
[docs] def data_frame(self, prov_id, timestamp=None): """ Return a Data frame template. The prov_id must match the prov_id in one of the Provider blocks in the preceding status frame. The session manager should create and add IrregBlockDouble items to the 'blocks' list. """ if timestamp is None: timestamp = time.time() f = core.G3Frame() f.type = core.G3FrameType.Housekeeping f['hkagg_version'] = self.hkagg_version f['hkagg_type'] = so3g.HKFrameType.data f['session_id'] = self.session_id f['prov_id'] = prov_id f['timestamp'] = timestamp f['blocks'] = core.G3VectorFrameObject() if self.hkagg_version >= 2: f['block_names'] = core.G3VectorString() return f