"""
Foreground Simulation Module
=============================
This module handles Galactic foreground emission modeling for CMB analysis,
including dust and synchrotron radiation with frequency-dependent properties.
The module provides:
- Multi-frequency foreground map generation
- Bandpass integration for realistic instrument response
- PySM3 sky model integration (d1, s1 models)
- Spatial templates for dust and synchrotron
- Coordinate transformation support
Classes
-------
BandpassInt
Handles bandpass profile integration for multi-frequency observations.
Foreground
Main class for generating dust and synchrotron foreground maps.
Example
-------
Generate foreground maps for multiple frequencies::
from cobi.simulation import Foreground
import numpy as np
fg = Foreground(
libdir='./fg_sims',
nside=512,
freqs=np.array([27, 39, 93, 145, 225, 280]), # GHz
models=['d1', 's1'], # dust and synchrotron
coord='C' # Celestial coordinates
)
# Get foreground realization
fg_maps = fg.get_map(idx=0) # Shape: (nfreqs, 3, npix)
Notes
-----
Uses PySM3 for generating foreground templates with realistic spectral
energy distributions. Supports bandpass integration for accurate modeling
of detector response.
"""
# General imports
import os
import pysm3
import numpy as np
import healpy as hp
import pickle as pl
from typing import Tuple
from pysm3 import units as u
import matplotlib.pyplot as plt
from typing import List, Optional
# Local imports
from cobi import mpi
from cobi.utils import Logger
from cobi.data import BP_PROFILE
[docs]
class BandpassInt:
[docs]
def __init__(
self,
libdir: str,
):
"""
Initializes the BandpassInt class, loading bandpass profiles from a specified file.
"""
BP_PROFILE.directory = libdir
self.bp = BP_PROFILE.data
[docs]
def get_profile(self, band: str) -> Tuple[np.ndarray, np.ndarray]:
"""
Retrieves the frequency and bandpass profile for a specified band.
Parameters:
band (str): The frequency band for which to retrieve the profile.
Returns:
Tuple[np.ndarray, np.ndarray]: A tuple containing two NumPy arrays:
- nu: Array of frequencies (GHz) where the bandpass is defined.
- bp: Array of bandpass values corresponding to the frequencies.
"""
nu, bp = self.bp[band]
return nu[nu > 0], bp[nu > 0]
[docs]
def plot_profiles(self) -> None:
"""
Plots the bandpass profiles for all available bands.
The method iterates over all bands, retrieves the bandpass profile,
and plots the profile as a function of frequency (GHz).
"""
bands = self.bp.keys()
plt.figure(figsize=(6, 4))
for b in bands:
nu, bp = self.get_profile(b)
plt.plot(nu, bp, label=b)
plt.xlabel("Frequency (GHz)")
plt.ylabel("Bandpass Response")
plt.legend(title="Bands")
plt.tight_layout()
plt.show()
[docs]
class Foreground:
"""
Galactic foreground generator for CMB analysis.
This class generates multi-frequency Galactic foreground emission maps
using PySM3 models for dust and synchrotron radiation. Supports bandpass
integration for realistic detector response.
Parameters
----------
libdir : str
Directory for caching foreground maps.
nside : int
HEALPix resolution parameter.
dust_model : int
PySM3 dust model number (e.g., 1 for d1, 6 for d6).
sync_model : int
PySM3 synchrotron model number (e.g., 1 for s1, 3 for s3).
bandpass : bool, default=False
Whether to apply bandpass integration using detector profiles.
verbose : bool, default=True
Enable logging output.
Attributes
----------
nside : int
HEALPix resolution.
dust_model : int
Dust emission model identifier.
sync_model : int
Synchrotron emission model identifier.
bandpass : bool
Whether bandpass integration is enabled.
bp_profile : BandpassInt or None
Bandpass profile handler (if enabled).
Methods
-------
dustQU(band)
Generate dust Q/U polarization maps at given frequency.
syncQU(band)
Generate synchrotron Q/U polarization maps at given frequency.
get_map(freqs, idx)
Get combined foreground map for multiple frequencies.
Examples
--------
Generate foreground maps without bandpass::
fg = Foreground(
libdir='./fg_sims',
nside=512,
dust_model=1,
sync_model=1,
bandpass=False
)
# Get dust emission at 145 GHz
dust_map = fg.dustQU('145')
With bandpass integration::
fg = Foreground(
libdir='./fg_sims',
nside=512,
dust_model=1,
sync_model=1,
bandpass=True
)
# Generate multi-frequency maps
freqs = ['027', '039', '093', '145', '225', '280']
fg_maps = fg.get_map(freqs, idx=0)
Notes
-----
- Uses PySM3 for physically-motivated foreground models
- Caches maps to disk for efficient reuse
- Supports MPI parallelization
- Output in μK_CMB units
"""
[docs]
def __init__(
self,
libdir: str,
nside: int,
dust_model: int,
sync_model: int,
bandpass: bool = False,
verbose: bool = True,
):
"""
Initializes the Foreground class for generating and handling dust and synchrotron foreground maps.
Parameters:
libdir (str): Directory where the foreground maps will be stored.
nside (int): HEALPix resolution parameter.
dust_model (int): Model number for the dust emission.
sync_model (int): Model number for the synchrotron emission.
bandpass (bool, optional): If True, bandpass integration is applied. Defaults to False.
"""
self.logger = Logger(self.__class__.__name__, verbose=verbose)
self.basedir = libdir
self.libdir = os.path.join(libdir, f"Foregrounds{dust_model}{sync_model}")
if mpi.rank == 0:
os.makedirs(self.libdir, exist_ok=True)
mpi.barrier()
self.nside = nside
self.dust_model = dust_model
self.sync_model = sync_model
self.bandpass = bandpass
if bandpass:
self.bp_profile = BandpassInt(libdir)
self.logger.log("Bandpass integration is enabled", level="info")
else:
self.bp_profile = None
self.logger.log("Bandpass integration is disabled", level="info")
[docs]
def dustQU(self, band: str) -> np.ndarray:
"""
Generates or retrieves the Q and U Stokes parameters for dust emission at a given frequency band.
Parameters:
band (str): The frequency band.
Returns:
np.ndarray: A NumPy array containing the Q and U maps.
"""
name = (
f"dustQU_N{self.nside}_f{band}.fits"
if not self.bandpass
else f"dustQU_N{self.nside}_f{band}_bp.fits"
)
fname = os.path.join(self.libdir, name)
if os.path.isfile(fname):
self.logger.log(f"Loading dust Q and U maps for band {band}", level="info")
return hp.read_map(fname, field=[0, 1]) # type: ignore
else:
self.logger.log(f"Generating dust Q and U maps for band {band}", level="info")
sky = pysm3.Sky(
nside=self.nside, preset_strings=[f"d{int(self.dust_model)}"]
)
if self.bandpass:
if self.bp_profile is not None:
nu, weights = self.bp_profile.get_profile(band)
else:
raise ValueError("Bandpass profile is not initialized.")
nu = nu * u.GHz # type: ignore
maps = sky.get_emission(nu, weights)
maps *= pysm3.utils.bandpass_unit_conversion(nu, weights=weights, output_unit=u.uK_CMB)
else:
maps = sky.get_emission(int(band) * u.GHz) # type: ignore
maps = maps.to(u.uK_CMB, equivalencies=u.cmb_equivalencies(int(band) * u.GHz)) # type: ignore
if mpi.rank == 0:
hp.write_map(fname, maps[1:], dtype=np.float64) # type: ignore
mpi.barrier()
return maps[1:].value
[docs]
def syncQU(self, band: str) -> np.ndarray:
"""
Generates or retrieves the Q and U Stokes parameters for synchrotron emission at a given frequency band.
Parameters:
band (str): The frequency band.
Returns:
np.ndarray: A NumPy array containing the Q and U maps.
"""
name = (
f"syncQU_N{self.nside}_f{band}.fits"
if not self.bandpass
else f"syncQU_N{self.nside}_f{band}_bp.fits"
)
fname = os.path.join(self.libdir, name)
if os.path.isfile(fname):
self.logger.log(f"Loading synchrotron Q and U maps for band {band}", level="info")
return hp.read_map(fname, field=[0, 1]) # type: ignore
else:
self.logger.log(f"Generating synchrotron Q and U maps for band {band}", level="info")
sky = pysm3.Sky(
nside=self.nside, preset_strings=[f"s{int(self.sync_model)}"]
)
if self.bandpass:
if self.bp_profile is not None:
nu, weights = self.bp_profile.get_profile(band)
else:
raise ValueError("Bandpass profile is not initialized.")
nu = nu * u.GHz # type: ignore
maps = sky.get_emission(nu, weights)
maps *= pysm3.utils.bandpass_unit_conversion(nu, weights=weights, output_unit=u.uK_CMB)
else:
maps = sky.get_emission(int(band) * u.GHz) # type: ignore
maps = maps.to(u.uK_CMB, equivalencies=u.cmb_equivalencies(int(band) * u.GHz)) # type: ignore
if mpi.rank == 0:
hp.write_map(fname, maps[1:], dtype=np.float64) # type: ignore
mpi.barrier()
return maps[1:].value
[docs]
class HILC:
"""
This class is used to perform Harmonic ILC on the foregrounds.
"""
[docs]
def __init__(self):
pass
[docs]
def harmonic_ilc_alm(self, alms : np.ndarray, lbins : Optional[List[int]] = None) -> tuple:
"""
This method is used to perform Harmonic ILC on the foregrounds.
:param alms: The foregrounds in alm format.
:type alms: np.ndarray
:param lbins: The list of ell bins.
:type lbins: Optional[List[int]], optional
:return: The Harmonic ILC results and the ILC filter.
:rtype: tuple
"""
A = np.ones((len(alms), 1))
cov = self.empirical_harmonic_covariance(alms)
if lbins is not None:
for lmin, lmax in zip(lbins[:-1], lbins[1:]):
# Average the covariances in the bin
lmax = min(lmax, cov.shape[-1])
dof = 2 * np.arange(lmin, lmax) + 1
cov[..., lmin:lmax] = (
(dof / dof.sum() * cov[..., lmin:lmax]).sum(-1)
)[..., np.newaxis]
cov = self.regularized_inverse(cov.swapaxes(-1, -3))
ilc_filter = np.linalg.inv(A.T @ cov @ A) @ A.T @ cov
del cov, dof
result = self.apply_harmonic_W(ilc_filter, alms)
return result,ilc_filter
[docs]
def empirical_harmonic_covariance(self,alms : np.ndarray) -> np.ndarray:
"""
The method empirical_harmonic_covariance is used to compute the empirical harmonic covariance.
:param alms: The foregrounds in alm format.
:type alms: np.ndarray
:return: The empirical harmonic covariance.
:rtype: np.ndarray
"""
alms = np.array(alms, copy=False, order='C')
alms = alms.view(np.float64).reshape(alms.shape+(2,))
if alms.ndim > 3: # Shape has to be ([Stokes], freq, lm, ri)
alms = alms.transpose(1, 0, 2, 3)
lmax = hp.Alm.getlmax(alms.shape[-2])
res = (alms[..., np.newaxis, :, :lmax+1, 0]
* alms[..., :, np.newaxis, :lmax+1, 0]) # (Stokes, freq, freq, ell)
consumed = lmax + 1
for i in range(1, lmax+1):
n_m = lmax + 1 - i
alms_m = alms[..., consumed:consumed+n_m, :]
res[..., i:] += 2 * np.einsum('...fli,...nli->...fnl', alms_m, alms_m)
consumed += n_m
res /= 2 * np.arange(lmax + 1) + 1
return res
[docs]
def regularized_inverse(self,cov : np.ndarray) -> np.ndarray:
"""
The method regularized_inverse is used to compute the regularized inverse of the covariance.
:param cov: The covariance.
:type cov: np.ndarray
:return: The regularized inverse of the covariance.
:rtype: np.ndarray
"""
inv_std = np.einsum('...ii->...i', cov)
inv_std = 1 / np.sqrt(inv_std)
np.nan_to_num(inv_std, False, 0, 0, 0)
np.nan_to_num(cov, False, 0, 0, 0)
inv_cov = np.linalg.pinv(cov
* inv_std[..., np.newaxis]
* inv_std[..., np.newaxis, :])
return inv_cov * inv_std[..., np.newaxis] * inv_std[..., np.newaxis, :]
[docs]
def apply_harmonic_W(self,W : np.ndarray, alms: np.ndarray) -> np.ndarray:
"""
The method apply_harmonic_W is used to apply the harmonic weights.
:param W: The harmonic weights.
:type W: np.ndarray
:param alms: The foregrounds in alm format.
:type alms: np.ndarray
:return: The alms after applying the harmonic weights.
:rtype: np.ndarray
"""
lmax = hp.Alm.getlmax(alms.shape[-1])
res = np.full((W.shape[-2],) + alms.shape[1:], np.nan, dtype=alms.dtype)
start = 0
for i in range(0, lmax+1):
n_m = lmax + 1 - i
res[..., start:start+n_m] = np.einsum('...lcf,f...l->c...l',
W[..., i:, :, :],
alms[..., start:start+n_m])
start += n_m
return res