Source code for cobi.utils

"""
Utility Functions Module
=========================

This module contains utility functions for various operations used throughout
the COBI package, including logging, coordinate transformations, map operations,
and numerical utilities.

Functions
---------
Logger
    Configurable logging class for debugging and information messages
inrad
    Convert angles from degrees to radians
cli
    Compute inverse of array elements (safe for zeros)
download_file
    Download files with progress bar
deconvolveQU
    Deconvolve beam from polarization maps
change_coord
    Transform HEALPix maps between coordinate systems
slice_alms
    Slice spherical harmonic coefficients to new lmax

The module provides essential functionality for:
- Logging with configurable verbosity levels
- Angular unit conversions
- Safe numerical operations (division, inversion)
- File downloading with progress tracking
- HEALPix map manipulations (coordinate transforms, beam operations)
- Spherical harmonic coefficient operations
"""

# This file contains utility functions that are used in the main script.

import requests
import logging
import numpy as np
from tqdm import tqdm
import healpy as hp

[docs] class Logger:
[docs] def __init__(self, name: str, verbose: bool = False): """ Initializes the logger. Parameters: name (str): Name of the logger, typically the class name or module name. verbose (bool): If True, set logging level to DEBUG, otherwise to WARNING. """ self.logger = logging.getLogger(name) # Configure logging level based on verbosity if verbose: self.logger.setLevel(logging.DEBUG) else: self.logger.setLevel(logging.WARNING) # Prevent adding multiple handlers to the logger if not self.logger.hasHandlers(): # Create console handler ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) # Create formatter and add it to the handler formatter = logging.Formatter('%(name)s : %(levelname)s - %(message)s') ch.setFormatter(formatter) # Add handler to the logger self.logger.addHandler(ch)
[docs] def log(self, message: str, level: str = 'info'): """ Logs a message at the specified logging level. Parameters: message (str): The message to log. level (str): The logging level (debug, info, warning, error, critical). """ level = level.lower() if level == 'debug': self.logger.debug(message) elif level == 'info': self.logger.info(message) elif level == 'warning': self.logger.warning(message) elif level == 'error': self.logger.error(message) elif level == 'critical': self.logger.critical(message) else: self.logger.info(message)
[docs] def inrad(alpha: float) -> float: """ Converts an angle from degrees to radians. Parameters: alpha (float): The angle in degrees. Returns: float: The angle in radians. """ return np.deg2rad(alpha)
[docs] def cli(cl: np.ndarray) -> np.ndarray: """ Computes the inverse of each element in the input array `cl`. Parameters: cl (np.ndarray): Input array for which the inverse is calculated. Only positive values will be inverted; zeros and negative values will remain zero. Returns: np.ndarray: An array where each element is the inverse of the corresponding element in `cl`, with zeros or negative values left unchanged. """ ret = np.zeros_like(cl) ret[np.where(cl > 0)] = 1.0 / cl[np.where(cl > 0)] return ret
[docs] def download_file(url, filename): """Download a file with a progress bar.""" response = requests.get(url, stream=True) total_size = int(response.headers.get('content-length', 0)) block_size = 1024 # 1 Kibibyte t = tqdm(total=total_size, unit='iB', unit_scale=True, desc=f'Downloading {filename}') with open(filename, 'wb') as file: for data in response.iter_content(block_size): t.update(len(data)) file.write(data) t.close()
[docs] def deconvolveQU(QU,beam): """ Deconvolves a beam from a QU map. Parameters: QU (np.ndarray): The input QU map. beam (np.ndarray): The beam to deconvolve. Returns: np.ndarray: The deconvolved QU map. """ beam = np.radians(beam/60) nside = hp.npix2nside(len(QU[0])) elm,blm = hp.map2alm_spin(QU,2) lmax = hp.Alm.getlmax(len(elm)) bl = hp.gauss_beam(beam,lmax=lmax,pol=True).T hp.almxfl(elm,cli(bl[1]),inplace=True) hp.almxfl(blm,cli(bl[2]),inplace=True) return hp.alm2map_spin([elm,blm],nside,2,lmax)
[docs] def change_coord(m, coord=['C', 'G']): npix = m.shape[-1] nside = hp.npix2nside(npix) ang = hp.pix2ang(nside, np.arange(npix)) rot = hp.Rotator(coord=reversed(coord)) new_ang = rot(*ang) new_pix = hp.ang2pix(nside, *new_ang) return m[..., new_pix]
[docs] def slice_alms(teb, lmax_new): """Returns the input teb alms sliced to the new lmax. teb(numpy array): input teb alms lmax_new(int): new lmax """ nfields = len(teb) lmax = hp.Alm.getlmax(len(teb[0])) if nfields > 3: lmax = nfields nfields = 1 dtype = teb.dtype else: dtype = teb[0].dtype if lmax_new > lmax: raise ValueError('lmax_new must be smaller or equal to lmax') elif lmax_new == lmax: return teb else: teb_new = np.zeros((nfields, hp.Alm.getsize(lmax_new)), dtype=dtype) indices_full = hp.Alm.getidx(lmax,*hp.Alm.getlm(lmax_new)) indices_new = hp.Alm.getidx(lmax_new,*hp.Alm.getlm(lmax_new)) teb_new[:,indices_new] = teb[:,indices_full] return teb_new
[docs] class Binner: """ binner = Binner(n, lmin=2, lmax=..., method='linear') binner.b : effective centers (mode-counting weights), never singleton at l=2 binner.bin : bins Cl with (2l+1) or (2l+1)/vl^2 weights """
[docs] def __init__(self, n, lmin=2, lmax=None, method="linear"): if not isinstance(n, int) or n <= 0: raise ValueError("n must be a positive integer") if not isinstance(lmin, int) or lmin < 2: raise ValueError("lmin must be an integer >= 2") if lmax is None or (not isinstance(lmax, int)) or lmax < lmin: raise ValueError("lmax must be an integer and >= lmin") self.n = n self.lmin = lmin self.lmax = lmax self.method = (method or "linear").lower() # integer slice-edges [l0:l1) covering lmin..lmax inclusive self.bp = self._make_edges(self.n, self.lmin, self.lmax, self.method, min_width=2) # precompute mode-counting effective centers (independent of cl) ell = np.arange(self.lmax + 1, dtype=float) w = 2.0 * ell + 1.0 self.b = self._weighted_centers(ell, w, self.bp)
def __getattribute__(self, name: str): if name == "bin" or name == 'bin_cell' or name == 'binner' or name == 'do_bin': return object.__getattribute__(self, 'bin') else: return object.__getattribute__(self, name)
[docs] def bin(self, cl, vl=None): cl = np.asarray(cl) if cl.shape[-1] <= self.lmax: raise ValueError(f"cl last axis must have length >= lmax+1={self.lmax+1}") ell = np.arange(self.lmax + 1, dtype=float) if vl is None: w_ell = 2.0 * ell + 1.0 else: vl = np.asarray(vl, dtype=float) if vl.shape[0] < self.lmax + 1: raise ValueError("vl must have length >= lmax+1") w_ell = (2.0 * ell + 1.0) / (vl[: self.lmax + 1] ** 2) cl_use = cl[..., : self.lmax + 1] return self._bin_core(cl_use, w_ell, self.bp)
# ---------------- internals ---------------- @staticmethod def _proposed_edges_float(n, lmin, lmax, method): if method == "linear": # IMPORTANT: use lmax+1 because edges are for half-open slices [l0:l1) return np.linspace(lmin, lmax + 1, n + 1) if method == "log": return lmin * np.exp(np.linspace(0.0, np.log((lmax + 1) / lmin), n + 1)) if method == "log10": return lmin * 10.0 ** np.linspace(0.0, np.log10((lmax + 1) / lmin), n + 1) if method == "p2": return np.linspace(np.sqrt(lmin), np.sqrt(lmax + 1), n + 1) ** 2 if method == "p3": return np.linspace(lmin ** (1 / 3), (lmax + 1) ** (1 / 3), n + 1) ** 3 raise ValueError("Unknown method (linear/log/log10/p2/p3).") @classmethod def _make_edges(cls, n, lmin, lmax, method, min_width=2): """ Build integer slice edges bp of length n+1 such that bins are [bp[i]:bp[i+1]) and cover lmin..lmax inclusive (so bp[-1]=lmax+1). Enforce bp[i+1]-bp[i] >= min_width. """ L = (lmax - lmin + 1) # number of multipoles included if n * min_width > L: raise ValueError( f"Requested n={n} is too large to keep min bin width {min_width} over " f"[{lmin},{lmax}] (available multipoles={L}). Reduce n." ) bp_f = cls._proposed_edges_float(n, lmin, lmax, method) # Start from floor to avoid tiny first bin from rounding near lmin bp = np.floor(bp_f).astype(int) # enforce endpoints bp[0] = lmin bp[-1] = lmax + 1 bp = np.clip(bp, lmin, lmax + 1) # enforce minimum width progressively for i in range(1, len(bp)): needed = bp[i - 1] + min_width if bp[i] < needed: bp[i] = needed # if we pushed the end beyond lmax+1, pull back uniformly while preserving min_width if bp[-1] > lmax + 1: bp[-1] = lmax + 1 for i in range(len(bp) - 2, -1, -1): allowed = bp[i + 1] - min_width if bp[i] > allowed: bp[i] = allowed bp[0] = lmin # final checks if bp[0] != lmin or bp[-1] != lmax + 1: raise ValueError("Failed to construct valid edges; reduce n.") if np.any(np.diff(bp) < min_width): raise ValueError("Failed to enforce minimum bin width; reduce n.") return bp @staticmethod def _bin_core(cl, w_ell, bp): out = [] for i in range(len(bp) - 1): l0, l1 = bp[i], bp[i + 1] # half-open w = w_ell[l0:l1] den = np.sum(w) if den == 0: out.append(np.zeros(cl.shape[:-1], dtype=float)) continue if cl.ndim == 1: out.append(np.sum(w * cl[l0:l1]) / den) else: out.append(np.sum(w * cl[..., l0:l1], axis=-1) / den) return np.stack(out, axis=-1) @staticmethod def _weighted_centers(ell, w_ell, bp): b = np.zeros(len(bp) - 1, dtype=float) for i in range(len(bp) - 1): l0, l1 = bp[i], bp[i + 1] w = w_ell[l0:l1] den = np.sum(w) b[i] = np.sum(w * ell[l0:l1]) / den if den != 0 else 0.5 * (l0 + l1 - 1) return b