Source code for triumvirate.transforms

"""
Hankel-Like Transforms (:mod:`~triumvirate.transforms`)
=======================================================

.. versionadded:: 0.4.6


Perform Hankel-like transforms in 1- and 2-d.

.. autosummary::
    SphericalBesselTransform
    DoubleSphericalBesselTransform
    resample_lglin
    resample_lin

"""  # numpydoc ignore=SS01
from collections.abc import Sequence

import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline, RectBivariateSpline

from ._arrayops import (
    extrap_lin,
    extrap_loglin,
    extrap_loglin_oscil,
    extrap_pad,
    extrap2d_lin,
    extrap2d_loglin,
    extrap2d_loglin_oscil,
    extrap2d_pad,
)
from ._fftlog import HankelTransform


[docs] class SphericalBesselTransform: """Spherical Bessel Transform. Parameters ---------- degree : int Degree of the spherical Bessel transform. bias : int Power-law bias index. sample_pts : array of float Pre-transform sample points in 1-d. Must be logarithmically spaced. Must be even in length if extrapolation is used. pivot : float, optional Pivot value (default is 1.). When `lowring` is `True`, this is adjusted if it is non-zero, or otherwise directly calculated. lowring : bool, optional Low-ringing condition (default is `True`). extrap : int, optional Extrapolation method (default is 0) with the following options: - 0: none; - 1: extrapolate by zero padding; - 2: extrapolate by constant padding; - 3: extrapolate linearly; - 4: extrapolate log-linearly; - 5: extrapolate log-linearly with oscillatory behaviour. Any extrapolation results in a sample size for the transform that is the smallest power of 2 greater than or equal to `extrap_exp` times the original number of sample points; the pre-transform samples are assumed to be real. extrap_exp : float, optional Sample size expansion factor (default is 2.) for extrapolation. The smallest power of 2 greater than or equal to this times the original number of sample points is used as the sample size for the transform. extrap_layer : {'native', 'outer'}, optional Extrapolation layer: - 'native': extrapolate the samples, including any pre-factors, within the underlying FFTLog algorithm; - 'outer' (default): extrapolate the samples, excluding any pre-factors, before passing them to the underlying FFTLog algorithm. threaded : bool, optional If `True` (default is `False`), use the multi-threaded FFTLog algorithm. Attributes ---------- degree : int Degree of the spherical Bessel transform. bias : int Power-law bias index. size : int Sample size. pivot : float Pivot value used. Raises ------ ValueError When `extrap` is not a supported option. ValueError When the input sample size is not even and extrapolation is used. """ def __init__(self, degree, bias, sample_pts, pivot=1., lowring=True, extrap=0, extrap_exp=2., extrap_layer='outer', threaded=False): if extrap not in {0, 1, 2, 3, 4, 5}: raise ValueError( f"Extrapolation option must be in 0 to 5: {extrap=}." ) self.degree = degree self.bias = bias self._lowring = lowring self._extrap = extrap self._extrap_layer = extrap_layer if self._extrap: if (nsamp := len(sample_pts)) % 2: raise ValueError( "Input sample size must be even when extrapolation " f"is used: {len(sample_pts)=}." ) if self._extrap_layer == 'native': extrap_native = self._extrap self._n_ext = 0 elif self._extrap_layer == 'outer': extrap_native = 0 nsamp_trans = int(2 ** np.ceil(np.log2(extrap_exp * nsamp))) self._n_ext = (nsamp_trans - nsamp) // 2 sample_pts = extrap_loglin(sample_pts, self._n_ext) else: raise ValueError( "Extrapolation layer must be 'native' or 'outer': " f"extrap_layer={self._extrap_layer}." ) else: self._n_ext = 0 extrap_native = 0 self._fbht = HankelTransform( degree + 1./2, bias, sample_pts, kr_c=pivot, lowring=lowring, extrap=extrap_native, extrap_exp=extrap_exp, threaded=threaded ) self._post_slice = slice(self._n_ext, -self._n_ext) if self._n_ext \ else slice(None) self._logres = self._fbht._logres self._pre_sampts = np.asarray(self._fbht._pre_sampts) self._post_sampts = np.asarray(self._fbht._post_sampts)[ self._post_slice ] @property def size(self): """Sample size of the transform. Returns ------- int """ # numpydoc ignore=RT03 return self._fbht.size @property def pivot(self): """Pivot value. Returns ------- float """ # numpydoc ignore=RT03 return self._fbht._pivot
[docs] def transform(self, pre_samples): """Transform samples at initialised sample points. Parameters ---------- pre_samples : array_like Pre-transform samples in 1-d. Assumed to be real if extrapolation is used. Returns ------- post_sampts, post_samples : array_like Post-transform sample points and samples. """ if self._extrap_layer == 'outer': pre_samples = self._perform_extrap(pre_samples) pre_samples *= self._pre_sampts ** (3./2) post_sampts, post_samples = self._fbht.transform(pre_samples) post_sampts = post_sampts[self._post_slice] post_samples = post_samples[self._post_slice] post_samples *= (2*np.pi / self._post_sampts) ** (3./2) return post_sampts, post_samples
[docs] def transform_cosmo_multipoles(self, direction, pre_samples): """Transform cosmological multipoles between configuration and Fourier spaces. Parameters ---------- direction : {'forward', 'backward', 1, -1} Transform direction: 'forward' or 1 from configuration to Fourier space, 'backward' or -1 from Fourier to configuration space. pre_samples : array of float Pre-transform samples in 1-d. Assumed to be real if extrapolation is used. Returns ------- post_sampts, post_samples : array of float Post-transform sample points and samples. """ if direction in {'forward', 1}: parity = (-1j) ** self.degree prefactor = 1. elif direction in {'backward', -1}: parity = (1j) ** self.degree prefactor = (2*np.pi) ** (-3) else: raise ValueError( "Transform direction must be 'forward'/1 or 'backward'/-1." ) post_sampts, post_samples = self.transform(pre_samples) post_samples *= prefactor * parity return post_sampts, post_samples
def _perform_extrap(self, arr): """Perform extrapolation according to internal attributes. Parameters ---------- arr : array_like 1-d array to extrapolate. Returns ------- array_like Extrapolated 1-d array. """ if self._extrap == 1: return extrap_pad(arr, self._n_ext, 0., 0.) if self._extrap == 2: return extrap_pad(arr, self._n_ext, arr[0], arr[-1]) if self._extrap == 3: return extrap_lin(arr, self._n_ext) if self._extrap == 4: return extrap_loglin(arr, self._n_ext) if self._extrap == 5: return extrap_loglin_oscil(arr, self._n_ext) return arr
[docs] class DoubleSphericalBesselTransform: """Double spherical Bessel Transform. Parameters ---------- degrees : (int, int) Degrees of the double spherical Bessel transform. biases : (int, int) Power-law bias indices. sample_pts : array of float Pre-transform sample points in 1-d for both dimensions. Must be logarithmically spaced. Must be even in length if extrapolation is used. pivot : float, optional Pivot value (default is 1.). When `lowring` is `True`, this is adjusted if it is non-zero, or otherwise directly calculated. lowring : bool, optional Low-ringing condition (default is `True`). extrap : int or tuple of int, optional Extrapolation method for both or each dimension(s) (default is 0) with the following options: - 0: none; - 1: extrapolate by zero padding; - 2: extrapolate by constant padding; - 3: extrapolate linearly; - 4: extrapolate log-linearly; - 5: extrapolate log-linearly with oscillatory behaviour. Any extrapolation results in a sample size for the transform that is the smallest power of 2 greater than or equal to `extrap_exp` times the original number of sample points; the pre-transform samples are assumed to be real. extrap_exp : float, optional Sample size expansion factor (default is 2.) for extrapolation. The smallest power of 2 greater than or equal to this times the original number of sample points is used as the sample size for the transform. extrap2d : bool, optional If `True` (default is `False`), perform 2-d extrapolation pre-transform excluding any pre-factors; otherwise, perform 1-d extrapolation including any pre-factors within the underlying FFTLog algorithm. threaded : bool, optional If `True` (default is `False`), use the multi-threaded FFTLog algorithm. Attributes ---------- degrees : (int, int) Degrees of the spherical Bessel transform. biases : (int, int) Power-law bias indices. size : int Sample size for either dimension. pivot : (float, float) Pivot values used for both dimensions. Raises ------ ValueError When `extrap` is not a supported option. ValueError When the input sample size is not even and extrapolation is used. """ # Representative transform, either vertical (0, across rows) # or horizontal (1, across columns). _rep = 0 def __init__(self, degrees, biases, sample_pts, pivot=1., lowring=True, extrap=0, extrap_exp=2., extrap2d=False, threaded=False): if extrap not in {0, 1, 2, 3, 4, 5}: raise ValueError( f"Extrapolation option must be in 0 to 5: {extrap=}." ) self.degrees = degrees self.biases = biases self._lowring = lowring self._extrap = extrap self._extrap2d = extrap2d if self._extrap: if (nsamp := len(sample_pts)) % 2: raise ValueError( "Input sample size must be even when extrapolation " f"is used: {len(sample_pts)=}." ) if not self._extrap2d: extrap_native = self._extrap self._n_ext = 0 else: extrap_native = 0 nsamp_trans = int(2 ** np.ceil(np.log2(extrap_exp * nsamp))) self._n_ext = (nsamp_trans - nsamp) // 2 sample_pts = extrap_loglin(sample_pts, self._n_ext) else: self._n_ext = 0 extrap_native = 0 self._fbsjt = tuple( SphericalBesselTransform( degree_, bias_, sample_pts, pivot=pivot, lowring=lowring, extrap=extrap_native, extrap_exp=extrap_exp, threaded=threaded ) for (degree_, bias_) in zip(degrees, biases) ) self._post_slice = slice(self._n_ext, -self._n_ext) if self._n_ext \ else slice(None) self._logres = self._fbsjt[self._rep]._logres self._pre_sampts = np.asarray(self._fbsjt[self._rep]._pre_sampts) self._post_sampts = np.asarray(self._fbsjt[self._rep]._post_sampts)[ self._post_slice ] @property def size(self): """Sample size of the transform for either dimension. Returns ------- int """ # numpydoc ignore=RT03 return self._fbsjt[self._rep].size @property def pivot(self): """Pivot values for both dimensions. Returns ------- tuple of float """ # numpydoc ignore=RT03 return (self._fbsjt[0]._pivot, self._fbsjt[1]._pivot)
[docs] def transform(self, pre_samples): """Transform samples at initialised sample points. Parameters ---------- pre_samples : array_like Pre-transform samples in 2-d. Assumed to be real if extrapolation is used. Returns ------- post_sampts, post_samples : array_like Post-transform sample points and samples in 2-d, with `post_sampts` in :func:`numpy.meshgrid` 'ij'-indexing format. """ if self._extrap2d: pre_samples = self._perform_extrap(pre_samples) inter_samples = [] for pre_samples_row in pre_samples: _, inter_samples_row = self._fbsjt[1].transform(pre_samples_row) inter_samples.append(inter_samples_row) post_samples_T = [] for inter_samples_col in np.asarray(inter_samples).T: _, post_samples_col = self._fbsjt[0].transform(inter_samples_col) post_samples_T.append(post_samples_col) post_samples = np.asarray(post_samples_T).T post_samples = post_samples[self._post_slice, self._post_slice] post_sampts = np.meshgrid( self._post_sampts, self._post_sampts, indexing='ij' ) return post_sampts, post_samples
[docs] def transform_cosmo_multipoles(self, direction, pre_samples): """Transform cosmological multipoles between configuration and Fourier spaces. Parameters ---------- direction : {'forward', 'backward', 1, -1} Transform direction: 'forward' or 1 from configuration to Fourier space, 'backward' or -1 from Fourier to configuration space. pre_samples : array of float Pre-transform samples in 2-d. Assumed to be real if extrapolation is used. Returns ------- post_sampts, post_samples : array of float Post-transform sample points and samples in 2-d, with `post_sampts` in :func:`numpy.meshgrid` 'ij'-indexing format. """ if direction in {'forward', 1}: parity = (-1j) ** sum(self.degrees) prefactor = 1. elif direction in {'backward', -1}: parity = (1j) ** sum(self.degrees) prefactor = (2*np.pi) ** (-3 * len(self.degrees)) else: raise ValueError( "Transform direction must be 'forward'/1 or 'backward'/-1." ) post_sampts, post_samples = self.transform(pre_samples) post_samples *= prefactor * parity return post_sampts, post_samples
def _perform_extrap(self, arr): """Perform extrapolation according to internal attributes. Parameters ---------- arr : array_like 2-d array to extrapolate. Returns ------- array_like Extrapolated 2-d array. """ arr = np.asarray(arr) if self._extrap == 1: return extrap2d_pad( arr, self._n_ext, 0., 0., 0., 0. ) if self._extrap == 2: return extrap2d_pad( arr, self._n_ext, arr[:, 0], arr[:, -1], arr[0, :], arr[-1, :] ) if self._extrap == 3: return extrap2d_lin(arr, self._n_ext) if self._extrap == 4: return extrap2d_loglin(arr, self._n_ext) if self._extrap == 5: return extrap2d_loglin_oscil(arr, self._n_ext) return arr
[docs] def resample_lglin(sampts, samples, size=None, spline=3): """Resample at logarithmically spaced sample points in 1- or 2-d. Parameters ---------- sampts : array_like or sequence of array_like Original sample points (1-d in each dimension). samples : array_like Original samples in 1-d or 2-d. size : int or sequence of int, optional Size of the interpolated samples. If `None` (default), the size is kept the same as the original samples. spline : int, optional Degree of the interpolation spline (default is 3). Returns ------- resampts : (2-tuple of) :class:`numpy.ndarray` Logarithmically spaced sample points (1-d for each dimension) in the original range(s). resamples : :class:`numpy.ndarray` Interpolated samples. Examples -------- Resample a 2-d function: >>> def f(x, y): return np.exp(-x*np.sqrt(y)) >>> sampts = 1.e-3 * np.arange(1., 51.) >>> samples = f(*np.meshgrid(sampts, sampts, indexing='ij')) >>> resampts, resamples = resample_lglin(sampts, samples, size=10) Check that the resampling is accurate: >>> resamples_expected = f( ... *np.meshgrid(resampts[0], resampts[-1], indexing='ij') ... ) >>> print( ... "Resampling is accurate:", ... np.allclose(resamples, resamples_expected) ... ) Resampling is accurate: True """ sampts = np.squeeze(sampts) samples = np.squeeze(samples) if samples.ndim == 1: if sampts.ndim != 1: raise ValueError("Sample points must be 1-d.") resampts = np.logspace( np.log10(sampts[0]), np.log10(sampts[-1]), size or len(sampts), base=10 ) resamples = InterpolatedUnivariateSpline( sampts, samples, k=spline, ext='raise' )(resampts) return resampts, resamples if samples.ndim == 2: if sampts.ndim == 1: sampts = np.array([sampts, sampts]) if len(sampts) != 2 or sampts[0].ndim != 1 or sampts[-1].ndim != 1: raise ValueError( "Must provide (two) 1-d array(s) as sample points " "for 2-d resampling." ) size = size if isinstance(size, Sequence) else (size, size) resampts = ( np.logspace( np.log10(sampts[0][0]), np.log10(sampts[0][-1]), size[0] or len(sampts[0]), base=10 ), np.logspace( np.log10(sampts[-1][0]), np.log10(sampts[-1][-1]), size[-1] or len(sampts[-1]), base=10 ) ) resamples = RectBivariateSpline( sampts[0], sampts[-1], samples, kx=spline, ky=spline )(*resampts) return resampts, resamples raise ValueError(f"Sample points must be 1-d or 2-d: {samples.ndim=}.")
[docs] def resample_lin(sampts, samples, size=None, spline=3): """Resample at linearly spaced sample points in 1- or 2-d. Parameters ---------- sampts : array_like or sequence of array_like Original sample points (1-d in each dimension). samples : array_like Original samples in 1-d or 2-d. size : int or sequence of int, optional Size of the interpolated samples. If `None` (default), the size is kept the same as the original samples. spline : int, optional Degree of the interpolation spline (default is 3). Returns ------- resampts : (2-tuple of) :class:`numpy.ndarray` Linearly spaced sample points (1-d for each dimension) in the original range(s). resamples : :class:`numpy.ndarray` Interpolated samples. Examples -------- Resample a 2-d function: >>> def f(x, y): return np.exp(-x*np.sqrt(y)) >>> sampts = 1.e-3 * np.arange(1., 51.) >>> samples = f(*np.meshgrid(sampts, sampts, indexing='ij')) >>> resampts, resamples = resample_lin(sampts, samples, size=10) Check that the resampling is accurate: >>> resamples_expected = f( ... *np.meshgrid(resampts[0], resampts[-1], indexing='ij') ... ) >>> print( ... "Resampling is accurate:", ... np.allclose(resamples, resamples_expected) ... ) Resampling is accurate: True """ sampts = np.squeeze(sampts) samples = np.squeeze(samples) if samples.ndim == 1: if sampts.ndim != 1: raise ValueError("Sample points must be 1-d.") resampts = np.linspace( sampts[0], sampts[-1], size or len(sampts) ) resamples = InterpolatedUnivariateSpline( sampts, samples, k=spline, ext='raise' )(resampts) return resampts, resamples if samples.ndim == 2: if sampts.ndim == 1: sampts = np.array([sampts, sampts]) if len(sampts) != 2 or sampts[0].ndim != 1 or sampts[-1].ndim != 1: raise ValueError( "Must provide (two) 1-d array(s) as sample points " "for 2-d resampling." ) size = size if isinstance(size, Sequence) else (size, size) resampts = ( np.linspace( sampts[0][0], sampts[0][-1], size[0] or len(sampts[0]) ), np.linspace( sampts[-1][0], sampts[-1][-1], size[-1] or len(sampts[-1]) ) ) resamples = RectBivariateSpline( sampts[0], sampts[-1], samples, kx=spline, ky=spline )(*resampts) return resampts, resamples raise ValueError(f"Sample points must be 1-d or 2-d: {samples.ndim=}.")