import so3g
from . import quat
import numpy as np
from .ranges import Ranges, RangesMatrix
from . import mapthreads
# For coordinate systems we use the following abbreviations:
#
# - DC: Detector coordinates
# - FP: Focal plane coordinates
# - CS: Celestial spherical coordinates (likely equatorial)
# - NS: Native spherical coordinates of the map projection
#
# Saying that a quaternion rotation q_B<A "takes A to B", where A and
# B are coordinate systems, means that if a vector has coordinates v_A
# in system A then its coordinates in system B are:
#
# v_B = q_B<A v_A q_B<A*
#
# The rotation taking detector coordinates to native spherical
# coordinates is the product
#
# q_NS<DC = q_NS<CS q_CS<FP q_FP<DC
#
# In this module, quaternion rotations are written as:
# - Q_CS<FP: Assembly.Q (vector in samples)
# - Q_FP<DC: Assembly.dets (vector in dets)
# - Q_NS<CS: Projectionst.q_celestial_to_native
#
# The Projectionist caches a vector of Q_NS<FP, which is then simply
# called "q_native". This is usually the thing to pass to C++ level.
#: Valid settings for "interpol". First entry is the default.
INTERPOLS = ['nearest', 'bilinear']
[docs]
class _ProjectionistBase:
"""This is a base class to assist with populating data
structures needed for accelerated pointing routines.
This class should not be used directly; call instead
:class:`Projectionist` for rectangular pixelizations
or :class:`ProjectionistHealpix` for HEALPix.
On instantiation, it carries information about the relation
between celestial spherical coordinates and the Native Spherical
coordinates of the projection, and also the pixelization scheme
for the projection.
The following symbols are used in docstrings to represent shapes:
* ``n_dets`` - Number of detectors in the focal plane.
* ``n_samps`` - Number of samples in the signal space of each detector.
* ``n_threads`` - Number of threads to use in parallelized computation.
* ``n_mapidx`` - Number of indices required to specify a pixel in
a map. For simple RectPix maps this is probably 2, with the first
index specifying row and the second index specifying column.
But for tiled maps n_mapidx is 3, with the first axis specifying
the tile number. For HEALPix, n_mapidx is 1 for untiled and 2
for tiled maps.
* ``n_comp`` - Number of map components, such as Stokes components
or other spin components. This takes the value 3, for example,
if projection into TQU space has been requested.
When coordinate computation or projection routines are called, a
ProjEng (a wrapped C++ object) is instantiated to perform the
accelerated computations. The spin-composition (i.e. is this a
'T' map or a 'TQU' map) must be specified at this time, to
instantiate the correct accelerated object.
Some method parameters are common to many functions and are documented
here for consistency:
* ``assembly`` - an Assembly, providing a focal plane (quaternion
offsets for each detector) as well as a boresight vector
(quaternion for each time sample).
* ``comps`` - a string specifying the Stokes components of the
map, for example 'T' or 'TQU'. When absent, this will be
guessed from the map shape; with 1|2|3 mapping to 'T'|'QU'|'TQU'
respectively.
* ``threads`` - the thread assignment, consisting of a list of
RangesMatrix objects. Each RangesMatrix object must have shape
(n_threads, n_dets, n_samps). The n_threads does not need to
be the same for every entry in the list. In TOD-to-map
operations, each entry of this list is processed fully before
proceeding to the next one. Each entry "ranges" is processed
using (up to) the specified number of threads, such that thread
i performs operations only on the samples included in
ranges[i,:,:]. Most thread assignment routines in this module
will return a list of two RangesMatrix objects,
[ranges_parallel, ranges_serial]. The first item represents the
part of the computation that can be done in parallel, and has
shape (n_threads, n_dets, n_samps). The ranges_serial object
has shape (1, n_dets, n_samps) and represents any samples that
need to be treated in a single thread. The ranges_serial is
only non-trivial when interpolation is active.
"""
def __init__(self):
raise NotImplementedError("Use child class Projectionist or ProjectionistHealpix")
def _get_pixelizor_args(self):
raise NotImplementedError("Use child class Projectionist or ProjectionistHealpix")
[docs]
def get_ProjEng(self, comps='TQU', proj_name=None, get=True,
instance=True, interpol=None):
"""Returns an so3g.ProjEng object appropriate for use with the
configured geometry.
"""
if proj_name is None: proj_name = self.proj_name
tile_suffix = 'Tiled' if self.tiling else 'NonTiled'
# Interpolation mode
if interpol is None:
interpol = self.interpol
if interpol in ["nn", "nearest"]:
interpol_suffix = ""
elif interpol in ["lin", "bilinear"]:
interpol_suffix = "_Bilinear"
else:
raise ValueError("ProjEng interpol '%s' not recognized" % str(interpol))
projeng_name = f'ProjEng_{proj_name}_{comps}_{tile_suffix}{interpol_suffix}'
if not get:
return projeng_name
try:
projeng_cls = getattr(so3g, projeng_name)
except AttributeError:
raise ValueError(f'There is no projector implemented for '
f'pixelization "{proj_name}", components '
f'"{comps}" (tried "{projeng_name}").')
if not instance:
return projeng_cls
return projeng_cls(self._get_pixelizor_args())
def _cache_q_fp_to_native(self, q_fp_to_celestial):
"""Get q_fp_to_native for argument q_fp_to_celestial, and cache the
result, and return that result later if called with the same
argument.
"""
if q_fp_to_celestial is not self._q_fp_to_celestial:
self._q_fp_to_native = self.q_celestial_to_native * q_fp_to_celestial
self._q_fp_to_celestial = q_fp_to_celestial
return self._q_fp_to_native
def _guess_comps(self, map_shape):
if map_shape[0] == 1:
return 'T'
elif map_shape[0] == 2:
return 'QU'
elif map_shape[0] == 3:
return 'TQU'
raise ValueError('No default components for ncomp=%i; '
'set comps=... explicitly.' % map_shape[0])
def _get_map_shape(self, imap):
"""
Get map shape, supporting "bare" tiled maps (lists of None or tile array).
For a list of tiles it returns the *tile shape*
"""
if isinstance(imap, list): # Assume tile list
for tile in imap:
if tile is not None:
return tile.shape
raise ValueError("map has no non-None entries")
else:
return imap.shape
[docs]
def get_pixels(self, assembly):
"""Get the pixel indices for the provided pointing Assembly. For each
detector, an int32 array of shape (n_samps, n_mapidx) is
returned. The value of the first slot of the second axis will
be -1 if and only if the sample's projected position is
outside the map footprint.
See class documentation for description of standard arguments.
"""
projeng = self.get_ProjEng('TQU')
q_native = self._cache_q_fp_to_native(assembly.Q)
return projeng.pixels(q_native, assembly.fplane.quats, None)
[docs]
def get_pointing_matrix(self, assembly):
"""Get the pointing matrix information, which is to say both the pixel
indices and the "spin projection factors" for the provided
pointing Assembly. Returns (pixel_indices, spin_proj) where
pixel_indices is as returned by get_pixels() and spin_proj is
a list of float arrays of shape [n_samps, n_comp]. As an
alternative to on-the-fly computation, this information can be
cached and used for very fast projection operations.
See class documentation for description of standard arguments.
"""
projeng = self.get_ProjEng('TQU')
q_native = self._cache_q_fp_to_native(assembly.Q)
return projeng.pointing_matrix(q_native, assembly.fplane.quats, assembly.fplane.resps, None, None)
[docs]
def get_coords(self, assembly, use_native=False, output=None):
"""Get the spherical coordinates for the provided pointing Assembly.
For each detector, a float64 array of shape [n_samps,4] is
returned. In the right-most index, the first two components
are the longitude and latitude in radians. The next two
components are the cosine and sine of the parallactic angle.
This routine uses a trivial CAR projection to return results
from the celestial coordinate system (i.e. (lon,lat) =
(RA,dec)), and the parallactic angle will be relative to that
projection. If you are interested in the parallactic angle of
the native spherical coordinate system (e.g. if you're doing a
tangent plane projection), make a second call specifying
use_native=True. In this case you might also take a look at
the get_planar() routine.
See class documentation for description of standard arguments.
"""
projeng = self.get_ProjEng('TQU', 'CAR')
if use_native:
q_native = self._cache_q_fp_to_native(assembly.Q)
else:
q_native = assembly.Q
return projeng.coords(q_native, assembly.fplane.quats, output)
[docs]
def get_planar(self, assembly, output=None):
"""Get projection plane coordinates for all detectors at all times.
For each detector, a float64 array of shape [n_samps,4] is
returned. The first two elements are the x and y projection
plane coordiantes, similar to the "intermediate world
coordinates", in FITS language. Insofar as FITS ICW has units
of degrees, these coordinates have units of radians. Indices
2 and 3 carry the cosine and sine of the detector parallactic
angle.
"""
q_native = self._cache_q_fp_to_native(assembly.Q)
projeng = self.get_ProjEng('TQU')
return projeng.coords(q_native, assembly.fplane.quats, output)
[docs]
def zeros(self, super_shape):
"""Return a map, filled with zeros, with shape (super_shape,) +
self.shape. For tiled maps, this will be a list of map tiles
(with shape (super_shape, ) + tile_shape. If any tiles are
not active, None is returned in the corresponding slots.
"""
projeng = self.get_ProjEng('T')
return projeng.zeros(super_shape)
[docs]
def to_map(self, signal, assembly, output=None, det_weights=None,
threads=None, comps=None):
"""Project signal into a map.
Arguments:
signal (Signal-like): The signal to project.
output (Map-like): The map into which to accumulate the
projected signal. If None, a map will be initialized internally.
det_weights (shape n_det): Weights to use for each detector.
See class documentation for description of standard arguments.
"""
if output is None and comps is None:
raise ValueError("Provide an output map or specify component of "
"interest (e.g. comps='TQU').")
if comps is None:
comps = self._guess_comps(self._get_map_shape(output))
projeng = self.get_ProjEng(comps)
q_native = self._cache_q_fp_to_native(assembly.Q)
map_out = projeng.to_map(output, q_native, assembly.fplane.quats,
assembly.fplane.resps, signal, det_weights, threads)
return map_out
[docs]
def to_weights(self, assembly, output=None, det_weights=None,
threads=None, comps=None):
"""Project pointing into a weights map.
Arguments:
output (Map-like): The weights map into which to accumulate
the projected signal. If None, a map will be initialized
internally.
det_weights (shape n_det): Weights to use for each detector.
See class documentation for description of standard arguments.
"""
if output is None and comps is None:
raise ValueError("Provide an output map or specify component of "
"interest (e.g. comps='TQU').")
if comps is None:
shape = self._get_map_shape(output)
assert(shape[0] == shape[1])
comps = self._guess_comps(shape[1:])
projeng = self.get_ProjEng(comps)
q_native = self._cache_q_fp_to_native(assembly.Q)
map_out = projeng.to_weight_map(output, q_native, assembly.fplane.quats,
assembly.fplane.resps, det_weights, threads)
return map_out
[docs]
def from_map(self, src_map, assembly, signal=None, comps=None):
"""De-project from a map, returning a Signal-like object.
Arguments:
src_map (Map-like): The map from which to sample.
signal (Signal-like): The object into which to accumulate
the signal. If not provided, a suitable object will be
created and initialized to zero.
See class documentation for description of standard arguments.
"""
if comps is None:
comps = self._guess_comps(self._get_map_shape(src_map))
projeng = self.get_ProjEng(comps)
q_native = self._cache_q_fp_to_native(assembly.Q)
signal_out = projeng.from_map(src_map, q_native, assembly.fplane.quats,
assembly.fplane.resps, signal)
return signal_out
[docs]
def assign_threads(self, assembly, method='domdir', n_threads=None):
"""Get a thread assignment RangesMatrix. Different algorithms can be
chosen, though this method does not provide fine-grained
control of algorithm parameters and instead seeks to apply
reasonable defaults.
The methods exposed here are:
- ``'simple'``: Divides the map into n_threads horizontal
bands, and assigns the pixels in each band to a single
thread. This tends to be bad for scans that aren't
horizontally correlated, or that don't have equal weight
throughout the map. The 'domdir' algorithm addresses both
of those problems.
- ``'domdir'``: For constant elevation scans, determines the
dominant scan direction in the map and divides it into bands
parallel to that scan direction. The thickness of bands is
adjusted so roughly equal numbers of samples fall into each
band.
- ``'tiles'``: In Tiled projections, assign each tile to a
thread. Some effort is made to balance the total number of
samples over the threads.
The returned object can be passed to the ``threads`` argument
of the projection methods in this class.
See class documentation for description of standard arguments.
Args:
method (str): Algorithm to use.
"""
if method not in THREAD_ASSIGNMENT_METHODS:
raise ValueError(f'No thread assignment method "{method}"; '
f'expected one of {THREAD_ASSIGNMENT_METHODS}')
n_threads = mapthreads.get_num_threads(n_threads)
if method == 'simple':
projeng = self.get_ProjEng('T')
q_native = self._cache_q_fp_to_native(assembly.Q)
omp_ivals = projeng.pixel_ranges(q_native, assembly.fplane.quats, None, n_threads)
return wrap_ivals(omp_ivals)
elif method == 'domdir':
fplane_rep = assembly.fplane[::100]
if (self.tiling is not None) and (self.active_tiles is None):
tile_info = self.get_active_tiles(assembly)
active_tiles = tile_info['active_tiles']
self.active_tiles = active_tiles
else:
active_tiles = None
return mapthreads.get_threads_domdir(
assembly.Q, assembly.fplane, shape=self.naxis[::-1], wcs=self.wcs,
tile_shape=self.tile_shape, active_tiles=active_tiles,
n_threads=n_threads, fplane_rep=fplane_rep)
elif method == 'tiles':
tile_info = self.get_active_tiles(assembly, assign=n_threads)
self.active_tiles = tile_info['active_tiles']
return tile_info['group_ranges']
raise RuntimeError(f"Unimplemented method: {method}.")
[docs]
def assign_threads_from_map(self, assembly, tmap, n_threads=None):
"""Assign threads based on a map.
The returned object can be passed to the ``threads`` argument
of the projection methods in this object.
See class documentation for description of standard arguments.
Args:
tmap: Map structure with shape (1,m,n) where each pixel,
when converted to integer, species the index of the thread
to which that pixel should be assigned.
"""
projeng = self.get_ProjEng('T')
q_native = self._cache_q_fp_to_native(assembly.Q)
n_threads = mapthreads.get_num_threads(n_threads)
omp_ivals = projeng.pixel_ranges(q_native, assembly.fplane.quats, tmap, n_threads)
return wrap_ivals(omp_ivals)
[docs]
def get_active_tiles(self, assembly, assign=False):
"""For a tiled Projection, figure out what tiles are hit by an
assembly.
See class documentation for description of standard arguments.
Args:
assign (bool or int): If True or an int, then the function
will also compute a thread assignment, based on assigning
each tile to a particular thread. If this is an int, it
sets the thread count; if it is simply True then
OMP_NUM_THREADS is used.
Returns:
dict : The basic analysis results are:
- ``'active_tiles'`` (list of int): sorted, non-repeating
list of tiles that are hit by the assembly.
- ``'hit_counts'`` (list of int): the number of hits in
each tile returned in 'active_tiles', respectively.
If the thread assignment took place then the dict will also
contain:
- ``'group_tiles'`` (list of lists of ints): There is one
entry per thread, and the entry lists the tiles that
have been assigned to that thread.
- ``'group_ranges'`` (RangesMatrix): The thread
assignments; shape (n_thread, n_det, n_samp).
"""
if self.tiling is None:
raise RuntimeError("This Projectionist not set up for Tiled maps.")
projeng = self.get_ProjEng('T')
q_native = self._cache_q_fp_to_native(assembly.Q)
# This returns a G3VectorInt of length n_tiles giving count of hits per tile.
hits = np.array(projeng.tile_hits(q_native, assembly.fplane.quats))
tiles = np.nonzero(hits)[0]
hits = hits[tiles]
if assign is True:
assign = so3g.useful_info()['omp_num_threads']
if assign > 0:
group_n = np.array([0 for g in range(assign)])
group_tiles = [[] for _ in group_n]
cands = sorted(list(zip(hits, tiles)), reverse=True) # [(1000, 12), (900, 4), ...]
for _n, _t in cands:
idx = group_n.argmin()
group_n[idx] += _n
group_tiles[idx].append(_t)
imax = np.argmax(group_n)
# max_ratio = group_n[imax] / np.mean(np.concatenate([group_n[:imax], group_n[imax+1:]]))
# if len(group_n)>1 and max_ratio > 1.1:
# print(f"Warning: Threads poorly balanced. Max/mean hits = {max_ratio}")
# Now paint them into Ranges.
R = projeng.tile_ranges(q_native, assembly.fplane.quats, group_tiles)
R = wrap_ivals(R)
return {
'active_tiles': list(tiles),
'hit_counts': list(hits),
'group_ranges': R,
'group_tiles': group_tiles,
}
return {
'active_tiles': list(tiles),
'hit_counts': list(hits),
}
_ivals_format = 2
[docs]
class Projectionist(_ProjectionistBase):
"""This class assists with analyzing WCS information to populate data
structures needed for accelerated pointing routines for rectangular
pixelization.
See also the methods and parameters defined in the base class
:class:`_ProjectionistBase`.
As in pixell, the code and discussion here uses the term
"geometry" to refer to the combination of an astropy.WCS object
and a 2-d array shape.
Some common method parameters specific to rectangular pixelization
are documented here for consistency.
* ``proj_name`` - a string specifying a projection. The
nomenclature is mostly the same as the FITS CTYPE identifiers.
Accepted values: ARC, CAR, CEA, TAN, ZEA, Flat, Quat.
* ``shape`` - the shape describing the celestial axes of the map.
(For tiled representation this specifies the parent footprint.)
* ``wcs`` - the WCS describing the celestial axes of the map.
Together with ``shape`` this is a geometry; see pixell.enmap
documentation.
* ``interpol``: How positions that fall between pixel centers will
be handled. Options are "nearest" (default): Use Nearest
Neighbor interpolation, so a sample takes the value of
whatever pixel is closest; or "bilinear": linearly
interpolate between the four closest pixels. bilinear is
slower (around 60%) but avoids problems caused by a
discontinuous model.
Attributes:
naxis: 2-element integer array specifying the map shape (for
the 2 celestial axes).
cdelt: 2-element float array specifying the pixel pitch.
crpix: 2-element float array specifying the pixel coordinates
of the reference point.
proj_name: string, name of the projection.
q_celestial_to_native: quaternion rotation taking celestial
coordinates to the native spherical coordinates of the
projection.
tile_shape: 2-element integer array specifying the tile shape
(this is the shape of one full-size sub-tile, not the
decimation factor on each axis).
tiling: a _Tiling object if the map is tiled, None otherwise.
"""
[docs]
@staticmethod
def get_q(wcs):
"""Analyze a wcs object to compute the quaternion rotation from
celestial to native spherical coordinates.
"""
alpha0, delta0 = wcs.wcs.crval # In degrees.
if (wcs.wcs.phi0 == 0. and wcs.wcs.theta0 == 0.):
# This is typical for cylindrical projections.
assert((delta0 >= 0 and wcs.wcs.lonpole == 180.0) or
(delta0 <= 0 and wcs.wcs.lonpole == 0.0))
Q = (quat.euler(1, delta0 * quat.DEG) *
quat.euler(2, -alpha0 * quat.DEG))
elif (wcs.wcs.phi0 == 0. and wcs.wcs.theta0 == 90.):
# This is typical for zenithal projections.
assert(wcs.wcs.lonpole == 180.0)
Q = (quat.euler(2, np.pi) *
quat.euler(1, (delta0 - 90)*quat.DEG) *
quat.euler(2, -alpha0 * quat.DEG))
else:
raise ValueError(f'Unimplemented NSC reference (phi0,theta0)='
'({wcs.wcs.phi0:.2f},{wcs.wcs.theta0:.2f})')
return Q
def __init__(self):
self._q_fp_to_native = None
self._q_fp_to_celestial = None
self.tile_shape = None
self.active_tiles = None
self.wcs = None
self.proj_name = None
self.q_celestial_to_native = None
self.interpol = INTERPOLS[0]
self.naxis = np.array([0, 0])
self.cdelt = np.array([0., 0.])
self.crpix = np.array([0., 0.])
@property
def tiling(self):
if self.tile_shape is None:
return None
return _Tiling(self.naxis[::-1], self.tile_shape)
[docs]
@classmethod
def for_geom(cls, shape, wcs, interpol=None):
"""Construct a Projectionist for use with the specified "geometry".
The shape and wcs are the core information required to prepare
a Projectionist, so this method is likely to be called by
other constructors.
"""
self = cls()
ax1, ax2 = wcs.wcs.lng, wcs.wcs.lat
ndim = len(shape)
# The axes are numbered from outside in...
self.naxis = np.array([shape[ndim - ax1 - 1],
shape[ndim - ax2 - 1]], dtype=int)
# Get just the celestial part.
wcs = wcs.celestial
self.wcs = wcs
# Extract the projection name (e.g. CAR)
proj = [c[-3:] for c in wcs.wcs.ctype]
assert(proj[0] == proj[1])
proj_name = proj[0] # Projection name
self.proj_name = proj_name
# Store the rotation to native spherical coordinates.
self.q_celestial_to_native = self.get_q(wcs)
# Store the grid info.
self.cdelt = np.array(wcs.wcs.cdelt) * quat.DEG
self.crpix = np.array(wcs.wcs.crpix)
# Pixel interpolation mode
if interpol is None:
interpol = INTERPOLS[0]
self.interpol = interpol
return self
[docs]
@classmethod
def for_map(cls, emap, wcs=None, interpol=None):
"""Construct a Projectionist for use with maps having the same
geometry as the provided enmap.
Args:
emap: enmap from which to extract shape and wcs information.
It is acceptable to pass a bare ndarray here (or anything
with shape attribute), provided that wcs is provided
separately.
wcs: optional WCS object to use instead of emap.wcs.
"""
if wcs is None:
wcs = emap.wcs
return cls.for_geom(emap.shape, wcs, interpol=interpol)
[docs]
@classmethod
def for_source_at(cls, alpha0, delta0, gamma0=0.,
proj_name='TAN'):
"""Return a pointing-only Projectionist where some particular
equatorial position will be put at the North Pole of the
Native spherical coordinates.
"""
self = cls()
self.proj_name = proj_name
assert(gamma0 == 0.)
self.q_celestial_to_native = (
quat.euler(2, np.pi)
* quat.euler(1, (delta0 - 90)*quat.DEG)
* quat.euler(2, -alpha0 * quat.DEG))
return self
[docs]
@classmethod
def for_tiled(cls, shape, wcs, tile_shape, active_tiles=True, interpol=None):
"""Construct a Projectionist for use with the specified geometry
(shape, wcs), cut into tiles of shape tile_shape.
See class documentation for description of standard arguments.
Args:
tile_shape: tuple of ints, giving the shape of each tile.
active_tiles: bool or list of ints. Specifies which tiles
should be considered active. If True, all tiles are
populated. If None or False, this will remain
uninitialized and attempts to generate blank maps
(such as calls to zeros or to projection functions
without a target map set) will fail. Otherwise this
must be a list of integers specifying which tiles to
populate on such calls.
"""
self = cls.for_geom(shape, wcs, interpol=interpol)
self.tile_shape = np.array(tile_shape, 'int')
if active_tiles is True:
self.active_tiles = list(range(self.tiling.tile_count))
elif active_tiles in [False, None]:
self.active_tiles = None
else:
# Presumably a list of tile numbers.
self.active_tiles = active_tiles
return self
[docs]
def from_map(self, src_map, assembly, signal=None, comps=None):
"""De-project from a map, returning a Signal-like object.
See parent class documentation for full description.
"""
if src_map.ndim == 2:
# Promote to (1,...)
src_map = src_map[None]
return super().from_map(src_map, assembly, signal, comps)
def _get_pixelizor_args(self):
"""Returns a tuple of arguments that may be passed to the ProjEng
constructor to define the pixelization.
"""
# All these casts are required because boost-python doesn't
# like numpy scalars.
args = (int(self.naxis[1]), int(self.naxis[0]),
float(self.cdelt[1]), float(self.cdelt[0]),
float(self.crpix[1]), float(self.crpix[0]))
if self.tiling:
args += tuple(map(int, self.tile_shape))
if self.active_tiles is not None:
args += (self.active_tiles,)
return args
def _guess_comps(self, map_shape):
if len(map_shape) != 3:
raise ValueError('Cannot guess components based on '
'map with %i!=3 dimensions!' % len(map_shape))
return super()._guess_comps(map_shape)
[docs]
class ProjectionistHealpix(_ProjectionistBase):
"""Projectionist for Healpix maps.
See base class :class:`_ProjectionistBase` for more methods
and explanation of common method parameters.
Attributes:
nside: int, nside of the map, power of 2; 0 < nside <= 8192.
nside_tile: None, int, or 'auto', nside of tiling.
Ntile will be 12*nside_tile**2. None for untiled.
If 'auto', an appropriate nside_tile will be computed
and set on calling :func:`compute_nside_tile`.
ordering: str, 'NEST' or 'RING'. Only NEST supported for tiled maps.
"""
def __init__(self):
self._q_fp_to_native = None
self._q_fp_to_celestial = None
self.active_tiles = None
self.proj_name = None
self.q_celestial_to_native = quat.quat(1,0,0,0)
self.interpol = 'nearest'
self.tiling = None
self.nside = None
self.nside_tile = None
self.ordering = 'NEST' # 'RING' or 'NEST'. Only NEST can be used with tiles
[docs]
@classmethod
def for_healpix(cls, nside, nside_tile=None, active_tiles=None, ordering='NEST', interpol=None):
"""Construct a Projectionist for Healpix maps.
See class documentation for description of standard arguments.
"""
self=cls()
self.proj_name = 'HP'
self.nside = nside
self.active_tiles = active_tiles
if interpol is not None and (interpol not in ['nearest', 'nn']):
raise NotImplementedError("Only 'nearest' interpolation is supported for Healpix")
self.interpol = 'nearest'
self.ordering = ordering
if ordering not in ['NEST', 'RING']:
raise ValueError("ordering {ordering} should be 'NEST' or 'RING'")
if nside_tile is not None:
self.nside_tile = nside_tile
self.tiling = True
if ordering == 'RING' and self.nside_tile is not None:
raise NotImplementedError("'RING' not supported for tiled maps")
return self
[docs]
def compute_nside_tile(self, assembly, nActivePerThread=5, nThreads=None):
"""Automatically compute and set self.nside_tile for good balancing over threads.
Arguments:
nActivePerThread: int, how many active pixels do you want per thread (minimum)
nThreads: int, number of threads to optimize for (takes from OMP_NUM_THREADS if None)
"""
if self.nside_tile == 'auto':
## Estimate fsky
nside_tile0 = min(4, self.nside) # ntile = 192, for estimating fsky
self.nside_tile = nside_tile0
nActive = len(self.get_active_tiles(assembly)['active_tiles'])
fsky = nActive / (12 * nside_tile0**2)
if nThreads is None:
nThreads = so3g.useful_info()['omp_num_threads']
# nside_tile is smallest power of 2 satisfying nTile >= nActivePerThread * nthread / fsky
self.nside_tile = int(2**np.ceil(0.5 * np.log2(nActivePerThread * nThreads / (12 * fsky))))
self.nside_tile = min(self.nside_tile, self.nside)
return self.nside_tile
[docs]
def get_active_tiles(self, assembly, assign=False):
"""For a tiled Projection, figure out what tiles are hit by an
assembly. See parent class documentation for full description.
"""
self.compute_nside_tile(assembly) # Set nside_tile if 'auto'
return super().get_active_tiles(assembly, assign)
[docs]
def get_coords(self, assembly, use_native=False, output=None):
"""Get the spherical coordinates for the provided pointing Assembly.
See parent class documentation for full description.
"""
projeng = self.get_ProjEng('TQU')
if use_native:
q_native = self._cache_q_fp_to_native(assembly.Q)
else:
q_native = assembly.Q
return projeng.coords(q_native, assembly.fplane.quats, output)
[docs]
def from_map(self, src_map, assembly, signal=None, comps=None):
"""De-project from a map, returning a Signal-like object.
See parent class documentation for full description.
"""
if self.nside_tile is None and src_map.ndim == 1:
# Promote to (1,...)
src_map = src_map[None]
elif self.nside_tile is not None:
if not isinstance(src_map, list):
raise TypeError(f"Expected list for tiled map; src_map is {type(src_map)}")
shape = self._get_map_shape(src_map)
if len(shape) == 1:
# Promote to (1,...)
for itile in range(len(src_map)):
if src_map[itile] is not None:
src_map[itile] = src_map[itile][None]
return super().from_map(src_map, assembly, signal, comps)
[docs]
def assign_threads(self, assembly, method=None, n_threads=None):
"""Get a thread assignment RangesMatrix.
Available methods are ``'simple'`` and ``'tiles'``.
See parent class documentation for full description.
"""
if method is None:
if self.nside_tile is None:
method = 'simple'
else:
method = 'tiles'
if (method not in THREAD_ASSIGNMENT_METHODS_HP) and \
(method in THREAD_ASSIGNMENT_METHODS):
raise ValueError(f'Thread assignment method "{method}" '
'not supported for ProjectionistHealpix. '
f'Expected one of {THREAD_ASSIGNMENT_METHODS_HP}.')
return super().assign_threads(assembly, method, n_threads)
def _get_pixelizor_args(self):
"""Returns a tuple of arguments that may be passed to the ProjEng
constructor to define the pixelization.
"""
if self.active_tiles is not None:
active_tiles = list(map(int, self.active_tiles))
else:
active_tiles = None
nside_tile = None
if self.nside_tile is not None:
nside_tile = int(self.nside_tile)
args = (int(self.nside),
int(self.ordering == 'NEST'),
nside_tile,
active_tiles)
return args
def _guess_comps(self, map_shape):
if len(map_shape) != 2:
raise ValueError('Cannot guess components based on '
'map with %i!=2 dimensions!' % len(map_shape))
return super()._guess_comps(map_shape)
class _Tiling:
"""Utility class for computations involving tiled maps.
"""
def __init__(self, shape, tile_shape):
self.shape = shape[-2:]
self.tile_shape = tile_shape
nt0 = int(np.ceil(self.shape[0] / self.tile_shape[0]))
nt1 = int(np.ceil(self.shape[1] / self.tile_shape[1]))
self.super_shape = (nt0, nt1)
self.tile_count = nt0 * nt1
def tile_dims(self, tile):
rowcol = self.tile_rowcol(tile)
return (min(self.shape[0] - rowcol[0] * self.tile_shape[0], self.tile_shape[0]),
min(self.shape[1] - rowcol[1] * self.tile_shape[1], self.tile_shape[1]))
def tile_rowcol(self, tile):
if tile >= self.tile_count:
raise ValueError(f"Request for tile {tile} is out of bounds for "
f"super_shape={self.super_shape}")
return (tile // self.super_shape[1], tile % self.super_shape[1])
def tile_offset(self, tile):
row, col = self.tile_rowcol(tile)
return row * self.tile_shape[0], col * self.tile_shape[1]
def wrap_ivals(ivals):
"""Thread computation routines at C++ level return nested lists of
Ranges objects; i.e. something like this::
ivals = [
[ # thread assignments for first "bunch"
[Ranges, Ranges, ... ], # for thread 0
[Ranges, Ranges, ... ],
...
[Ranges, Ranges, ... ], # for thread n-1.
],
[ # thread assignments for second "bunch"
[Ranges, Ranges, ... ], # for thread 0
],
]
This function wraps and returns each highest level entry into a
RangesMatrix, i.e.::
wrapped = [
RangesMatrix(n_threads1, n_det, n_samp),
RangesMatrix(n_threads2, n_det, n_samp),
]
Currently all use cases have len(ivals) == 2 and n_threads2 = 1
but the scheme is more general than that.
"""
return [RangesMatrix([RangesMatrix(y) for y in x]) for x in ivals]
THREAD_ASSIGNMENT_METHODS = [
'simple',
'domdir',
'tiles',
]
THREAD_ASSIGNMENT_METHODS_HP = [
'simple',
'tiles'
]