Source code for triumvirate.catalogue

"""
Catalogue (:mod:`~triumvirate.catalogue`)
=========================================

Handle catalogue I/O and processing.

.. autosummary::
    MissingValueError
    DefaultValueWarning
    ParticleCatalogue

"""  # numpydoc ignore=SS01
import warnings

import numpy as np
from astropy.table import Table

from ._particles import _ParticleCatalogue

try:
    from nbodykit.source.catalog import (
        ArrayCatalog,
        BinaryCatalog,
        CSVCatalog,
        # FITSCatalog,
        HDFCatalog,
    )
    _nbkt_imported = True
except Exception:
    _nbkt_imported = False


[docs] class MissingValueError(ValueError): """Value error raised when a mandatory field is missing/empty in a catalogue. """
[docs] class DefaultValueWarning(UserWarning): """Warning issued when values of a field are not provided and set to the default. """
[docs] class ParticleCatalogue: """Catalogue holding particle coordinates, weights and redshift-dependent mean number density. Parameters ---------- x, y, z : 1-d array of float Cartesian coordinates of particles. `x`, `y` and `z` must have the same length. nz : (1-d array of) float, optional Redshift-dependent mean number density (defaults is `None`). If an array, it must be of the same length as `x`, `y` and `z`. This quantity should include sample weighting if sample weights are used; see the :ref:`note <sample_and_clustering_weights>` below for more details. ws, wc : (1-d array of) float, optional Sample weight and clustering weight of particles (defaults are 1.). If an array, it must be of the same length as `x`, `y` and `z`. See the :ref:`note <sample_and_clustering_weights>` below for more details. logger : :class:`logging.Logger`, optional Program logger (default is `None`). Attributes ---------- bounds : dict of {str: tuple of (float, float)} Particle coordinate bounds. ntotal : int Total particle number. wtotal : float Total particle overall weight. wstotal : float Total particle sample weight. .. versionchanged:: 0.2.0 Separate `wtotal` and `wstotal` attributes. .. _sample_and_clustering_weights: .. admonition:: Sample and clustering weights There are two types of weights: sample weight ``ws`` (e.g. completeness weights) and clustering weight ``wc`` (e.g. Feldman--Kaiser--Peacock weights). The overall weight is the product of the two for each particle. Note the naming convention above: in particular, ``wc`` is not the completeness weight (which is a component of ``ws`` instead). When sample weights are applied (e.g. the catalogue has been corrected for incompleteness), the redshift-dependent mean number density ``nz`` is the weighted mean number density (e.g. including the completeness weights). """ def __init__(self, x, y, z, nz=None, ws=1., wc=1., logger=None): self._logger = logger # Initialise particle data (with 'astropy'). if nz is None: warnings.warn( "Catalogue 'nz' field is None and thus set to zero, " "which may raise errors in some computations.", category=DefaultValueWarning ) nz = 0. self._pdata = Table() self._pdata['x'] = x self._pdata['y'] = y self._pdata['z'] = z self._pdata['nz'] = nz self._pdata['ws'] = ws self._pdata['wc'] = wc if _nbkt_imported: self._pdata = ArrayCatalog(self._pdata) self._backend = 'nbodykit' else: self._backend = 'astropy' self._source = f'extdata:{id(self._pdata)}' # Compute catalogue properties. self._calc_bounds(init=True) self.ntotal = len(self) self.wtotal = self._compute( (self._pdata['ws'] * self._pdata['wc']).sum() ) self.wstotal = self._compute(self._pdata['ws'].sum()) if self._logger: self._logger.info( "Catalogue initialised: " "ntotal = %d, wtotal = %.3f, wstotal = %.3f (%s).", self.ntotal, self.wtotal, self.wstotal, self )
[docs] @classmethod def read_from_file(cls, filepath, reader='astropy', names=None, format='ascii.no_header', name_mapping=None, table_kwargs=None, file_kwargs=None, logger=None): """Read particle data from file. Parameters ---------- filepath : str or :class:`pathlib.Path` Catalogue file path. reader : {'astropy', 'nbodykit'}, optional If 'astropy' (default), |Table| is used for reading in the catalogue file; else if 'nbodykit', child classes of |FileCatalogBase| is used (if :mod:`nbodykit` is available). names : sequence of str, list of tuple or :class:`numpy.dtype`, \ optional Catalogue file field names or data types. If `None` (default), the header in the file may be used to infer the field names or data types. See the :ref:`note <format_and_names_arguments>` below for more details. format : str, optional File format specifier (default is 'ascii.no_header' for default `reader` value 'astropy'). See the :ref:`note <format_and_names_arguments>` below for more details. name_mapping : dict of {str: str}, optional Mapping between any of the default column names 'x', 'y', 'z', 'nz', 'ws' and 'wc' (keys) and the corresponding field names (values) in `names` (default is `None`). table_kwargs : dict, optional Keyword arguments to be passed to :attr:`astropy.table.Table.read` (default is `None`). Used only when ``reader='astropy'``. file_kwargs : dict, optional Keyword arguments to be passed to child classes of :class:`nbodykit.source.catalog.file.FileCatalogBase` (default is `None`). Used only when ``reader='nbodykit'``. See also the hint below. logger : :class:`logging.Logger`, optional Program logger (default is `None`). Returns ------- :class:`~triumvirate.catalogue.ParticleCatalogue` Particle catalogue object. .. _format_and_names_arguments: .. admonition:: `format` and `names` arguments For ``reader='astropy'``, supported `format` can be found in `'Built-In Table Readers/Writers'`_, and `names` correspond to the ``names`` keyword argument (sequence of str) in :attr:`astropy.table.Table.read` for a subset of formats. See |Table| for more details including appropriate `table_kwargs`. For ``reader='nbodykit'``, supported `format` and the argument corresponding to `names` in the reader are--- - ``'text'`` and ``names`` (sequence of str): plain-text files read in by |CSVCatalog|; - ``'binary'`` and ``dtype`` (list of tuple or :class:`numpy.dtype`): binary files read in by |BinaryCatalog|; - ``'hdf5'`` and not applicable: HDF files read in by |HDFCatalog| (pass `file_kwargs` as appropriate). See `'Reading Catalogs from Disk'`_ for more details. .. hint:: If any of the 'nz', 'ws' and 'wc' columns are not provided, these columns are initialised with the default values in :class:`~triumvirate.catalogue.ParticleCatalogue`. Examples -------- >>> ParticleCatalogue.read_from_file( ... filepath, ... reader='astropy', ... format='fits', ... # Match original data column names to the default names. ... name_mapping={ ... 'x': 'X', 'y': 'Y', 'z': 'Z', ... 'nz': 'NZ', 'ws': 'WEIGHT_SYS', 'wc': 'WEIGHT_FKP', ... } ... ) >>> ParticleCatalogue.read_from_file( ... filepath, ... reader='nbodykit', ... format='hdf5', ... # `root` keyword argument is passed to `HDFCatalog`. ... file_kwargs={root='particles'} ... ) .. |Table| replace:: :class:`astropy.table.Table` .. |FileCatalogBase| replace:: \ :class:`nbodykit.source.catalog.file.FileCatalogBase` .. |CSVCatalog| replace:: \ :class:`nbodykit.source.catalog.file.CSVCatalog` .. |FITSCatalog| replace:: \ :class:`nbodykit.source.catalog.file.FITSCatalog` .. |BinaryCatalog| replace:: \ :class:`nbodykit.source.catalog.file.BinaryCatalog` .. |HDFCatalog| replace:: \ :class:`nbodykit.source.catalog.file.HDFCatalog` .. _'Built-In Table Readers/Writers': \ https://docs.astropy.org/en/latest/io/unified.html #built-in-table-readers-writers .. _'Reading Catalogs from Disk': \ https://nbodykit.readthedocs.io/en/latest/catalogs/reading.html """ # numpydoc ignore=RT03 self = object.__new__(cls) self._logger = logger self._source = f'extfile:{filepath}' if reader.lower() == 'nbodykit': if _nbkt_imported: self._backend = 'nbodykit' else: warnings.warn( "'astropy' is used for catalogue I/O as " "'nbodykit' is unavailable", category=RuntimeWarning ) self._backend = 'astropy' elif reader.lower() == 'astropy': self._backend = 'astropy' else: raise ValueError( f"Unsupported catalogue file reader: {reader}. " "Possible options: {'astropy', 'nbodykit'}." ) # Initialise particle data. if self._backend == 'nbodykit': if file_kwargs is None: file_kwargs = {} filepath = str(filepath) if format == 'text': self._pdata = CSVCatalog(filepath, names, **file_kwargs) elif format == 'fits': raise ValueError( "'nbodykit' FITS reader is broken. " "Use 'astropy' FITS reader instead." ) elif format == 'binary': self._pdata = \ BinaryCatalog(filepath, dtype=names, **file_kwargs) elif format == 'hdf5': self._pdata = HDFCatalog(filepath, **file_kwargs) else: raise ValueError( "Unsupported `format` for ``reader='nbodykit'``: {}." .format(filepath) ) if name_mapping: for name_, name_alt_ in name_mapping.items(): self._pdata[name_] = self._pdata[name_alt_] try: del self._pdata[name_alt_] except ValueError: pass if self._backend == 'astropy': if format == 'text': format = 'ascii' if table_kwargs is None: table_kwargs = {} if format.startswith('ascii'): table_kwargs.update(names=names) self._pdata = Table.read( filepath, format=format, **table_kwargs ) if name_mapping: for name_, name_alt_ in name_mapping.items(): self._pdata.rename_column(name_alt_, name_) # Validate particle data columns. if self._backend == 'nbodykit': colnames = self._pdata.columns if self._backend == 'astropy': colnames = self._pdata.colnames for axis_name in ['x', 'y', 'z']: if axis_name not in colnames: raise MissingValueError( f"Mandatory field {axis_name} is missing." ) if 'nz' not in colnames: self._pdata['nz'] = 0. warnings.warn( "Catalogue 'nz' field is not provided and thus set to zero, " "which may raise errors in some computations.", category=DefaultValueWarning ) for name_wgt in ['ws', 'wc']: if name_wgt not in colnames: warnings.warn( f"Catalogue '{name_wgt}' field is not provided, " "so is set to unity.", category=DefaultValueWarning ) self._pdata[name_wgt] = 1. # Compute catalogue properties. self._calc_bounds(init=True) self.ntotal = len(self._pdata) self.wtotal = self._compute( (self._pdata['ws'] * self._pdata['wc']).sum() ) self.wstotal = self._compute(self._pdata['ws'].sum()) if self._logger: self._logger.info( "Catalogue loaded: " "ntotal = %d, wtotal = %.3f, wstotal = %.3f (%s).", self.ntotal, self.wtotal, self.wstotal, self ) return self
def __str__(self): try: return f"ParticleCatalogue(source={self._source})" except NameError: return "ParticleCatalogue(address={})".format( object.__str__(self).split()[-1].lstrip().rstrip('>') ) def __len__(self): return len(self._pdata)
[docs] def __getitem__(self, key): """Return one or more data entries. Parameters ---------- key : (list of) str, (list of) int or slice Data entry key(s) or selectors (indices or slices). Returns ------- :class:`numpy.ndarray` Data entry/entries. """ return self._compute(self._pdata[key])
[docs] def __setitem__(self, key, val): """Set data column. Parameters ---------- key : str Data column name. val : 1-d array_like Data column array. """ if isinstance(key, str): self._pdata[key] = val else: raise ValueError('Cannot set values for non-string-type key.')
[docs] def compute_los(self): """Compute the line of sight to each particle. Returns ------- los : (N, 3) :class:`numpy.ndarray` Normalised line-of-sight vectors. """ los_norm = np.sqrt( self._pdata['x']**2 + self._pdata['y']**2 + self._pdata['z']**2 ) los_norm[los_norm == 0.] = 1. los = np.transpose([ self._pdata['x'] / los_norm, self._pdata['y'] / los_norm, self._pdata['z'] / los_norm ]) return self._compute(los)
[docs] def compute_mean_density(self, volume=None, boxsize=None): """Compute the mean density over a volume. This sets the 'nz' column to :attr:`ntotal` divided by `volume` or cubic (product of) `boxsize`, and is typically used for calculating the homogeneous background number density in a simulation box. Parameters ---------- volume : double, optional Volume over which the mean density is calculated. boxsize : float or sequence of [float, float, float], optional Box size (in each dimension) over which the mean density is calculated. Used only when `volume` is `None`. .. attention:: The invocation of this method resets the particle data column ``'nz'``. """ # numpydoc ignore=PR02,PR10 if np.isscalar(boxsize): boxsize = [boxsize, boxsize, boxsize] volume = volume or np.prod(boxsize) self._pdata['nz'] = self.ntotal / volume
[docs] def centre(self, boxsize, catalogue_ref=None): """Centre a (pair of) catalogue(s) in a box. Parameters ---------- boxsize : float or sequence of [float, float, float] Box size (in each dimension). catalogue_ref : :class:`~triumvirate.catalogue.ParticleCatalogue`, \ optional Reference catalogue used for box alignment, also to be centred in the same box. If `None` (default), the current catalogue itself is used as the reference catalogue. .. note:: The reference catalogue is typically the random-source catalogue (if provided). Particle coordinates in both catalogues are shifted by the same displacement vector such that the mid-point of particle coordinate extents in the reference catalogue is at the centre of the box. """ # numpydoc ignore=PR02,PR10 if np.isscalar(boxsize): boxsize = [boxsize, boxsize, boxsize] if catalogue_ref is None: catalogue_ref_ = self else: catalogue_ref_ = catalogue_ref _axes_overflow = self._check_bounds_in_boxsize(boxsize) if _axes_overflow: warnings.warn( "Catalogue extent exceeds the box size along axis {} ({})." "Some particles may lie outside the box after centring." .format(set(_axes_overflow), self) ) if catalogue_ref is not None: _axes_overflow_ref = \ catalogue_ref_._check_bounds_in_boxsize(boxsize) if _axes_overflow_ref: warnings.warn( "Catalogue extent exceeds the box size along axis {} ({})." "Some particles may lie outside the box after centring." .format(set(_axes_overflow_ref), catalogue_ref_) ) origin = np.array([ np.mean(catalogue_ref_.bounds[axis]) - boxsize[iaxis]/2. for iaxis, axis in enumerate(['x', 'y', 'z']) ]) self.offset_coords(origin) if catalogue_ref is not None: catalogue_ref.offset_coords(origin)
[docs] def pad(self, boxsize, ngrid=None, boxsize_pad=None, ngrid_pad=None, catalogue_ref=None): """Pad a (pair of) catalogue(s) in a box. The particle coordinates are shifted away from the box corner at the origin such that in each dimension the minimum particle coordinate is given by the amount of padding, which can be set as a fraction of the box size or a multiple of the grid cell size. Parameters ---------- boxsize : float or sequence of [float, float, float] Box size in each dimension. ngrid : int or sequence of [int, int, int], optional Grid cell number in each dimension (default is `None`). Must be provided if `ngrid_pad` is set. boxsize_pad : float or sequence of [float, float, float], optional Box size padding factor. If not `None` (default), then `ngrid_pad` must be `None`. ngrid_pad : float or sequence of [float, float, float], optional Grid padding factor. If not `None` (default), then `boxsize_pad` must be `None`. catalogue_ref : :class:`~triumvirate.catalogue.ParticleCatalogue`, \ optional Reference catalogue used for box alignment, also to be put in the same box. If `None` (default), the current catalogue itself is used as the reference catalogue. Raises ------ ValueError If `boxsize_pad` and `ngrid_pad` are both set to not `None`. .. note:: The reference catalogue is typically the random-source catalogue (if provided). Particle coordinates in both catalogues are shifted by the same displacement vector such that the minimum particle coordinates in the reference catalogue are the amounts of padding specified. """ if boxsize_pad is None and ngrid_pad is None: warnings.warn( "`boxsize_pad` and `ngrid_pad` are both None. " "No padding is applied to the catalogue." ) return if boxsize_pad is not None and ngrid_pad is not None: raise ValueError( "Conflicting padding as `boxsize_pad` and `ngrid_pad` " "are both set (not None)." ) if np.isscalar(boxsize): boxsize = [boxsize, boxsize, boxsize] if catalogue_ref is None: catalogue_ref_ = self else: catalogue_ref_ = catalogue_ref _axes_overflow = self._check_bounds_in_boxsize(boxsize) if _axes_overflow: warnings.warn( "Catalogue extent exceeds the box size along axis {} ({})." "Some particles may lie outside the box after padding." .format(set(_axes_overflow), self) ) if catalogue_ref is not None: _axes_overflow_ref = \ catalogue_ref_._check_bounds_in_boxsize(boxsize) if _axes_overflow_ref: warnings.warn( "Catalogue extent exceeds the box size along axis {} ({})." "Some particles may lie outside the box after padding." .format(set(_axes_overflow_ref), catalogue_ref_) ) origin = np.array([ catalogue_ref_.bounds[axis][0] for axis in ['x', 'y', 'z'] ]) if boxsize_pad: origin -= np.multiply(boxsize_pad, boxsize) if ngrid_pad: origin -= np.multiply(ngrid_pad, np.divide(boxsize, ngrid)) self.offset_coords(origin) if catalogue_ref is not None: catalogue_ref.offset_coords(origin) for iaxis, axis in enumerate(['x', 'y', 'z']): if (max(self.bounds[axis]) > boxsize[iaxis] or max(catalogue_ref_.bounds[axis]) > boxsize[iaxis]): warnings.warn( "`boxsize` is smaller than the particle extents " f"in {axis} axis. Some particles now lie outside " "the box after padding." )
[docs] def periodise(self, boxsize): """Place particles in a periodic box of given box size. Parameters ---------- boxsize : float or sequence of [float, float, float] Box size (in each dimension). """ if np.isscalar(boxsize): boxsize = [boxsize, boxsize, boxsize] _axes_overflow = self._check_bounds_in_boxsize(boxsize) if _axes_overflow: warnings.warn( "Box size is smaller than particle coordinate extents " "along axis: {}." .format(_axes_overflow) ) for iaxis, axis in enumerate(['x', 'y', 'z']): # Also centre. self._pdata[axis] = ( self._pdata[axis] + boxsize[iaxis] / 2. - ( self.bounds[axis][1] + self.bounds[axis][0] ) / 2. ) % boxsize[iaxis] self._calc_bounds()
[docs] def offset_coords(self, origin): """Offset particle coordinates for a given origin. Parameters ---------- origin : float or sequence of [float, float, float] Coordinates of the new origin. """ for axis, coord in zip(['x', 'y', 'z'], origin): self._pdata[axis] -= coord self._calc_bounds()
def _calc_bounds(self, init=False): """Calculate coordinate bounds in each dimension. Parameters ---------- init : bool, optional If `True` (default is `False`), particle positions are original and have not been offset previously. """ self.bounds = {} self.spans = {} for axis in ['x', 'y', 'z']: self.bounds[axis] = ( self._compute(self._pdata[axis].min()), self._compute(self._pdata[axis].max()) ) self.spans[axis] = self.bounds[axis][1] - self.bounds[axis][0] if self._logger: if init: self._logger.info( "Original extents of particle coordinates: " "{'x': (%.3f, %.3f | %.3f)," " 'y': (%.3f, %.3f | %.3f)," " 'z': (%.3f, %.3f | %.3f)}" " (%s).", *self.bounds['x'], self.spans['x'], *self.bounds['y'], self.spans['y'], *self.bounds['z'], self.spans['z'], self ) else: self._logger.info( "Offset extents of particle coordinates: " "{'x': (%.3f, %.3f | %.3f)," " 'y': (%.3f, %.3f | %.3f)," " 'z': (%.3f, %.3f | %.3f)}" " (%s).", *self.bounds['x'], self.spans['x'], *self.bounds['y'], self.spans['y'], *self.bounds['z'], self.spans['z'], self ) def _check_bounds_in_boxsize(self, boxsize): """Check if partice coordinate extents are covered by the box size in all dimensions, and return the axis name(s) if not. Parameters ---------- boxsize : sequence of float Box size in all dimensions. Returns ------- list of str Axis if and where box size is smaller than the particle coordinate extent. """ _axes = [] for iaxis, axis in enumerate(['x', 'y', 'z']): if np.abs(np.diff(self.bounds[axis])) > boxsize[iaxis]: _axes.append(axis) return _axes def _convert_to_cpp_catalogue(self, verbose=-1): """Convert to a C++-wrapped catalogue. Parameters ---------- verbose : int, optional Verbosity level to be passed to the backend C++ logger (default is -1, i.e. unused). Returns ------- :class:`~triumvirate._particles._ParticleCatalogue` C++-wrapped catalogue. """ x = np.ascontiguousarray(self._compute(self._pdata['x'])) y = np.ascontiguousarray(self._compute(self._pdata['y'])) z = np.ascontiguousarray(self._compute(self._pdata['z'])) nz = np.ascontiguousarray(self._compute(self._pdata['nz'])) ws = np.ascontiguousarray(self._compute(self._pdata['ws'])) wc = np.ascontiguousarray(self._compute(self._pdata['wc'])) return _ParticleCatalogue(x, y, z, nz, ws, wc, verbose=verbose) def _compute(self, quant): """Return a quantity in standard form (i.e. apply :meth:`dask.array.Array.compute` to any Dask array). Parameters ---------- quant Quantity. Returns ------- The same quantity in a non-Dask-array form. """ # numpydoc ignore=PR04,RT03 try: return quant.compute() except AttributeError: return quant
[docs] def write_attrs_as_header(self, catalogue_ref=None): """Write catalogue attributes as a header. Parameters ---------- catalogue_ref : :class:`~triumvirate.catalogue.ParticleCatalogue`, \ optional Reference catalogue (default is `None`) whose attributes are also written out. This is typically the random-source catalogue. Returns ------- text_header : str Catalogue attributes as a header string. """ if catalogue_ref is None: text_lines = [ "Catalogue source: {}" .format(self._source), # noqa: E131 "Catalogue size: " "ntotal = {:d}, wtotal = {:.3f}, wstotal = {:.3f}" .format(self.ntotal, self.wtotal, self.wstotal), "Catalogue particle extents: " "([{:.3f}, {:.3f}], [{:.3f}, {:.3f}], [{:.3f}, {:.3f}])" .format( *self.bounds['x'], *self.bounds['y'], *self.bounds['z'] ), ] else: text_lines = [ "Data catalogue source: {}" .format(self._source), # noqa: E131 "Data catalogue size: " "ntotal = {:d}, wtotal = {:.3f}, wstotal = {:.3f}" .format(self.ntotal, self.wtotal, self.wstotal), "Data-source particle extents: " "([{:.3f}, {:.3f}], [{:.3f}, {:.3f}], [{:.3f}, {:.3f}])" .format( *self.bounds['x'], *self.bounds['y'], *self.bounds['z'] ), "Random catalogue source: {}" .format(catalogue_ref._source), # noqa: E131 "Random catalogue size: " "ntotal = {:d}, wtotal = {:.3f}, wstotal = {:.3f}" .format( # noqa: E131 catalogue_ref.ntotal, catalogue_ref.wtotal, catalogue_ref.wstotal, ), "Random-source particle extents: " "([{:.3f}, {:.3f}], [{:.3f}, {:.3f}], [{:.3f}, {:.3f}])" .format( # noqa: E131 *catalogue_ref.bounds['x'], *catalogue_ref.bounds['y'], *catalogue_ref.bounds['z'] ), ] text_header = "\n".join(text_lines) return text_header