Source code for cobi.sht

"""
Spherical Harmonic Transforms Module
=====================================

This module provides fast spherical harmonic transforms using the DUCC0 library
for HEALPix maps. It offers significant performance improvements over healpy's
default implementation, especially for high-resolution maps.

The module supports:
- Forward transforms (map2alm) for spin-0 and spin-2 fields
- Adjoint/inverse transforms (alm2map)
- Automatic pixel weighting for accurate transforms
- Beam smoothing operations
- Multi-threaded execution

Classes
-------
HealpixDUCC
    Main class for performing spherical harmonic transforms on HEALPix maps
    using the DUCC0 library.

Example
-------
    from cobi.sht import HealpixDUCC
    import numpy as np
    
    # Initialize for nside=512
    hp_ducc = HealpixDUCC(nside=512)
    
    # Transform Q/U polarization maps to E/B alms
    qu_maps = np.array([q_map, u_map])
    eb_alms = hp_ducc.map2alm(qu_maps, lmax=1500, nthreads=4)
    
    # Smooth and transform back
    smoothed_maps = hp_ducc.smoothing(qu_maps, lmax=1500, fwhm=5.0, nthreads=4)

Notes
-----
This module is imported from https://github.com/antolonappan/PyNILC/blob/main/pynilc/sht.py
and provides a high-performance alternative to healpy for spherical harmonic transforms.
"""

# imported from https://github.com/antolonappan/PyNILC/blob/main/pynilc/sht.py
from ducc0.sht.experimental import synthesis, adjoint_synthesis
import ducc0
import numpy as np
import healpy as hp



[docs] class HealpixDUCC: """ A class to handle HEALPix geometry and perform spherical harmonic transforms. Attributes: ---------- nside : int The HEALPix nside parameter. """
[docs] def __init__(self, nside): base = ducc0.healpix.Healpix_Base(nside, "RING") geom = base.sht_info() area = (4 * np.pi) / (12 * nside ** 2) w = np.full((geom['theta'].size, ), area) phi0 = geom['phi0'] nphi = geom['nphi'] ringstart = geom['ringstart'] theta = geom['theta'] for arr in [phi0, nphi, ringstart, w]: assert arr.size == theta.size argsort = np.argsort(ringstart) # We sort here the rings by order in the maps self.theta = theta[argsort].astype(np.float64) self.weight = w[argsort].astype(np.float64) self.phi0 = phi0[argsort].astype(np.float64) self.nph = nphi[argsort].astype(np.uint64) self.ofs = ringstart[argsort].astype(np.uint64)
[docs] def getsize(self, lmax, mmax): """ Get size of the alm array for a given lmax and mmax. Parameters: ----------- lmax : int Maximum l. mmax : int Maximum m. """ return ((mmax+1) * (mmax+2)) // 2 + (mmax+1) * (lmax-mmax)
[docs] def __tomap__(self, alm: np.ndarray, spin:int, lmax:int, mmax:int, nthreads:int, map:np.ndarray=None, **kwargs): """ A helper function to perform the forward spherical harmonic transform. Parameters: ----------- alm : np.ndarray The input alm array. spin : int The spin of the field. lmax : int Maximum l. mmax : int Maximum m. nthreads : int Number of threads to use. map : np.ndarray The output map array. """ alm = np.atleast_2d(alm) return synthesis(alm=alm, theta=self.theta, lmax=lmax, mmax=mmax, nphi=self.nph, spin=spin, phi0=self.phi0, nthreads=nthreads, ringstart=self.ofs, map=map, **kwargs)
[docs] def __toalm__(self, m: np.ndarray, spin:int, lmax:int, mmax:int, nthreads:int, alm=None, apply_weights=True, **kwargs): """ A helper function to perform the adjoint spherical harmonic transform. Parameters: ----------- m : np.ndarray The input map array. spin : int The spin of the field. lmax : int Maximum l. mmax : int Maximum m. nthreads : int Number of threads to use. alm : np.ndarray The output alm array. apply_weights : bool Whether to apply the weights. """ m = np.atleast_2d(m) if apply_weights: for of, w, npi in zip(self.ofs, self.weight, self.nph): m[:, of:of + npi] *= w if alm is not None: assert alm.shape[-1] == self.getsize(lmax, mmax) return adjoint_synthesis(map=m, theta=self.theta, lmax=lmax, mmax=mmax, nphi=self.nph, spin=spin, phi0=self.phi0, nthreads=nthreads, ringstart=self.ofs, alm=alm, **kwargs)
[docs] def __alm2map_spin2__(self, alm:np.ndarray, lmax:int, nthreads:int, **kwargs): """ A helper function to perform the forward spherical harmonic transform for spin-2 fields. Parameters: ----------- alm : np.ndarray The input alm array. lmax : int Maximum l. nthreads : int Number of threads to use. """ return self.__tomap__(alm, 2, lmax, lmax, nthreads, **kwargs)
[docs] def __map2alm_spin2__(self, m:np.ndarray, lmax:int, nthreads:int, **kwargs): """ A helper function to perform the adjoint spherical harmonic transform for spin-2 fields. Parameters: ----------- m : np.ndarray The input map array. lmax : int Maximum l. nthreads : int Number of threads to use. """ return self.__toalm__(m.copy(), 2, lmax, lmax, nthreads, **kwargs)
[docs] def __alm2map_spin0__(self, alm:np.ndarray, lmax:int, nthreads:int, **kwargs): """ A helper function to perform the forward spherical harmonic transform for spin-0 fields. Parameters: ----------- alm : np.ndarray The input alm array. lmax : int Maximum l. nthreads : int Number of threads to use """ return self.__tomap__(alm, 0, lmax, lmax, nthreads, **kwargs).squeeze()
[docs] def __map2alm_spin0__(self, m:np.ndarray, lmax:int, nthreads:int, **kwargs): """ A helper function to perform the adjoint spherical harmonic transform for spin-0 fields. Parameters: ----------- m : np.ndarray The input map array. lmax : int Maximum l. nthreads : int Number of threads to use. """ return self.__toalm__(m.copy(), 0, lmax, lmax, nthreads, **kwargs).squeeze()
[docs] def alm2map(self, alm:np.ndarray, lmax:int,nthreads:int, **kwargs): """ Perform the forward spherical harmonic transform. Parameters: ----------- alm : np.ndarray The input alm array. lmax : int Maximum l. nthreads : int Number of threads to use. """ if len(alm) > 3: return self.__alm2map_spin0__(alm, lmax, nthreads, **kwargs) elif len(alm) == 2: return self.__alm2map_spin2__(alm, lmax, nthreads, **kwargs) elif len(alm) == 3: return np.concatenate([self.__alm2map_spin0__(alm[0], lmax, nthreads, **kwargs).reshape(1, -1), self.__alm2map_spin2__(alm[1:], lmax, nthreads, **kwargs)], axis=0) else: raise ValueError("Invalid alm shape")
[docs] def map2alm(self, m:np.ndarray, lmax:int, nthreads:int, **kwargs): """ Perform the adjoint spherical harmonic transform. Parameters: ----------- m : np.ndarray The input map array. lmax : int Maximum l. nthreads : int Number of threads to use. """ if len(m) > 3: return self.__map2alm_spin0__(m, lmax, nthreads, **kwargs) elif len(m) == 2: return self.__map2alm_spin2__(m, lmax, nthreads, **kwargs) elif len(m) == 3: return np.concatenate([self.__map2alm_spin0__(m[0], lmax, nthreads, **kwargs).reshape(1, -1), self.__map2alm_spin2__(m[1:], lmax, nthreads, **kwargs)], axis=0) else: raise ValueError("Invalid map shape")
[docs] def smoothing_alms(self, alms:np.ndarray, lmax:int, fwhm:float, nthreads:int, **kwargs): """ Smooth the alm array. Parameters: ----------- alms : np.ndarray The input alm array. lmax : int Maximum l. fwhm : float The full width at half maximum of the beam. nthreads : int Number of threads to use. """ bl = hp.gauss_beam(np.radians(fwhm/60.), lmax=lmax, pol=True).T if len(alms) > 3: hp.almxfl(alms, bl[0],inplace=True) elif len(alms) == 3: hp.almxfl(alms[0], bl[0],inplace=True) hp.almxfl(alms[1], bl[1],inplace=True) hp.almxfl(alms[2], bl[2],inplace=True) else: raise ValueError("Invalid alm shape")
[docs] def smoothing_maps(self, map:np.ndarray, lmax:int, fwhm:float, nthreads:int, **kwargs): """ Smooth the map array. Parameters: ----------- map : np.ndarray The input map array. lmax : int Maximum l. fwhm : float The full width at half maximum of the beam. nthreads : int Number of threads to use. """ alms = self.map2alm(map, lmax, nthreads, **kwargs) self.smoothing_alms(alms, lmax, fwhm, nthreads, **kwargs) return self.alm2map(alms, lmax, nthreads, **kwargs)
[docs] def smoothing(self, data:np.ndarray, lmax:int, fwhm:float, nthreads:int, **kwargs): """ Smooth the Map or alm. Parameters: ----------- data : np.ndarray The input data array. lmax : int Maximum l. fwhm : float The full width at half maximum of the beam. nthreads : int Number of threads to use. """ if data.dtype == np.complex64: return self.smoothing_alms(data, lmax, fwhm, nthreads, **kwargs) else: return self.smoothing_maps(data, lmax, fwhm, nthreads, **kwargs)