"""
Two-Point Clustering Statistics (:mod:`~triumvirate.twopt`)
===========================================================
Measure two-point clustering statistics from catalogues.
Local plane-parallel estimators
-------------------------------
.. autosummary::
compute_powspec
compute_corrfunc
Global plane-parallel estimators
--------------------------------
.. autosummary::
compute_powspec_in_gpp_box
compute_corrfunc_in_gpp_box
Window function estimator
-------------------------
.. autosummary::
compute_corrfunc_window
""" # numpydoc ignore=GL07,SS01
import warnings
from pathlib import Path
import numpy as np
from ._twopt import (
_calc_powspec_normalisation_from_mesh,
_calc_powspec_normalisation_from_meshes,
_calc_powspec_normalisation_from_particles,
_compute_corrfunc,
_compute_corrfunc_in_gpp_box,
_compute_corrfunc_window,
_compute_powspec,
_compute_powspec_in_gpp_box,
)
from .dataobjs import Binning
from .parameters import (
_modify_measurement_parameters,
_modify_sampling_parameters,
fetch_paramset_template,
ParameterSet,
)
# Use default parameters for mixed-mesh normalisation in `pypower`.
PADDING = .10
CELLSIZE = 10.
ASSIGNMENT = 'cic'
def _amalgamate_parameters(paramset=None, params_sampling=None,
degree=None, binning=None, **type_kwargs):
"""Amalgamate a parameter set by overriding sampling parameters and
the measured multipole degree and coordinate binning.
Parameters
----------
paramset : :class:`~triumvirate.parameters.ParameterSet`, optional
Full parameter set. If `None` (default), the other parameters
must be provided.
params_sampling : dict, optional
Dictionary containing a subset of the following entries
for sampling parameters---
- 'alignment': {'centre', 'pad'};
- 'boxsize': sequence of [float, float, float];
- 'ngrid': sequence of [int, int, int];
- 'assignment': {'ngp', 'cic', 'tsc', 'pcs'};
- 'interlace': bool;
and exactly one of the following only when 'alignment' is 'pad'---
- 'boxpad': float;
- 'gridpad': float.
If not `None` (default), this will override the corresponding
entries in `paramset`.
degree : int, optional
Multipole degree. If not `None` (default), this will override
``paramset['degrees']['ELL']``.
binning : :class:`~.triumvirate.dataobjs.Binning`, optional
Binning (default is `None`).
**type_kwargs
'catalogue_type' and 'statistic_type' parameters to be filled in.
Returns
-------
paramset : :class:`~triumvirate.parameters.ParameterSet`, optional
Amalgamated full parameter set.
Raises
------
ValueError
When `paramset` is `None` while `params_sampling`, `degree` or
`binning` is also `None`.
"""
if paramset is None and params_sampling is None:
raise ValueError(
"Either `paramset` or `sampling_params` must be provided."
)
if paramset is None and degree is None:
raise ValueError("Either `paramset` or `degree` must be provided.")
if paramset is None and binning is None:
raise ValueError("Either `paramset` or `binning` must be provided.")
if paramset is None:
paramset, defaults = fetch_paramset_template('dict', ret_defaults=True)
else:
# paramset = paramset
defaults = None
if params_sampling is not None:
paramset, defaults = _modify_sampling_parameters(
paramset, params_sampling,
params_default=defaults, ret_defaults=True
)
params_measure = {}
if degree is not None:
params_measure.update({'ELL': degree})
if binning is not None:
params_measure.update({
'bin_min': binning.bin_min,
'bin_max': binning.bin_max,
'num_bins': binning.num_bins,
})
paramset, defaults = _modify_measurement_parameters(
paramset, params_measure,
params_default=defaults, ret_defaults=True
)
# Implicitly validate parameters.
paramset.update(type_kwargs)
if defaults:
warnings.warn(
"The following parameter default values "
f"are unchanged: {defaults}. "
"Not all parameters are necessarily used."
)
return paramset
def _get_measurement_filename(paramset):
"""Get output measurement filename.
Parameters
----------
paramset : :class:`~triumvirate.parameters.ParameterSet`
Parameter set.
Returns
-------
str
Output measurement filename.
Raises
------
ValueError
When `paramset` indicates the measurements are not
two-point statistics.
"""
multipole = paramset['degrees']['ELL']
output_tag = paramset['tags']['output'] or ""
if paramset['statistic_type'] == 'powspec':
return "pk{:d}{}".format(multipole, output_tag)
if paramset['statistic_type'] == '2pcf':
return "xi{:d}{}".format(multipole, output_tag)
if paramset['statistic_type'] == '2pcf-win':
return "xiw{:d}{}".format(multipole, output_tag)
raise ValueError(
"`paramset` 'statistic_type' does not correspond to a "
"recognised two-point statistic."
)
def _print_measurement_header(paramset, norm_factor_part, norm_factor_mesh,
norm_factor_meshes):
"""Print two-point statistic measurement header including sampling
parameters, normalisation factors and data table columns.
Parameters
----------
paramset : :class:`~triumvirate.parameters.ParameterSet`
Parameter set.
norm_factor_part, norm_factor_mesh, norm_factor_meshes : float
Normalisation factors.
Returns
-------
text_header : str
Measurement information as a header string.
Raises
------
ValueError
When `paramset` indicates the measurements are not
two-point statistics.
"""
if paramset['norm_convention'] == 'none':
norm_factor = 1.
if paramset['norm_convention'] == 'particle':
norm_factor = norm_factor_part
if paramset['norm_convention'] == 'mesh':
norm_factor = norm_factor_mesh
if paramset['norm_convention'] == 'mesh-mixed':
norm_factor = norm_factor_meshes
if paramset['npoint'] != '2pt':
raise ValueError(
"`paramset` 'statistic_type' does not correspond to a "
"recognised two-point statistic."
)
if paramset['space'] == 'fourier':
datatab_colnames = [
"k_cen", "k_eff", "nmodes",
"Re{{pk{:d}_raw}}".format(paramset['degrees']['ELL']),
"Im{{pk{:d}_raw}}".format(paramset['degrees']['ELL']),
"Re{{pk{:d}_shot}}".format(paramset['degrees']['ELL']),
"Im{{pk{:d}_shot}}".format(paramset['degrees']['ELL'])
]
if paramset['space'] == 'config':
datatab_colnames = [
"r_cen", "r_eff", "npairs",
"Re{{xi{:d}}}".format(paramset['degrees']['ELL']),
"Im{{xi{:d}}}".format(paramset['degrees']['ELL'])
]
text_lines = [
"Box size: [{:.3f}, {:.3f}, {:.3f}]".format(
*[paramset['boxsize'][ax] for ax in ['x', 'y', 'z']]
),
"Box alignment: {}".format(paramset['alignment']),
"Mesh number: [{:d}, {:d}, {:d}]".format(
*[paramset['ngrid'][ax] for ax in ['x', 'y', 'z']]
),
"Mesh assignment and interlacing: {}, {}".format(
paramset['assignment'], paramset['interlace']
),
"Normalisation factor: {:.9e} ({})".format(
norm_factor, paramset['norm_convention']
),
"Normalisation factor alternatives: "
"{:.9e} (particle), {:.9e} (mesh), {:.9e} (mesh-mixed)".format(
norm_factor_part, norm_factor_mesh, norm_factor_meshes
),
", ".join([
"[{:d}] {}".format(colidx, colname)
for colidx, colname in enumerate(datatab_colnames)
])
]
text_header = "\n".join(text_lines)
return text_header
def _assemble_measurement_datatab(measurements, paramset):
"""Assemble measurement data table.
Parameters
----------
measurements : dict
Measurement results.
paramset : :class:`~triumvirate.parameters.ParameterSet`
Parameter set.
Returns
-------
datatab : :class:`numpy.ndarray`
Column-major measurement data table.
Raises
------
ValueError
When `paramset` indicates the measurements are not
two-point statistics.
"""
if paramset['npoint'] != '2pt':
raise ValueError(
"Measurement header being printed is for two-point statistics."
)
if paramset['space'] == 'fourier':
datatab = np.transpose([
measurements['kbin'], measurements['keff'], measurements['nmodes'],
measurements['pk_raw'].real, measurements['pk_raw'].imag,
measurements['pk_shot'].real, measurements['pk_shot'].imag,
])
if paramset['space'] == 'config':
datatab = np.transpose([
measurements['rbin'], measurements['reff'], measurements['npairs'],
measurements['xi'].real, measurements['xi'].imag,
])
return datatab
# ========================================================================
# Survey statistics
# ========================================================================
def _compute_2pt_stats_survey_like(twopt_algofunc,
catalogue_data, catalogue_rand,
los_data=None, los_rand=None,
paramset=None, params_sampling=None,
degree=None, binning=None, types=None,
save=False, logger=None):
"""Compute two-point statistics from survey-like data and random
catalogues in the local plane-parallel approximation.
Parameters
----------
twopt_algofunc : callable
Two-point statistic algorithmic function.
catalogue_data : :class:`~triumvirate.catalogue.ParticleCatalogue`
Data-source catalogue.
catalogue_rand : :class:`~triumvirate.catalogue.ParticleCatalogue`
Random-source catalogue.
los_data : (N, 3) array of float, optional
Specified lines of sight for the data-source catalogue.
If `None` (default), this is automatically computed using
:meth:`~triumvirate.catalogue.ParticleCatalogue.compute_los`.
los_rand : (N, 3) array of float, optional
Specified lines of sight for the random-source catalogue.
If `None` (default), this is automatically computed using
:meth:`~triumvirate.catalogue.ParticleCatalogue.compute_los`.
paramset : :class:`~triumvirate.parameters.ParameterSet`, optional
Full parameter set. If `None` (default), `degree`, `binning`
and `params_sampling` must be provided.
params_sampling : dict, optional
Dictionary containing a subset of the following entries
for sampling parameters---
- 'alignment': {'centre', 'pad'};
- 'boxsize': sequence of [float, float, float];
- 'ngrid': sequence of [int, int, int];
- 'assignment': {'ngp', 'cic', 'tsc', 'pcs'};
- 'interlace': bool;
and exactly one of the following only when 'alignment' is 'pad'---
- 'boxpad': float;
- 'gridpad': float.
If not `None` (default), this will override the corresponding
entries in `paramset`.
degree : int, optional
Multipole degree. If not `None` (default), this will override
``paramset['degrees']['ELL']``.
binning : :class:`~triumvirate.dataobjs.Binning`, optional
Binning for the measurements. If `None` (default), this is
constructed from `paramset`.
types : dict, optional
'catalogue_type' and 'statistic_type' values (default is `None`).
This should be set by the caller of this function.
save : {'.txt', '.npz', False}, optional
If not `False` (default), save the measurements as a '.txt' file
or in '.npz' format. The save path is determined from `paramset`
(if unset, a default file path in the current working directory is
used).
logger : :class:`logging.Logger`, optional
Logger (default is `None`).
Returns
-------
results : dict of {str: :class:`numpy.ndarray`}
Measurement results.
Raises
------
ValueError
When `paramset` is `None` while `params_sampling`, `degree` or
`binning` is also `None`.
"""
# --------------------------------------------------------------------
# Initialisation
# --------------------------------------------------------------------
# -- Parameters ------------------------------------------------------
paramset = _amalgamate_parameters(
paramset=paramset, params_sampling=params_sampling,
degree=degree, binning=binning, **types
)
if isinstance(paramset, dict): # likely redundant but safe
paramset = ParameterSet(param_dict=paramset, logger=logger)
if logger:
logger.info("Parameter set have been initialised.")
if paramset['catalogue_type'] != 'survey':
raise ValueError(
"`paramset` 'catalogue_type' does not correspond to "
"the local plane-parallel clustering algorithm being called: "
f"catalogue_type = '{paramset['catalogue_type']}'."
)
if paramset['statistic_type'] == 'powspec':
statistic_name = 'power spectrum'
elif paramset['statistic_type'] == '2pcf':
statistic_name = 'two-point correlation function'
else:
raise ValueError(
"`paramset` 'statistic_type' does not correspond to "
"the clustering algorithm being called: "
f"statistic_type = '{paramset['statistic_type']}'."
)
# -- Data ------------------------------------------------------------
# Set up binning.
if binning is None:
binning = Binning.from_parameter_set(paramset)
if logger:
logger.info("Binning has been initialised.")
# Set up lines of sight.
if los_data is None:
los_data = catalogue_data.compute_los()
if los_rand is None:
los_rand = catalogue_rand.compute_los()
los_data = np.ascontiguousarray(los_data)
los_rand = np.ascontiguousarray(los_rand)
if logger:
logger.info("Lines of sight have been initialised.")
# Set up box alignment.
if paramset['alignment'] == 'centre':
catalogue_data.centre(
[paramset['boxsize'][ax] for ax in ['x', 'y', 'z']],
catalogue_ref=catalogue_rand
)
if paramset['alignment'] == 'pad':
if paramset['padscale'] == 'box':
kwargs = {'boxsize_pad': paramset['padfactor']}
if paramset['padscale'] == 'grid':
kwargs = {
'ngrid': [paramset['ngrid'][ax] for ax in ['x', 'y', 'z']],
'ngrid_pad': paramset['padfactor']
}
catalogue_data.pad(
[paramset['boxsize'][ax] for ax in ['x', 'y', 'z']],
catalogue_ref=catalogue_rand, **kwargs
)
if logger:
logger.info("Catalogues have been aligned.")
# --------------------------------------------------------------------
# Measurements
# --------------------------------------------------------------------
# Prepare catalogues.
if logger:
logger.info(
"Preparing catalogue for clustering algorithm...",
cpp_state='start'
)
particles_data = \
catalogue_data._convert_to_cpp_catalogue(verbose=paramset['verbose'])
particles_rand = \
catalogue_rand._convert_to_cpp_catalogue(verbose=paramset['verbose'])
if logger:
logger.info(
"... prepared catalogue for clustering algorithm.",
cpp_state='end'
)
# Set up constants.
alpha = catalogue_data.wstotal / catalogue_rand.wstotal
if logger:
logger.info("Alpha contrast: %.6e.", alpha)
norm_factor_part = _calc_powspec_normalisation_from_particles(
particles_rand, alpha
)
norm_factor_mesh = _calc_powspec_normalisation_from_mesh(
particles_rand, paramset, alpha
)
norm_factor_meshes = _calc_powspec_normalisation_from_meshes(
particles_data, particles_rand, paramset, alpha,
padding=PADDING, cellsize=CELLSIZE, assignment=ASSIGNMENT
)
if paramset['norm_convention'] == 'none':
norm_factor = 1.
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle), %.6e (mesh), %.6e (mesh-mixed) (none used)."
)
if paramset['norm_convention'] == 'particle':
norm_factor = norm_factor_part
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle; used), %.6e (mesh), %.6e (mesh-mixed)."
)
if paramset['norm_convention'] == 'mesh':
norm_factor = norm_factor_mesh
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle), %.6e (mesh; used), %.6e (mesh-mixed)."
)
if paramset['norm_convention'] == 'mesh-mixed':
norm_factor = norm_factor_meshes
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle), %.6e (mesh), %.6e (mesh-mixed; used)."
)
if logger:
logger.info(
norm_log_mesg,
norm_factor_part, norm_factor_mesh, norm_factor_meshes
)
# Perform measurement.
if logger:
logger.info(
"Measuring %s from paired survey-type catalogues "
"in the local plane-parallel approximation...",
statistic_name,
cpp_state='start'
)
results = twopt_algofunc(
particles_data, particles_rand, los_data, los_rand,
paramset, binning, norm_factor
)
if logger:
logger.info(
"... measured %s from paired survey-type catalogues "
"in the local plane-parallel approximation.",
statistic_name,
cpp_state='end'
)
if save:
odirpath = paramset['directories']['measurements'] or ""
header = "\n".join([
catalogue_data.write_attrs_as_header(catalogue_ref=catalogue_rand),
_print_measurement_header(
paramset,
norm_factor_part, norm_factor_mesh, norm_factor_meshes
),
])
if save.lower() == '.txt':
datatab = _assemble_measurement_datatab(results, paramset)
datafmt = '\t'.join(
['%.9e'] * 2 + ['%10d'] + ['% .9e'] * (datatab.shape[-1] - 3)
)
ofilename = _get_measurement_filename(paramset)
ofilepath = Path(odirpath, ofilename).with_suffix('.txt')
np.savetxt(
ofilepath, datatab, fmt=datafmt, header=header, delimiter='\t'
)
elif save.lower().endswith('.npz'):
results.update({'header': header})
ofilename = _get_measurement_filename(paramset)
ofilepath = Path(odirpath, ofilename).with_suffix('.npz')
np.savez(ofilepath, **results)
else:
raise ValueError(
f"Unrecognised save format for measurements: {save}."
)
if logger:
logger.info("Measurements saved to %s.", ofilepath)
return results
[docs]
def compute_powspec(catalogue_data, catalogue_rand,
los_data=None, los_rand=None,
degree=None, binning=None, sampling_params=None,
paramset=None,
save=False, logger=None):
"""Compute power spectrum from survey-like data and random catalogues
in the local plane-parallel approximation.
Parameters
----------
catalogue_data : :class:`~triumvirate.catalogue.ParticleCatalogue`
Data-source catalogue.
catalogue_rand : :class:`~triumvirate.catalogue.ParticleCatalogue`
Random-source catalogue.
los_data : (N, 3) array of float, optional
Specified lines of sight for the data-source catalogue.
If `None` (default), this is automatically computed using
:meth:`~triumvirate.catalogue.ParticleCatalogue.compute_los`.
los_rand : (N, 3) array of float, optional
Specified lines of sight for the random-source catalogue.
If `None` (default), this is automatically computed using
:meth:`~triumvirate.catalogue.ParticleCatalogue.compute_los`.
degree : int, optional
Multipole degree. If not `None` (default), this will override
``paramset['degrees']['ELL']``.
binning : :class:`~triumvirate.dataobjs.Binning`, optional
Binning for the measurements. If `None` (default), this is
constructed from `paramset`.
sampling_params : dict, optional
Dictionary containing a subset of the following entries
for sampling parameters---
- 'alignment': {'centre', 'pad'};
- 'boxsize': sequence of [float, float, float];
- 'ngrid': sequence of [int, int, int];
- 'assignment': {'ngp', 'cic', 'tsc', 'pcs'};
- 'interlace': bool;
and exactly one of the following only when 'alignment' is 'pad'---
- 'boxpad': float;
- 'gridpad': float.
If not `None` (default), this will override the corresponding
entries in `paramset`.
paramset : :class:`~triumvirate.parameters.ParameterSet`, optional
Full parameter set (default is `None`). This is used in lieu of
`degree`, `binning` or `sampling_params`.
save : {'.txt', '.npz', False}, optional
If not `False` (default), save the measurements as a '.txt' file
or in '.npz' format. The save path is determined from `paramset`
(if unset, a default file path in the current working directory is
used).
logger : :class:`logging.Logger`, optional
Logger (default is `None`).
Returns
-------
results : dict of {str: :class:`numpy.ndarray`}
Measurement results as a dictionary with the following entries---
- 'kbin': central wavenumber for each bin;
- 'keff': effective wavenumber for each bin;
- 'nmodes': number of wavevector modes in each bin;
- 'pk_raw': power spectrum raw measurements including any
specified normalisation and shot noise;
- 'pk_shot': power spectrum shot noise.
The effective wavenumber is here defined as the average wavenumber
in each bin.
Raises
------
ValueError
When `paramset` is `None` but `degree`, `binning` or
`sampling_params` is also `None`.
Examples
--------
Specify line-of-sight vectors explicitly.
>>> results = compute_powspec(
... catalogue_data, catalogue_rand,
... los_data=np.ones((1e3, 3)), los_rand=np.ones((1e4, 3)),
... paramset=None
... )
Specify multipole `degree` 2 and provide customised
:class:`~triumvirate.dataobjs.Binning` object ``binning``.
Whether `paramset` provided or not, relevant parameters are overridden
by the supplied keyword arguments.
>>> results = compute_powspec(
... catalogue_data, catalogue_rand,
... degree=2,
... binning=binning,
... sampling_params={
... 'boxsize': [1000., 1500., 1000.],
... 'ngrid': [256, 256, 256],
... # 'alignment' at default initial value in `ParameterSet`
... # 'assignment' at default initial value in `ParameterSet`
... }
... )
"""
results = _compute_2pt_stats_survey_like(
_compute_powspec,
catalogue_data, catalogue_rand,
los_data=los_data, los_rand=los_rand,
paramset=paramset, params_sampling=sampling_params,
degree=degree, binning=binning,
types={'catalogue_type': 'survey', 'statistic_type': 'powspec'},
save=save, logger=logger
)
return results
[docs]
def compute_corrfunc(catalogue_data, catalogue_rand,
los_data=None, los_rand=None,
degree=None, binning=None, sampling_params=None,
paramset=None,
save=False, logger=None):
"""Compute correlation function from survey-like data and random
catalogues in the local plane-parallel approximation.
Parameters
----------
catalogue_data : :class:`~triumvirate.catalogue.ParticleCatalogue`
Data-source catalogue.
catalogue_rand : :class:`~triumvirate.catalogue.ParticleCatalogue`
Random-source catalogue.
los_data : (N, 3) array of float, optional
Specified lines of sight for the data-source catalogue.
If `None` (default), this is automatically computed using
:meth:`~triumvirate.catalogue.ParticleCatalogue.compute_los`.
los_rand : (N, 3) array of float, optional
Specified lines of sight for the random-source catalogue.
If `None` (default), this is automatically computed using
:meth:`~triumvirate.catalogue.ParticleCatalogue.compute_los`.
degree : int, optional
Multipole degree. If not `None` (default), this will override
``paramset['degrees']['ELL']``.
binning : :class:`~triumvirate.dataobjs.Binning`, optional
Binning for the measurements. If `None` (default), this is
constructed from `paramset`.
sampling_params : dict, optional
Dictionary containing a subset of the following entries
for sampling parameters---
- 'alignment': {'centre', 'pad'};
- 'boxsize': sequence of [float, float, float];
- 'ngrid': sequence of [int, int, int];
- 'assignment': {'ngp', 'cic', 'tsc', 'pcs'};
- 'interlace': bool;
and exactly one of the following only when 'alignment' is 'pad'---
- 'boxpad': float;
- 'gridpad': float.
If not `None` (default), this will override the corresponding
entries in `paramset`.
paramset : :class:`~triumvirate.parameters.ParameterSet`, optional
Full parameter set (default is `None`). This is used
in lieu of `degree`, `binning` or `sampling_params`.
save : {'.txt', '.npz', False}, optional
If not `False` (default), save the measurements as a '.txt' file
or in '.npz' format. The save path is determined from `paramset`
(if unset, a default file path in the current working directory is
used).
logger : :class:`logging.Logger`, optional
Logger (default is `None`).
Returns
-------
results : dict of {str: :class:`numpy.ndarray`}
Measurement results as a dictionary with the following entries---
- 'rbin': central separation for each bin;
- 'reff': effective separation for each bin;
- 'npairs': number of separation pairs in each bin;
- 'xi': two-point correlation function measurements.
The effective separation is here defined as the average separation
in each bin.
Raises
------
ValueError
When `paramset` is `None` but `degree`, `binning` or
`sampling_params` is also `None`.
Examples
--------
See analogous examples in :func:`~triumvirate.twopt.compute_powspec`.
"""
results = _compute_2pt_stats_survey_like(
_compute_corrfunc,
catalogue_data, catalogue_rand,
los_data=los_data, los_rand=los_rand,
paramset=paramset, params_sampling=sampling_params,
degree=degree, binning=binning,
types={'catalogue_type': 'survey', 'statistic_type': '2pcf'},
save=save, logger=logger
)
return results
# ========================================================================
# Simulation statistics
# ========================================================================
def _compute_2pt_stats_sim_like(twopt_algofunc, catalogue_data,
paramset=None, params_sampling=None,
degree=None, binning=None, types=None,
save=False, logger=None):
"""Compute two-point statistics from a simulation-box catalogue in
the global plane-parallel approximation.
Parameters
----------
twopt_algofunc : callable
Two-point statistic algorithmic function.
catalogue_data : :class:`~triumvirate.catalogue.ParticleCatalogue`
Data-source catalogue.
paramset : :class:`~triumvirate.parameters.ParameterSet`, optional
Full parameter set. If `None` (default), `degree`, `binning`
and `sampling_params` must be provided.
params_sampling : dict, optional
Dictionary containing a subset of the following entries
for sampling parameters---
- 'alignment': {'centre', 'pad'};
- 'boxsize': sequence of [float, float, float];
- 'ngrid': sequence of [int, int, int];
- 'assignment': {'ngp', 'cic', 'tsc', 'pcs'};
- 'interlace': bool;
and exactly one of the following only when 'alignment' is 'pad'---
- 'boxpad': float;
- 'gridpad': float.
If not `None` (default), this will override the corresponding
entries in `paramset`.
degree : int, optional
Multipole degree. If not `None` (default), this will override
``paramset['degrees']['ELL']``.
binning : :class:`~triumvirate.dataobjs.Binning`, optional
Binning for the measurements. If `None` (default), this is
constructed from `paramset`.
types : dict, optional
'catalogue_type' and 'statistic_type' values (default is `None`).
This should be set by the caller of this function.
save : {'.txt', '.npz', False}, optional
If not `False` (default), save the measurements as a '.txt' file
or in '.npz' format. The save path is determined from `paramset`
(if unset, a default file path in the current working directory is
used).
logger : :class:`logging.Logger`, optional
Logger (default is `None`).
Returns
-------
results : dict of {str: :class:`numpy.ndarray`}
Measurement results.
Raises
------
ValueError
When `paramset` is `None` while `params_sampling`, `degree` or
`binning` is also `None`.
"""
# --------------------------------------------------------------------
# Initialisation
# --------------------------------------------------------------------
# -- Parameters ------------------------------------------------------
paramset = _amalgamate_parameters(
paramset=paramset, params_sampling=params_sampling,
degree=degree, binning=binning, **types
)
if isinstance(paramset, dict): # likely redundant but safe
paramset = ParameterSet(param_dict=paramset, logger=logger)
if logger:
logger.info("Parameter set have been initialised.")
if paramset['catalogue_type'] != 'sim':
raise ValueError(
"`paramset` 'catalogue_type' does not correspond to "
"the global plane-parallel clustering algorithm being called: "
f"catalogue_type = '{paramset['catalogue_type']}'."
)
if paramset['statistic_type'] == 'powspec':
statistic_name = 'power spectrum'
elif paramset['statistic_type'] == '2pcf':
statistic_name = 'two-point correlation function'
else:
raise ValueError(
"`paramset` 'statistic_type' does not correspond to "
"the clustering algorithm being called: "
f"statistic_type = '{paramset['statistic_type']}'."
)
# -- Data ------------------------------------------------------------
# Set up binning.
if binning is None:
binning = Binning.from_parameter_set(paramset)
if logger:
logger.info("Binning has been initialised.")
# Set up box alignment.
catalogue_data.periodise(
[paramset['boxsize'][ax] for ax in ['x', 'y', 'z']]
)
if logger:
logger.info("Catalogue box has been periodised.")
# --------------------------------------------------------------------
# Measurements
# --------------------------------------------------------------------
# Prepare catalogues.
if not catalogue_data['nz'].any():
catalogue_data.compute_mean_density(
boxsize=list(paramset['boxsize'].values())
)
if logger:
logger.info(
"Inserted missing 'nz' field "
"based on particle count and box size."
)
if logger:
logger.info(
"Preparing catalogue for clustering algorithm...",
cpp_state='start'
)
particles_data = \
catalogue_data._convert_to_cpp_catalogue(verbose=paramset['verbose'])
if logger:
logger.info(
"... prepared catalogue for clustering algorithm.",
cpp_state='end'
)
# Set up constants.
norm_factor_part = _calc_powspec_normalisation_from_particles(
particles_data, alpha=1.
)
norm_factor_mesh = _calc_powspec_normalisation_from_mesh(
particles_data, paramset, alpha=1.
)
norm_factor_meshes = 0.
if paramset['norm_convention'] == 'none':
norm_factor = 1.
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle), %.6e (mesh), %.6e (mesh-mixed; n/a) (none used)."
)
if paramset['norm_convention'] == 'particle':
norm_factor = norm_factor_part
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle; used), %.6e (mesh), %.6e (mesh-mixed; n/a)."
)
if paramset['norm_convention'] == 'mesh':
norm_factor = norm_factor_mesh
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle), %.6e (mesh; used), %.6e (mesh-mixed; n/a)."
)
if logger:
logger.info(
norm_log_mesg,
norm_factor_part, norm_factor_mesh, norm_factor_meshes
)
# Perform measurement.
if logger:
logger.info(
"Measuring %s from a simulation-box catalogue "
"in the global plane-parallel approximation...",
statistic_name,
cpp_state='start'
)
results = twopt_algofunc(particles_data, paramset, binning, norm_factor)
if logger:
logger.info(
"... measured %s from a simulation-box catalogue "
"in the global plane-parallel approximation.",
statistic_name,
cpp_state='end'
)
if save:
odirpath = paramset['directories']['measurements'] or ""
header = "\n".join([
catalogue_data.write_attrs_as_header(),
_print_measurement_header(
paramset,
norm_factor_part, norm_factor_mesh, norm_factor_meshes
),
])
if save.lower() == '.txt':
datatab = _assemble_measurement_datatab(results, paramset)
datafmt = '\t'.join(
['%.9e'] * 2 + ['%10d'] + ['% .9e'] * (datatab.shape[-1] - 3)
)
ofilename = _get_measurement_filename(paramset)
ofilepath = Path(odirpath, ofilename).with_suffix('.txt')
np.savetxt(
ofilepath, datatab, fmt=datafmt, header=header, delimiter='\t'
)
elif save.lower().endswith('.npz'):
results.update({'header': header})
ofilename = _get_measurement_filename(paramset)
ofilepath = Path(odirpath, ofilename).with_suffix('.npz')
np.savez(ofilepath, **results)
else:
raise ValueError(
f"Unrecognised save format for measurements: {save}."
)
if logger:
logger.info("Measurements saved to %s.", ofilepath)
return results
[docs]
def compute_powspec_in_gpp_box(catalogue_data,
degree=None, binning=None, sampling_params=None,
paramset=None,
save=False, logger=None):
"""Compute power spectrum from a simulation-box catalogue in the
global plane-parallel approximation.
Parameters
----------
catalogue_data : :class:`~triumvirate.catalogue.ParticleCatalogue`
Data-source catalogue.
degree : int, optional
Multipole degree. If not `None` (default), this will override
``paramset['degrees']['ELL']``.
binning : :class:`~triumvirate.dataobjs.Binning`, optional
Binning for the measurements. If `None` (default), this is
constructed from `paramset`.
sampling_params : dict, optional
Dictionary containing a subset of the following entries
for sampling parameters---
- 'alignment': {'centre', 'pad'};
- 'boxsize': sequence of [float, float, float];
- 'ngrid': sequence of [int, int, int];
- 'assignment': {'ngp', 'cic', 'tsc', 'pcs'};
- 'interlace': bool;
and exactly one of the following only when 'alignment' is 'pad'---
- 'boxpad': float;
- 'gridpad': float.
If not `None` (default), this will override the corresponding
entries in `paramset`.
paramset : :class:`~triumvirate.parameters.ParameterSet`, optional
Full parameter set (default is `None`). This is used
in lieu of `degree`, `binning` or `sampling_params`.
save : {'.txt', '.npz', False}, optional
If not `False` (default), save the measurements as a '.txt' file
or in '.npz' format. The save path is determined from `paramset`
(if unset, a default file path in the current working directory is
used).
logger : :class:`logging.Logger`, optional
Logger (default is `None`).
Returns
-------
results : dict of {str: :class:`numpy.ndarray`}
Measurement results as a dictionary with the following entries---
- 'kbin': central wavenumber for each bin;
- 'keff': effective wavenumber for each bin;
- 'nmodes': number of wavevector modes in each bin;
- 'pk_raw': power spectrum raw measurements including any
specified normalisation and shot noise;
- 'pk_shot': power spectrum shot noise.
The effective wavenumber is here defined as the average wavenumber
in each bin.
Raises
------
ValueError
When `paramset` is `None` but `degree`, `binning` or
`sampling_params` is also `None`.
Examples
--------
See analogous examples in :func:`~triumvirate.twopt.compute_powspec`
(though without the line-of-sight arguments).
"""
results = _compute_2pt_stats_sim_like(
_compute_powspec_in_gpp_box, catalogue_data,
paramset=paramset, params_sampling=sampling_params,
degree=degree, binning=binning,
types={'catalogue_type': 'sim', 'statistic_type': 'powspec'},
save=save, logger=logger
)
return results
[docs]
def compute_corrfunc_in_gpp_box(catalogue_data,
degree=None, binning=None,
sampling_params=None,
paramset=None,
save=False, logger=None):
"""Compute correlation function from a simulation-box catalogue in
the global plane-parallel approximation.
Parameters
----------
catalogue_data : :class:`~triumvirate.catalogue.ParticleCatalogue`
Data-source catalogue.
degree : int, optional
Multipole degree. If not `None` (default), this will override
``paramset['degrees']['ELL']``.
binning : :class:`~triumvirate.dataobjs.Binning`, optional
Binning for the measurements. If `None` (default), this is
constructed from `paramset`.
sampling_params : dict, optional
Dictionary containing a subset of the following entries
for sampling parameters---
- 'alignment': {'centre', 'pad'};
- 'boxsize': sequence of [float, float, float];
- 'ngrid': sequence of [int, int, int];
- 'assignment': {'ngp', 'cic', 'tsc', 'pcs'};
- 'interlace': bool;
and exactly one of the following only when 'alignment' is 'pad'---
- 'boxpad': float;
- 'gridpad': float.
If not `None` (default), this will override the corresponding
entries in `paramset`.
paramset : :class:`~triumvirate.parameters.ParameterSet`, optional
Full parameter set (default is `None`). This is used
in lieu of `degree`, `binning` or `sampling_params`.
save : {'.txt', '.npz', False}, optional
If not `False` (default), save the measurements as a '.txt' file
or in '.npz' format. The save path is determined from `paramset`
(if unset, a default file path in the current working directory is
used).
logger : :class:`logging.Logger`, optional
Logger (default is `None`).
Returns
-------
results : dict of {str: :class:`numpy.ndarray`}
Measurement results as a dictionary with the following entries---
- 'rbin': central separation for each bin;
- 'reff': effective separation for each bin;
- 'npairs': number of separation pairs in each bin;
- 'xi': two-point correlation function measurements.
The effective separation is here defined as the average separation
in each bin.
Raises
------
ValueError
When `paramset` is `None` but `degree`, `binning` or
`sampling_params` is also `None`.
Examples
--------
See analogous examples in :func:`~triumvirate.twopt.compute_powspec`
(though without the line-of-sight arguments).
"""
results = _compute_2pt_stats_sim_like(
_compute_corrfunc_in_gpp_box, catalogue_data,
paramset=paramset, params_sampling=sampling_params,
degree=degree, binning=binning,
types={'catalogue_type': 'sim', 'statistic_type': '2pcf'},
save=save, logger=logger
)
return results
# ========================================================================
# Window statistics
# ========================================================================
[docs]
def compute_corrfunc_window(catalogue_rand, los_rand=None,
degree=None, binning=None, sampling_params=None,
paramset=None,
save=False, logger=None):
"""Compute correlation function window from a random catalogue.
Parameters
----------
catalogue_rand : :class:`~triumvirate.catalogue.ParticleCatalogue`
Random-source catalogue.
los_rand : (N, 3) array of float, optional
Specified lines of sight for the random-source catalogue.
If `None` (default), this is automatically computed using
:meth:`~triumvirate.catalogue.ParticleCatalogue.compute_los`.
degree : int, optional
Multipole degree. If not `None` (default), this will override
``paramset['degrees']['ELL']``.
binning : :class:`~triumvirate.dataobjs.Binning`, optional
Binning for the measurements. If `None` (default), this is
constructed from `paramset`.
sampling_params : dict, optional
Dictionary containing a subset of the following entries
for sampling parameters---
- 'alignment': {'centre', 'pad'};
- 'boxsize': sequence of [float, float, float];
- 'ngrid': sequence of [int, int, int];
- 'assignment': {'ngp', 'cic', 'tsc', 'pcs'};
- 'interlace': bool;
and exactly one of the following only when 'alignment' is 'pad'---
- 'boxpad': float;
- 'gridpad': float.
If not `None` (default), this will override the corresponding
entries in `paramset`.
paramset : :class:`~triumvirate.parameters.ParameterSet`, optional
Full parameter set (default is `None`). This is used
in lieu of `degree`, `binning` or `sampling_params`.
save : {'.txt', '.npz', False}, optional
If not `False` (default), save the measurements as a '.txt' file
or in '.npz' format. The save path is determined from `paramset`
(if unset, a default file path in the current working directory is
used).
logger : :class:`logging.Logger`, optional
Logger (default is `None`).
Returns
-------
results : dict of {str: :class:`numpy.ndarray`}
Measurement results as a dictionary with the following entries---
- 'rbin': central separation for each bin;
- 'reff': effective separation for each bin;
- 'npairs': number of separation pairs in each bin;
- 'xi': two-point correlation function window measurements.
The effective separation is here defined as the average separation
in each bin.
Raises
------
ValueError
When `paramset` is `None` but `degree`, `binning` or
`sampling_params` is also `None`.
Examples
--------
See analogous examples in :func:`~triumvirate.twopt.compute_powspec`.
"""
# --------------------------------------------------------------------
# Initialisation
# --------------------------------------------------------------------
# -- Parameters ------------------------------------------------------
paramset = _amalgamate_parameters(
paramset=paramset, params_sampling=sampling_params,
degree=degree, binning=binning,
catalogue_type='random', statistic_type='2pcf-win'
)
if isinstance(paramset, dict): # likely redundant but safe
paramset = ParameterSet(param_dict=paramset, logger=logger)
if logger:
logger.info("Parameter set have been initialised.")
# -- Data ------------------------------------------------------------
# Set up binning.
if binning is None:
binning = Binning.from_parameter_set(paramset)
if logger:
logger.info("Binning has been initialised.")
# Set up lines of sight.
if los_rand is None:
los_rand = catalogue_rand.compute_los()
los_rand = np.ascontiguousarray(los_rand)
if logger:
logger.info("Lines of sight have been initialised.")
# Set up box alignment.
catalogue_rand.centre(
[paramset['boxsize'][ax] for ax in ['x', 'y', 'z']]
)
catalogue_rand.periodise(
[paramset['boxsize'][ax] for ax in ['x', 'y', 'z']]
)
if logger:
logger.info("Catalogues have been aligned.")
# --------------------------------------------------------------------
# Measurements
# --------------------------------------------------------------------
# Prepare catalogues.
particles_rand = \
catalogue_rand._convert_to_cpp_catalogue(verbose=paramset['verbose'])
# Set up constants.
norm_factor_part = _calc_powspec_normalisation_from_particles(
particles_rand, alpha=1.
)
norm_factor_mesh = _calc_powspec_normalisation_from_mesh(
particles_rand, paramset, alpha=1.
)
norm_factor_meshes = 0.
if paramset['norm_convention'] == 'none':
norm_factor = 1.
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle), %.6e (mesh), %.6e (mesh-mixed; n/a) (none used)."
)
if paramset['norm_convention'] == 'particle':
norm_factor = norm_factor_part
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle; used), %.6e (mesh), %.6e (mesh-mixed; n/a)."
)
if paramset['norm_convention'] == 'mesh':
norm_factor = norm_factor_mesh
norm_log_mesg = (
"Normalisation factors: "
"%.6e (particle), %.6e (mesh; used), %.6e (mesh-mixed; n/a)."
)
if logger:
logger.info(
norm_log_mesg,
norm_factor_part, norm_factor_mesh, norm_factor_meshes
)
# Perform measurement.
if logger:
logger.info(
"Measuring two-point correlation function window "
"from a random catalogue...",
cpp_state='start'
)
results = _compute_corrfunc_window(
particles_rand, los_rand, paramset, binning,
alpha=1., norm_factor=norm_factor
)
if logger:
logger.info(
"... measured two-point correlation function window "
"from a random catalogue.",
cpp_state='end'
)
if save:
odirpath = paramset['directories']['measurements']
header = "\n".join([
catalogue_rand.write_attrs_as_header(),
_print_measurement_header(
paramset,
norm_factor_part, norm_factor_mesh, norm_factor_meshes
),
])
if save.lower() == '.txt':
datatab = _assemble_measurement_datatab(results, paramset)
datafmt = '\t'.join(
['%.9e'] * 2 + ['%10d'] + ['% .9e'] * (datatab.shape[-1] - 3)
)
ofilename = _get_measurement_filename(paramset)
ofilepath = Path(odirpath, ofilename).with_suffix('.txt')
np.savetxt(
ofilepath, datatab, fmt=datafmt, header=header, delimiter='\t'
)
elif save.lower().endswith('.npz'):
results.update({'header': header})
ofilename = _get_measurement_filename(paramset)
ofilepath = Path(odirpath, ofilename).with_suffix('.npz')
np.savez(ofilepath, **results)
else:
raise ValueError(
f"Unrecognised save format for measurements: {save}."
)
if logger:
logger.info("Measurements saved to %s.", ofilepath)
return results