"""
Calibration analysis module for LAT-SAT cross-spectrum studies.
This module provides classes for fitting calibration parameters (angles and amplitudes)
using cross-correlations between Large Aperture Telescope (LAT) and Small Aperture
Telescope (SAT) data from CMB observations.
"""
import numpy as np
from tqdm import tqdm
from dataclasses import dataclass, field
from typing import Any, Optional, Tuple, Dict, List, Union
from abc import ABC, abstractmethod
from cobi.simulation import CMB
from cobi.utils import Logger
import os
import healpy as hp
import pickle as pl
import emcee
from getdist import plots, MCSamples
import matplotlib.pyplot as plt
from scipy.stats import chi2
from scipy.optimize import minimize
from cobi.spectra import SpectraCross
def _oas_shrinkage(S: np.ndarray, n: int) -> np.ndarray:
"""
Oracle Approximating Shrinkage estimator (Chen et al. 2010).
Finds the optimal convex combination of the sample covariance S and
a scaled identity that minimises the expected Frobenius loss. Well-
conditioned for any n > 1, even when n << p.
"""
p = S.shape[0]
tr_S = np.trace(S)
tr_S2 = np.trace(S @ S)
mu = tr_S / p
rho_num = (1.0 - 2.0 / p) * tr_S2 + tr_S ** 2
rho_den = (n + 1.0 - 2.0 / p) * (tr_S2 - tr_S ** 2 / p)
rho = 1.0 if rho_den == 0 else min(1.0, rho_num / rho_den)
return (1.0 - rho) * S + rho * mu * np.eye(p)
# ============================================================
# =============== SHARED SUPERCLASS ==========================
# ============================================================
[docs]
class BaseSat4LatCross(ABC):
"""
Base class for LAT-SAT cross-spectrum calibration analyses.
Provides data loading, beam handling, and plotting logic.
Subclasses must define:
- self.__pnames__ / self.__plabels__
- self.theory(theta)
- self.lnprior(theta)
Attributes:
spec_cross: Cross-spectrum calculation object
sat_err: Satellite calibration error
sat_lrange: Satellite ell range for fitting
lat_lrange: LAT ell range for fitting
fit_per_split: Whether to fit per split or per frequency
spectra_selection: Which spectra to include ('all', 'auto_only', 'cross_only')
libdir: Directory for caching results
binner: Binning information object
Lmax: Maximum multipole
maptags: List of map tags
freq_groups: Dictionary mapping frequency bases to indices
freq_bases: List of frequency base names
mean_spec: Mean spectra from simulations
std_spec: Standard deviation of spectra
beam_arr: Beam transfer function array
"""
[docs]
def __init__(self,
spec_cross: 'SpectraCross',
sat_err: float,
sat_lrange: Tuple[Optional[int], Optional[int]] = (None, None),
lat_lrange: Tuple[Optional[int], Optional[int]] = (None, None),
fit_per_split: bool = True,
spectra_selection: str = 'all',
libdir_suffix: str = 'BaseSat4LatCross',
num_sims: int = 100,
verbose: bool = False,
cov_mode: str = 'diagonal') -> None:
"""
cov_mode controls how the data covariance is estimated:
'diagonal' – per-bin variance from sims (default, always safe).
'block_diag' – one (n_bins × n_bins) block per (i,j) map pair,
each block Hartlap-corrected. Needs N_sims >> n_bins
(not N_sims >> n_data). Recommended when you want
ell-ell correlations without full-matrix instability.
'shrinkage' – full (n_data × n_data) covariance shrunk toward the
identity via OAS. Well-conditioned at any N_sims.
'full' – raw sample covariance + Hartlap correction.
Requires N_sims > n_data + 2; raises ValueError
otherwise.
"""
if cov_mode not in ('diagonal', 'block_diag', 'shrinkage', 'full'):
raise ValueError(f"cov_mode must be one of 'diagonal', 'block_diag', 'shrinkage', 'full'")
self.logger = Logger(self.__class__.__name__, verbose=verbose)
self.logger.log(f"Initializing {self.__class__.__name__}...", level="info")
self.spec_cross = spec_cross
self.libdir = os.path.join(spec_cross.libdir, libdir_suffix)
os.makedirs(self.libdir, exist_ok=True)
self.sat_err = sat_err
self.sat_lrange = sat_lrange
self.lat_lrange = lat_lrange
self.fit_per_split = fit_per_split
self.spectra_selection = spectra_selection
self.binner = spec_cross.binInfo
self.Lmax = spec_cross.lmax
self.cov_mode = cov_mode
self.use_full_cov = (cov_mode != 'diagonal')
self._num_sims = num_sims
# ---- Build map tags and frequency groups ----
self.maptags: List[str] = spec_cross.maptags.copy()
self.freq_groups: Dict[str, List[int]] = {}
for i, tag in enumerate(self.maptags):
base = tag.rsplit('-', 1)[0]
self.freq_groups.setdefault(base, []).append(i)
self.freq_bases: List[str] = list(self.freq_groups.keys())
# ---- Masks, spectra, beams ----
self.__likelihood_mask__: np.ndarray = self._build_likelihood_mask()
self.mean_spec: np.ndarray
self.std_spec: np.ndarray
self.mean_spec, self.std_spec = self._calc_mean_std(num_sims=num_sims)
self.beam_arr: np.ndarray = self._get_beam_arr()
if self.use_full_cov:
self.cov, self.cov_inv, self.logdet_cov = self._calc_covariance(num_sims=num_sims)
# Initialize parameter names and labels (to be set by subclasses)
self.__pnames__: List[str] = []
self.__plabels__: List[str] = []
self.logger.log(f"Initialized {self.__class__.__name__}.", level="info")
# -------------------------------------------------------------------------
# Abstract methods - must be implemented by subclasses
# -------------------------------------------------------------------------
[docs]
@abstractmethod
def theory(self, theta: np.ndarray) -> np.ndarray:
"""
Calculate theoretical model spectrum for given parameters.
Args:
theta: Parameter vector
Returns:
Model spectrum array with shape matching data
"""
pass
[docs]
@abstractmethod
def lnprior(self, theta: np.ndarray) -> float:
"""
Calculate log prior probability for given parameters.
Args:
theta: Parameter vector
Returns:
Log prior probability
"""
pass
# -------------------------------------------------------------------------
# Shared internals
# -------------------------------------------------------------------------
[docs]
def _build_likelihood_mask(self) -> np.ndarray:
"""
Build boolean mask for likelihood evaluation based on ell ranges.
Returns:
Boolean mask array with shape (n_tags, n_tags, n_bins)
"""
ells = self.binner.get_effective_ells()
sat_lmin, sat_lmax = self.sat_lrange
lat_lmin, lat_lmax = self.lat_lrange
sat_mask = (ells >= (sat_lmin or 0)) & (ells <= (sat_lmax or np.inf))
lat_mask = (ells >= (lat_lmin or 0)) & (ells <= (lat_lmax or np.inf))
cross_mask = sat_mask & lat_mask
n_tags = len(self.maptags)
n_bins = len(ells)
mask = np.zeros((n_tags, n_tags, n_bins), dtype=bool)
is_lat = np.array([tag.startswith('LAT') for tag in self.maptags])
for i in range(n_tags):
for j in range(n_tags):
# Determine which ell mask to use
if is_lat[i] and is_lat[j]:
# LAT x LAT
ell_mask = lat_mask
elif not is_lat[i] and not is_lat[j]:
# SAT x SAT
ell_mask = sat_mask
else:
# LAT x SAT cross
ell_mask = cross_mask
# Apply spectra selection
if self.spectra_selection == 'auto_only':
if i == j: # Auto-spectrum
mask[i, j] = ell_mask
elif self.spectra_selection == 'cross_only':
if i != j: # Cross-spectrum
mask[i, j] = ell_mask
else: # 'all'
mask[i, j] = ell_mask
return mask
[docs]
def __all_spectra__(self, num_sims: int = 100) -> np.ndarray:
"""
Get all spectra data matrix for current ell ranges.
Returns:
Data matrix array with shape (n_tags, n_tags, n_bins)
"""
all_specs = [
self.spec_cross.data_matrix(i, which='EB',
sat_lrange=self.sat_lrange,
lat_lrange=self.lat_lrange,
avg_splits=False)
for i in tqdm(range(num_sims))
]
all_specs = np.array(all_specs)
return all_specs
[docs]
def _calc_mean_std(self, num_sims: int = 100) -> Tuple[np.ndarray, np.ndarray]:
"""
Calculate mean and standard deviation of spectra over simulations.
Args:
num_sims: Number of simulations to use
Returns:
Tuple of (mean_spectrum, std_spectrum) arrays
"""
fname = os.path.join(self.libdir, f'mean_std_spec_{num_sims}_sims.pkl')
if os.path.exists(fname):
self.logger.log("Loading cached mean/std spectra...", level="info")
with open(fname, 'rb') as f:
return pl.load(f)
self.logger.log(f"Computing mean/std over {num_sims} simulations...", level="info")
all_specs = self.__all_spectra__(num_sims=num_sims)
mean_spec, std_spec = np.mean(all_specs, axis=0), np.std(all_specs, axis=0)
with open(fname, 'wb') as f:
pl.dump((mean_spec, std_spec), f)
return mean_spec, std_spec
[docs]
def _get_beam_arr(self) -> np.ndarray:
"""
Calculate beam transfer function array for all map tag combinations.
Returns:
Beam array with shape (n_tags, n_tags, n_bins)
"""
n_tags, n_bins = len(self.maptags), self.binner.get_n_bands()
beam = np.ones((n_tags, n_tags, n_bins))
fwhm_dict = {f'LAT_{f}': fw for f, fw in zip(self.spec_cross.lat.freqs, self.spec_cross.lat.fwhm)}
# Only add SAT beams if not in LAT-only mode
if hasattr(self.spec_cross, 'lat_only') and not self.spec_cross.lat_only:
fwhm_dict.update({f'SAT_{f}': fw for f, fw in zip(self.spec_cross.sat.freqs, self.spec_cross.sat.fwhm)})
for i, ti in enumerate(self.maptags):
for j, tj in enumerate(self.maptags):
bi, bj = ti.rsplit('-', 1)[0], tj.rsplit('-', 1)[0]
b_i = hp.gauss_beam(np.radians(fwhm_dict[bi] / 60.), lmax=self.Lmax) ** 2
b_j = hp.gauss_beam(np.radians(fwhm_dict[bj] / 60.), lmax=self.Lmax) ** 2
beam[i, j] = self.binner.bin_cell(np.sqrt(b_i * b_j))
return beam
[docs]
def _calc_covariance(self, num_sims: int = 100) -> Tuple[np.ndarray, np.ndarray, float]:
"""
Estimate and cache the data covariance under the likelihood mask.
The mode is controlled by ``self.cov_mode``:
* ``'block_diag'``: independent (n_bins × n_bins) blocks per (i,j) map
pair, each Hartlap-corrected. Only needs N_sims >> n_bins_per_pair.
* ``'shrinkage'``: full covariance regularised by OAS shrinkage toward
the scaled identity. Well-conditioned for any N_sims.
* ``'full'``: raw sample covariance + Hartlap. Raises ValueError when
N_sims ≤ n_data + 2.
Returns
-------
(cov, cov_inv, logdet_cov)
"""
fname = os.path.join(
self.libdir,
f'cov_{self.cov_mode}_{num_sims}sims'
f'_lmin{self.lat_lrange[0]}_lmax{self.lat_lrange[1]}'
f'_slmin{self.sat_lrange[0]}_slmax{self.sat_lrange[1]}'
f'_sel{self.spectra_selection}.pkl',
)
if os.path.exists(fname):
self.logger.log(f"Loading cached covariance ({self.cov_mode})...", level="info")
with open(fname, 'rb') as f:
return pl.load(f)
self.logger.log(
f"Computing covariance (mode='{self.cov_mode}') over {num_sims} sims...", level="info"
)
all_specs = self.__all_spectra__(num_sims=num_sims) # (N, n_tags, n_tags, n_bins)
all_specs_bc = all_specs / self.beam_arr[None] # beam-correct
mask = self.__likelihood_mask__
vecs = all_specs_bc[:, mask] # (N, n_data)
N, n_data = vecs.shape
self.logger.log(f"Covariance: N_sims={N}, n_data={n_data}", level="info")
if self.cov_mode == 'full':
if N <= n_data + 2:
raise ValueError(
f"'full' covariance requires N_sims > n_data+2 = {n_data + 2}, "
f"but only {N} sims available. Use cov_mode='block_diag' or 'shrinkage'."
)
S = np.cov(vecs.T)
hartlap = (N - n_data - 2) / (N - 1)
self.logger.log(f"Hartlap factor: {hartlap:.4f}", level="info")
cov = S
cov_inv = hartlap * np.linalg.inv(S)
_, logdet = np.linalg.slogdet(S)
elif self.cov_mode == 'shrinkage':
S = np.cov(vecs.T)
cov = _oas_shrinkage(S, N)
self.logger.log(f"OAS shrinkage applied (N={N}, n_data={n_data})", level="info")
cov_inv = np.linalg.inv(cov)
_, logdet = np.linalg.slogdet(cov)
elif self.cov_mode == 'block_diag':
# One Hartlap-corrected block per (i, j) map pair.
# Elements from the same pair are contiguous in the C-order
# flattening of the 3-D mask, so we can walk through vecs in order.
n_tags = mask.shape[0]
cov = np.zeros((n_data, n_data))
cov_inv = np.zeros((n_data, n_data))
logdet = 0.0
k = 0
for i in range(n_tags):
for j in range(n_tags):
n_pair = int(np.sum(mask[i, j, :]))
if n_pair == 0:
continue
pv = vecs[:, k: k + n_pair] # (N, n_pair)
S = np.cov(pv.T) if n_pair > 1 else np.array([[float(np.var(pv, ddof=1))]])
if N > n_pair + 2:
hartlap = (N - n_pair - 2) / (N - 1)
else:
hartlap = 1.0
self.logger.log(
f"Block ({i},{j}): N ({N}) ≤ n_pair+2 ({n_pair+2}), Hartlap skipped.",
level="warning",
)
cov[k: k + n_pair, k: k + n_pair] = S
cov_inv[k: k + n_pair, k: k + n_pair] = hartlap * np.linalg.inv(S)
_, ld = np.linalg.slogdet(S)
logdet += float(ld)
k += n_pair
else:
raise ValueError(f"Unknown cov_mode '{self.cov_mode}'")
with open(fname, 'wb') as f:
pl.dump((cov, cov_inv, logdet), f)
return cov, cov_inv, logdet
# -------------------------------------------------------------------------
# Shared Likelihood & MCMC
# -------------------------------------------------------------------------
[docs]
def chisq(self, theta: np.ndarray) -> float:
"""
Calculate chi-squared statistic for given parameters.
When use_full_cov=True uses the full inverse covariance matrix;
otherwise uses diagonal (per-bin) variances.
"""
model = self.theory(theta)
data = self.mean_spec / self.beam_arr
if self.use_full_cov:
delta = (data - model)[self.__likelihood_mask__]
return float(delta @ self.cov_inv @ delta)
else:
err = self.std_spec / self.beam_arr
err = np.where(err == 0, np.inf, err)
return np.sum(((data - model) / err) ** 2 * self.__likelihood_mask__)
[docs]
def ln_likelihood(self, theta: np.ndarray) -> float:
"""Calculate log likelihood for given parameters."""
chisq_val = self.chisq(theta)
if self.use_full_cov:
n_data = int(np.sum(self.__likelihood_mask__))
return -0.5 * (chisq_val + self.logdet_cov + n_data * np.log(2 * np.pi))
return -0.5 * chisq_val
[docs]
def ln_prob(self, theta: np.ndarray) -> float:
"""Calculate log posterior probability for given parameters."""
lp = self.lnprior(theta)
return -np.inf if not np.isfinite(lp) else lp + self.ln_likelihood(theta)
[docs]
def run_mcmc(self, nwalkers: int = 32, nsamples: int = 2000, rerun: bool = False,
fiducial_params: Optional[np.ndarray] = None, fname: Optional[str] = None,
burnin: int = 200, thin: int = 20) -> np.ndarray:
"""
Run (or extend/resume) MCMC using an emcee HDF5 backend.
The backend file grows as you call run_mcmc with larger nsamples —
no samples are ever discarded from disk. burnin and thin only affect
what is *returned*; you can change them freely without re-running.
Args:
nwalkers: Number of walkers.
nsamples: Target total number of steps stored in the backend.
If the backend already has >= nsamples steps, the chain
is returned immediately. If it has fewer, only the
remaining steps are run (extending the existing chain).
rerun: If True, wipe the backend and start from scratch.
fiducial_params: Starting point for walkers (required on first run;
ignored when resuming an existing chain).
fname: Override the auto-generated backend path (.h5 file).
burnin: Steps to discard from the front when returning the chain.
thin: Thinning factor applied when returning the chain.
"""
ndim = len(self.__pnames__)
if fname is None:
cov_tag = f"_{self.cov_mode}" if self.use_full_cov else ""
fname = os.path.join(
self.libdir,
f"chain_{nwalkers}w_fitper{self.fit_per_split}_ndim{ndim}"
f"_lmin{self.lat_lrange[0]}_lmax{self.lat_lrange[1]}"
f"_slmin{self.sat_lrange[0]}_slmax{self.sat_lrange[1]}{cov_tag}.h5",
)
backend = emcee.backends.HDFBackend(fname)
if rerun:
backend.reset(nwalkers, ndim)
already_run = backend.initialized and backend.iteration > 0
if already_run and backend.iteration >= nsamples:
self.logger.log(
f"Returning cached chain ({backend.iteration} steps, burnin={burnin}, thin={thin})",
level="info",
)
return backend.get_chain(discard=burnin, thin=thin, flat=True)
if already_run:
initial_state = None
remaining = nsamples - backend.iteration
self.logger.log(
f"Extending chain: {backend.iteration} → {nsamples} steps", level="info"
)
else:
if fiducial_params is not None:
if len(fiducial_params) != ndim:
raise ValueError(
f"fiducial_params length {len(fiducial_params)} does not match ndim {ndim}"
)
else:
fiducial_params = np.zeros(ndim)
initial_state = fiducial_params + 1e-3 * np.random.randn(nwalkers, ndim)
remaining = nsamples
sampler = emcee.EnsembleSampler(nwalkers, ndim, self.ln_prob, backend=backend)
sampler.run_mcmc(initial_state, remaining, progress=True)
return sampler.get_chain(discard=burnin, thin=thin, flat=True)
[docs]
def getdist_samples(self, nwalkers: int, nsamples: int, rerun: bool = False,
fiducial_params: Optional[np.ndarray] = None,
label: Optional[str] = None,
burnin: int = 200, thin: int = 20) -> MCSamples:
samples = self.run_mcmc(nwalkers, nsamples, rerun=rerun,
fiducial_params=fiducial_params,
burnin=burnin, thin=thin)
return MCSamples(samples=samples, names=self.__pnames__, labels=self.__plabels__, label=label)
# -------------------------------------------------------------------------
# Shared Plotting
# -------------------------------------------------------------------------
[docs]
def plot_posteriors(self, nwalkers: int, nsamples: int, rerun: bool = False, label: Optional[str] = None,
fiducial_params: Optional[np.ndarray] = None, burnin: int = 200, thin: int = 20):
samples = self.getdist_samples(nwalkers, nsamples, rerun=rerun, label=label,
fiducial_params=fiducial_params, burnin=burnin, thin=thin)
g = plots.get_subplot_plotter()
g.triangle_plot([samples], filled=True, title_limit=1)
return g
[docs]
def plot_spectra_matrix(self, theta: Optional[np.ndarray] = None, save_path: Optional[str] = None,
average_split: bool = False) -> None:
"""
Plots the mean data spectra with std deviation and optionally a theory curve
in a matrix layout corresponding to all cross-correlations.
Args:
theta: Parameter vector (alphas, beta, A_EB, etc.) for theory overlay.
save_path: If given, saves the figure instead of showing.
average_split: If True, average spectra across splits per frequency.
"""
import matplotlib.pyplot as plt
ells = self.binner.get_effective_ells()
# Build ell masks based on sat_lrange and lat_lrange
n_bins = len(ells)
# SAT ell mask
sat_ell_mask = np.ones(n_bins, dtype=bool)
if self.sat_lrange[0] is not None:
sat_ell_mask &= (ells >= self.sat_lrange[0])
if self.sat_lrange[1] is not None:
sat_ell_mask &= (ells <= self.sat_lrange[1])
# LAT ell mask
lat_ell_mask = np.ones(n_bins, dtype=bool)
if self.lat_lrange[0] is not None:
lat_ell_mask &= (ells >= self.lat_lrange[0])
if self.lat_lrange[1] is not None:
lat_ell_mask &= (ells <= self.lat_lrange[1])
# Cross-correlation mask (intersection)
cross_ell_mask = sat_ell_mask & lat_ell_mask
# ----------------------------------------------------------
# Optionally average over splits for display only
# ----------------------------------------------------------
if average_split:
freq_bases = list(self.freq_groups.keys())
n_freqs = len(freq_bases)
n_bins = self.binner.get_n_bands()
data_spec = self.mean_spec / self.beam_arr
error_spec = self.std_spec / self.beam_arr
data_avg = np.zeros((n_freqs, n_freqs, n_bins))
err_avg = np.zeros_like(data_avg)
for i, base_i in enumerate(freq_bases):
idx_i = self.freq_groups[base_i]
for j, base_j in enumerate(freq_bases):
idx_j = self.freq_groups[base_j]
vals = [data_spec[ii, jj, :] for ii in idx_i for jj in idx_j]
errs = [error_spec[ii, jj, :] for ii in idx_i for jj in idx_j]
data_avg[i, j, :] = np.nanmean(vals, axis=0)
err_avg[i, j, :] = np.nanmean(errs, axis=0)
maptags = freq_bases
data_spec, error_spec = data_avg, err_avg
if theta is not None:
theory_spec = self.theory(theta)
th_avg = np.zeros_like(data_avg)
for i, base_i in enumerate(freq_bases):
idx_i = self.freq_groups[base_i]
for j, base_j in enumerate(freq_bases):
idx_j = self.freq_groups[base_j]
vals = [theory_spec[ii, jj, :] for ii in idx_i for jj in idx_j]
th_avg[i, j, :] = np.nanmean(vals, axis=0)
theory_spec = th_avg
else:
theory_spec = None
else:
maptags = self.maptags
data_spec = self.mean_spec / self.beam_arr
error_spec = self.std_spec / self.beam_arr
theory_spec = self.theory(theta) if theta is not None else None
# ----------------------------------------------------------
# Set up figure
# ----------------------------------------------------------
n_tags = len(maptags)
fig, axes = plt.subplots(
n_tags, n_tags,
figsize=(n_tags * 3.0, n_tags * 3.0),
sharex=True, sharey='row'
)
# ----------------------------------------------------------
# Main plotting loop
# ----------------------------------------------------------
for i in range(n_tags):
for j in range(n_tags):
ax = axes[i, j]
is_diagonal = (i == j)
# Respect spectra_selection
plot_this = False
if self.spectra_selection == 'all':
plot_this = True
elif self.spectra_selection == 'auto_only' and is_diagonal:
plot_this = True
elif self.spectra_selection == 'cross_only' and not is_diagonal:
plot_this = True
if not plot_this:
ax.set_visible(False)
continue
# Determine which ell mask to use for this subplot
is_i_sat = maptags[i].startswith('SAT')
is_j_sat = maptags[j].startswith('SAT')
if is_i_sat and is_j_sat:
# SAT-SAT correlation: use SAT range
subplot_mask = sat_ell_mask
elif not is_i_sat and not is_j_sat:
# LAT-LAT correlation: use LAT range
subplot_mask = lat_ell_mask
else:
# SAT-LAT cross-correlation: use intersection
subplot_mask = cross_ell_mask
# Filter data for this subplot
ells_plot = ells[subplot_mask]
data_plot = data_spec[i, j, subplot_mask]
error_plot = error_spec[i, j, subplot_mask]
# Plot data ±1σ
ax.errorbar(
ells_plot, data_plot, yerr=error_plot,
fmt='.', capsize=2, color='black', markersize=4, alpha=0.5,
)
# Overlay theory if provided (plot over full range)
if theory_spec is not None:
ax.loglog(ells, theory_spec[i, j], color='red')
# Shade region used in likelihood
if average_split:
mask_1d = self.__likelihood_mask__[0, 0, :][subplot_mask]
else:
mask_1d = self.__likelihood_mask__[i, j, :][subplot_mask]
used_ells = ells_plot[mask_1d]
# if len(used_ells) > 0:
# ax.axvspan(used_ells.min(), used_ells.max(), color='green', alpha=0.1, zorder=-1)
# Formatting
ax.set_xscale('log')
ax.set_yscale('log')
ax.axhline(0, color='grey', linestyle=':', linewidth=0.75)
ax.grid(True, linestyle='--', alpha=0.6)
ax.set_ylim(1e-7, 1e-4)
ax.set_xlim(20, 3000)
# Axes labels
if i == n_tags - 1:
ax.set_xlabel(r'$\ell$')
if j == 0:
ax.set_ylabel(r'$C_\ell^{EB}$')
if i == 0:
ax.set_title(f"$\\rm {maptags[j].replace('_', '-')}$", fontsize=10)
# Right-side frequency labels
for i, tag in enumerate(maptags):
axes[i, n_tags - 1].text(
1.05, 0.5, f"$\\rm {tag.replace('_', '-')}$",
transform=axes[i, n_tags - 1].transAxes,
va='center', ha='left', fontsize=10
)
# Legend
handles, labels = [], []
for ax in axes.flatten():
if ax.get_visible():
h, l = ax.get_legend_handles_labels()
handles.extend(h)
labels.extend(l)
break
if handles:
fig.legend(handles, labels, loc='upper right')
plt.tight_layout(rect=(0.03, 0.03, 0.92, 0.95))
# Save or show
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
self.logger.log(f"Figure saved to {save_path}", level="info")
plt.close()
else:
#plt.show()
return None
[docs]
def mle(self,nwalkers,nsamples,op=np.median):
s = self.run_mcmc(nwalkers=nwalkers, nsamples=nsamples)
theta = op(s,axis=0)
chi2_min = self.chisq(theta)
return chi2_min
# ============================================================
# =============== SUBCLASS 1: BETA FIT =======================
# ============================================================
[docs]
class Sat4LatCross(BaseSat4LatCross):
"""
Calibration model fitting for birefringence angle β.
This class implements a calibration analysis that fits calibration angles (alphas)
and the cosmic birefringence angle β using EB cross-spectra between LAT and SAT.
Attributes:
cl_len: Lensed CMB power spectra dictionary
"""
[docs]
def __init__(self,
spec_cross: 'SpectraCross',
sat_err: float,
beta_fid: float,
sat_lrange: Tuple[Optional[int], Optional[int]] = (None, None),
lat_lrange: Tuple[Optional[int], Optional[int]] = (None, None),
fit_per_split: bool = True,
spectra_selection: str = 'all',
num_sims: int = 100,
verbose: bool = False,
cov_mode: str = 'diagonal') -> None:
"""
Initialize calibration analysis for birefringence angle fitting.
Args:
spec_cross: Cross-spectrum calculation object
sat_err: Satellite calibration error
beta_fid: Fiducial birefringence angle for theory calculation
sat_lrange: Satellite ell range for fitting (lmin, lmax)
lat_lrange: LAT ell range for fitting (lmin, lmax)
fit_per_split: Whether to fit per split or per frequency
spectra_selection: Which spectra to include ('all', 'auto_only', 'cross_only')
num_sims: Number of simulations for mean/std calculation
verbose: Whether to enable verbose output
cov_mode: Covariance mode ('diagonal', 'block_diag', 'shrinkage', 'full')
"""
super().__init__(spec_cross, sat_err, sat_lrange, lat_lrange,
fit_per_split, spectra_selection, 'CalibrationAnalysis', num_sims, verbose,
cov_mode=cov_mode)
self.cl_len = CMB(spec_cross.libdir, spec_cross.nside, beta=beta_fid).get_lensed_spectra(dl=False)
if fit_per_split:
self.__pnames__ = [f"a_{t}" for t in self.maptags] + ['beta']
self.__plabels__ = [_format_alpha_label(t) for t in self.maptags] + [r"\beta"]
else:
self.__pnames__ = [f"a_{f}" for f in self.freq_bases] + ['beta']
self.__plabels__ = [_format_alpha_label(f) for f in self.freq_bases] + [r"\beta"]
[docs]
def theory(self, theta: np.ndarray) -> np.ndarray:
"""
Compute theoretical EB spectra between all map pairs
using the exact birefringence + miscalibration formula:
C_ell^{E_iB_j} = cos(2α_i+2β)sin(2α_j+2β)C_EE
- sin(2α_i+2β)cos(2α_j+2β)C_BB
Optionally adds a dust foreground power law: A_dust * (ell/80)^alpha_dust
"""
# Unpack angles
# α_i = per-map miscalibration
# β = global cosmic birefringence
if self.fit_per_split:
alphas = theta[:-1]
else:
base_a = {b: theta[i] for i, b in enumerate(self.freq_bases)}
alphas = np.array([base_a[t.rsplit('-', 1)[0]] for t in self.maptags])
beta = theta[-1]
# Get Cℓ^EE and Cℓ^BB
cl_ee = self.cl_len["ee"][:self.Lmax+1]
cl_bb = self.cl_len["bb"][:self.Lmax+1]
model = np.zeros_like(self.mean_spec)
for i in range(len(self.maptags)): # E_i
for j in range(len(self.maptags)): # B_j
# angles in radians
ai = np.deg2rad(alphas[i])
aj = np.deg2rad(alphas[j])
b = np.deg2rad(beta)
term = (
np.cos(2*ai + 2*b) * np.sin(2*aj + 2*b) * cl_ee
- np.sin(2*ai + 2*b) * np.cos(2*aj + 2*b) * cl_bb
)
model[i, j] = self.binner.bin_cell(term)
return model
[docs]
def lnprior(self, theta: np.ndarray) -> float:
alphas, beta = theta[:-1], theta[-1]
if np.any(np.abs(alphas) > 0.5) or not (-0.5 < beta < 0.5):
return -np.inf
sat_idx = [i for i, t in enumerate(self.maptags if self.fit_per_split else self.freq_bases) if t.startswith('SAT')]
return float(-0.5 * np.sum(np.array(alphas)[sat_idx] ** 2 / self.sat_err**2 - np.log(self.sat_err * np.sqrt(2 * np.pi))))
@property
def dof(self) -> int:
"""
Calculate degrees of freedom for chi-squared analysis.
Returns:
Number of data points minus number of fitted parameters
"""
n_data_points = np.sum(self.__likelihood_mask__)
n_parameters = len(self.__pnames__)
return n_data_points - n_parameters
# ============================================================
# =============== SUBCLASS 2: AMPLITUDE FIT ==================
# ============================================================
# ============================================================
# ============ SUBCLASS 4: FG AMPLITUDE + BETA FIT ===========
# ============================================================
[docs]
class Sat4LatCross_FGFit(BaseSat4LatCross):
"""
Calibration model fitting for birefringence angle β with shared per-frequency
foreground amplitude parameters.
Fits calibration angles (alphas), the cosmic birefringence angle β, and
per-frequency foreground scaling amplitudes (A_fg) using EB cross-spectra.
Here frequency means only the band value (e.g. ``93``, ``145``), shared
across LAT and SAT.
Foreground EB templates are computed as full-sky power spectra from the
saved dust Q/U maps and then binned with the same binner.
Before initialisation the class verifies that LAT and SAT were simulated
with the same dust model; a ``ValueError`` is raised otherwise.
Attributes
----------
cl_len : dict
Lensed CMB power spectra dictionary.
freq_values : List[str]
Unique frequency values extracted from map tags (e.g. ``['93', '145']``).
fg_templates_binned : np.ndarray
Binned foreground EB templates, shape ``(n_freq_values, n_freq_values, n_bins)``.
"""
[docs]
def __init__(self,
spec_cross: 'SpectraCross',
sat_err: float,
beta_fid: float,
sat_lrange: Tuple[Optional[int], Optional[int]] = (None, None),
lat_lrange: Tuple[Optional[int], Optional[int]] = (None, None),
fit_per_split: bool = True,
spectra_selection: str = 'all',
num_sims: int = 100,
verbose: bool = False,
cov_mode: str = 'diagonal',
fg_model: str = 'geometric',
A_fg_max: float = 2.0) -> None:
"""
Initialise calibration analysis with foreground amplitude fitting.
Args:
spec_cross: Cross-spectrum calculation object.
sat_err: Satellite calibration angle prior width (degrees).
beta_fid: Fiducial birefringence angle used to build the CMB template.
sat_lrange: Satellite ell range for fitting ``(lmin, lmax)``.
lat_lrange: LAT ell range for fitting ``(lmin, lmax)``.
fit_per_split: Fit one alpha per split-map (True) or per frequency (False).
spectra_selection: Which spectra to include: ``'all'``, ``'auto_only'``,
or ``'cross_only'``.
num_sims: Number of simulations used for mean/std estimation.
verbose: Enable verbose logging.
cov_mode: Covariance mode ('diagonal', 'block_diag', 'shrinkage', 'full').
fg_model: Foreground amplitude parametrisation.
``'geometric'``: one amplitude per frequency; cross-frequency
scaling via geometric mean ``sqrt(A_i * A_j)``.
``'per_pair'``: one independent amplitude per unique (freq_i, freq_j)
pair — more robust, no factorizability assumption.
A_fg_max: Hard upper bound (in absolute value) for all A_fg parameters.
Flat prior on ``(-A_fg_max, A_fg_max)``.
Raises
------
ValueError
If LAT and SAT use different dust foreground models.
"""
if fg_model not in ('geometric', 'per_pair'):
raise ValueError(f"fg_model must be 'geometric' or 'per_pair', got '{fg_model}'")
# Foreground consistency must be checked before super().__init__ loads data.
self._check_fg_consistency(spec_cross)
super().__init__(spec_cross, sat_err, sat_lrange, lat_lrange,
fit_per_split, spectra_selection,
'CalibrationAnalysis_FGFit', num_sims, verbose,
cov_mode=cov_mode)
self.fg_model = fg_model
self.A_fg_max = A_fg_max
self.cl_len = CMB(spec_cross.libdir, spec_cross.nside,
beta=beta_fid).get_lensed_spectra(dl=False)
# Frequency values shared between LAT and SAT (e.g. '93', '145').
self.freq_values: List[str] = sorted({b.split('_', 1)[1] for b in self.freq_bases})
# Compute (or load) binned foreground templates.
self.fg_templates_binned: np.ndarray = self._get_fg_templates()
# Unique ordered pairs for per_pair model.
freq_idx = {nu: k for k, nu in enumerate(self.freq_values)}
self._fg_pairs: List[Tuple[str, str]] = [
(fi, fj)
for i, fi in enumerate(self.freq_values)
for j, fj in enumerate(self.freq_values)
if j >= i
]
self._freq_idx = freq_idx
# Build parameter names/labels: alphas + beta + A_fg parameters.
if fg_model == 'geometric':
fg_pnames = [f"A_fg_{nu}" for nu in self.freq_values]
fg_plabels = [rf"A_{{fg,{nu}}}" for nu in self.freq_values]
else: # per_pair
fg_pnames = [f"A_fg_{fi}_{fj}" for fi, fj in self._fg_pairs]
fg_plabels = [rf"A_{{fg,{fi}\times{fj}}}" for fi, fj in self._fg_pairs]
if fit_per_split:
self.__pnames__ = [f"a_{t}" for t in self.maptags] + ['beta'] + fg_pnames
self.__plabels__ = ([_format_alpha_label(t) for t in self.maptags]
+ [r"\beta"] + fg_plabels)
else:
self.__pnames__ = [f"a_{f}" for f in self.freq_bases] + ['beta'] + fg_pnames
self.__plabels__ = ([_format_alpha_label(f) for f in self.freq_bases]
+ [r"\beta"] + fg_plabels)
# -------------------------------------------------------------------------
# Foreground helpers
# -------------------------------------------------------------------------
@staticmethod
def _check_fg_consistency(spec_cross: 'SpectraCross') -> None:
"""Raise ValueError if LAT and SAT use different dust foreground models."""
if spec_cross.lat_only:
return
if spec_cross.sat is None:
raise ValueError("Foreground fitting requires SAT maps when lat_only is False.")
lat_dust = spec_cross.lat.dust_model
sat_dust = spec_cross.sat.dust_model
if lat_dust != sat_dust:
raise ValueError(
f"LAT and SAT must use the same dust model for foreground fitting. "
f"Got LAT dust_model={lat_dust}, SAT dust_model={sat_dust}."
)
def _get_fg_templates(self) -> np.ndarray:
"""
Compute (or load from cache) full-sky foreground EB template spectra.
For each pair ``(freq_i, freq_j)`` the method computes
``C_ell^{E_i B_j}`` full-sky from dust Q/U maps and
bins the result with ``self.binner``.
Returns
-------
np.ndarray
Binned templates with shape ``(n_freq_values, n_freq_values, n_bins)``.
"""
fname = os.path.join(self.libdir, 'fg_templates_binned_shared_freq_dustonly.pkl')
if os.path.exists(fname):
self.logger.log("Loading cached foreground EB templates...", level="info")
with open(fname, 'rb') as f:
return pl.load(f)
self.logger.log("Computing full-sky dust-only foreground EB templates...", level="info")
n_freqs = len(self.freq_values)
# Foreground is shared between LAT/SAT for the same frequency, so build
# one QU map per frequency using LAT foreground maps.
fg = self.spec_cross.lat.foreground
fg_qu: Dict[str, np.ndarray] = {}
for freq in self.freq_values:
dust_qu = fg.dustQU(str(freq)) # shape (2, npix): [Q, U]
fg_qu[freq] = dust_qu
# Precompute E/B alms once per frequency map, then use alm2cl for
# all auto/cross combinations. This is significantly faster than
# repeated map->spectrum transforms inside the pair loop.
npix = fg_qu[self.freq_values[0]].shape[-1]
nside = hp.npix2nside(npix)
lmax_hp = min(self.Lmax, 3 * nside - 1)
ealms: Dict[str, np.ndarray] = {}
balms: Dict[str, np.ndarray] = {}
for freq in tqdm(self.freq_values, desc='FG map2alm (shared freq)', leave=False):
qmap = fg_qu[freq][0]
umap = fg_qu[freq][1]
_, ealm, balm = hp.map2alm([np.zeros(npix), qmap, umap], lmax=lmax_hp, pol=True)
ealms[freq] = ealm
balms[freq] = balm
unbinned = np.zeros((n_freqs, n_freqs, self.Lmax + 1))
for i, fi in tqdm(enumerate(self.freq_values), total=n_freqs, desc='FG EB auto/cross', leave=False):
for j, fj in enumerate(self.freq_values):
cl_eb = hp.alm2cl(ealms[fi], balms[fj], lmax=lmax_hp)
ell_len = min(cl_eb.shape[-1], self.Lmax + 1)
unbinned[i, j, :ell_len] = cl_eb[:ell_len]
# Bin and cache.
n_bins = self.binner.get_n_bands()
binned = np.zeros((n_freqs, n_freqs, n_bins))
for i in range(n_freqs):
for j in range(n_freqs):
binned[i, j] = self.binner.bin_cell(unbinned[i, j])
with open(fname, 'wb') as f:
pl.dump(binned, f)
self.logger.log("Foreground EB templates computed and saved.", level="info")
return binned
# -------------------------------------------------------------------------
# Abstract method implementations
# -------------------------------------------------------------------------
[docs]
def theory(self, theta: np.ndarray) -> np.ndarray:
"""
Theoretical EB model including miscalibration, birefringence, and foregrounds.
.. math::
C_\\ell^{E_i B_j} = \\cos(2\\alpha_i+2\\beta)\\sin(2\\alpha_j+2\\beta)\\,C_{EE}
- \\sin(2\\alpha_i+2\\beta)\\cos(2\\alpha_j+2\\beta)\\,C_{BB}
+ \\sqrt{A_{fg,i}A_{fg,j}}\\,C_\\ell^{fg,\\,E_i B_j}
Parameters are ordered as: ``[alphas..., beta, A_fg_per_frequency...]``.
""" # --- Unpack parameters ---
if self.fit_per_split:
n_alpha = len(self.maptags)
else:
n_alpha = len(self.freq_bases)
alphas_raw = theta[:n_alpha]
beta = theta[n_alpha]
A_fg_arr = theta[n_alpha + 1:]
n_fg_expected = len(self.freq_values) if self.fg_model == 'geometric' else len(self._fg_pairs)
if len(A_fg_arr) != n_fg_expected:
raise ValueError(
f"Expected {n_fg_expected} foreground amplitudes for fg_model='{self.fg_model}', "
f"got {len(A_fg_arr)}."
)
# For fit_per_split=False, expand alphas to per-maptag.
if self.fit_per_split:
alphas = alphas_raw
else:
base_a = {b: alphas_raw[k] for k, b in enumerate(self.freq_bases)}
alphas = np.array([base_a[t.rsplit('-', 1)[0]] for t in self.maptags])
freq_to_idx = self._freq_idx
cl_ee = self.cl_len["ee"][:self.Lmax + 1]
cl_bb = self.cl_len["bb"][:self.Lmax + 1]
# Build foreground amplitude lookup depending on model.
if self.fg_model == 'geometric':
freq_to_Afg = {nu: A_fg_arr[k] for k, nu in enumerate(self.freq_values)}
else: # per_pair
pair_to_Afg = {pair: A_fg_arr[k] for k, pair in enumerate(self._fg_pairs)}
model = np.zeros_like(self.mean_spec)
for i in range(len(self.maptags)):
for j in range(len(self.maptags)):
ai = np.deg2rad(alphas[i])
aj = np.deg2rad(alphas[j])
b = np.deg2rad(beta)
rot_term = (
np.cos(2*ai + 2*b) * np.sin(2*aj + 2*b) * cl_ee
- np.sin(2*ai + 2*b) * np.cos(2*aj + 2*b) * cl_bb
)
fi = self.maptags[i].rsplit('-', 1)[0].split('_', 1)[1]
fj = self.maptags[j].rsplit('-', 1)[0].split('_', 1)[1]
ki = freq_to_idx[fi]
kj = freq_to_idx[fj]
if self.fg_model == 'geometric':
A_i, A_j = freq_to_Afg[fi], freq_to_Afg[fj]
fg_contrib = np.sqrt(max(A_i * A_j, 0.0)) * self.fg_templates_binned[ki, kj]
else: # per_pair — symmetric lookup
key = (fi, fj) if freq_to_idx[fi] <= freq_to_idx[fj] else (fj, fi)
fg_contrib = pair_to_Afg[key] * self.fg_templates_binned[ki, kj]
model[i, j] = self.binner.bin_cell(rot_term) + fg_contrib
return model
[docs]
def lnprior(self, theta: np.ndarray) -> float:
"""
Log prior probability.
- Gaussian prior on SAT alpha angles (width = ``sat_err``).
- Flat prior on beta in ``(-0.5, 0.5)`` degrees.
- Flat prior on each A_fg in ``(-A_fg_max, A_fg_max)``.
For ``fg_model='geometric'`` A_fg is clipped to ``[0, A_fg_max]``
because the geometric mean requires non-negative values.
- Hard bounds ``|alpha| < 0.5`` degrees for all angles.
"""
if self.fit_per_split:
n_alpha = len(self.maptags)
else:
n_alpha = len(self.freq_bases)
alphas = theta[:n_alpha]
beta = theta[n_alpha]
A_fg_arr = theta[n_alpha + 1:]
n_fg_expected = len(self.freq_values) if self.fg_model == 'geometric' else len(self._fg_pairs)
if len(A_fg_arr) != n_fg_expected:
return -np.inf
if np.any(np.abs(alphas) > 0.5):
return -np.inf
if not (-0.5 < beta < 0.5):
return -np.inf
if self.fg_model == 'geometric':
# geometric mean requires non-negative amplitudes
if np.any(A_fg_arr < 0.0) or np.any(A_fg_arr > self.A_fg_max):
return -np.inf
else: # per_pair allows negative (EB foreground can have either sign)
if np.any(np.abs(A_fg_arr) > self.A_fg_max):
return -np.inf
tags = self.maptags if self.fit_per_split else self.freq_bases
sat_idx = [k for k, t in enumerate(tags) if t.startswith('SAT')]
return float(
-0.5 * np.sum(
np.array(alphas)[sat_idx] ** 2 / self.sat_err ** 2
- np.log(self.sat_err * np.sqrt(2 * np.pi))
)
)
@property
def dof(self) -> int:
"""Degrees of freedom: data points minus number of fitted parameters."""
return int(np.sum(self.__likelihood_mask__)) - len(self.__pnames__)
[docs]
class Sat4LatCross_AmplitudeFit(BaseSat4LatCross):
"""
Calibration model fitting for amplitude parameter A_EB.
This class implements a calibration analysis that fits calibration angles (alphas)
and an amplitude parameter A_EB using EB cross-spectra between LAT and SAT.
Attributes:
eb_template_unbinned: Unbinned EB template spectrum
binned_template: Binned EB template spectrum
cl_len: Lensed CMB power spectra dictionary
cl_diff_unbinned: Unbinned difference spectrum (EE - BB)/2
"""
[docs]
def __init__(self,
spec_cross: 'SpectraCross',
sat_err: float,
temp_model: str,
temp_value: float,
sat_lrange: Tuple[Optional[int], Optional[int]] = (None, None),
lat_lrange: Tuple[Optional[int], Optional[int]] = (None, None),
fit_per_split: bool = True,
spectra_selection: str = 'all',
sim_idx: int|None = None,
alpha_lat_prior: str = 'gaussian',
fix_alpha: bool = False,
num_sims: int = 100,
verbose: bool = False,
cov_mode: str = 'diagonal') -> None:
"""
Initialize calibration analysis for amplitude parameter fitting.
Args:
spec_cross: Cross-spectrum calculation object
sat_err: Satellite calibration error
temp_model: Template model ('iso' or 'iso_td')
temp_value: Template parameter value (beta for 'iso', mass for 'iso_td')
sat_lrange: Satellite ell range for fitting (lmin, lmax)
lat_lrange: LAT ell range for fitting (lmin, lmax)
fit_per_split: Whether to fit per split or per frequency
spectra_selection: Which spectra to include ('all', 'auto_only', 'cross_only')
sim_idx: Simulation index to use for data (default: 0)
alpha_lat_prior: Prior type for LAT alpha parameters ('gaussian' or 'flat')
fix_alpha: Whether to fix alpha parameters (calibration angles) to zero
num_sims: Number of simulations for mean/std calculation
verbose: Whether to enable verbose output
cov_mode: Covariance mode ('diagonal', 'block_diag', 'shrinkage', 'full')
"""
self.sim_idx = sim_idx
self.fix_alpha = fix_alpha
suffix = f'CalibrationAnalysis_AmpFit_{temp_model}_{temp_value}_sim{sim_idx}'
super().__init__(spec_cross, sat_err, sat_lrange, lat_lrange, fit_per_split, spectra_selection, suffix, num_sims, verbose,
cov_mode=cov_mode)
if temp_model == 'iso':
cmb = CMB(spec_cross.libdir, spec_cross.nside, beta=temp_value, verbose=verbose)
self.eb_template_unbinned = cmb.get_cb_lensed_spectra(dl=False)['eb']
elif temp_model == 'iso_td':
cmb = CMB(spec_cross.libdir, spec_cross.nside, model=temp_model, mass=temp_value, verbose=verbose)
self.eb_template_unbinned = cmb.get_cb_lensed_mass_spectra(dl=False)['eb']
else:
raise ValueError("temp_model must be 'iso' or 'iso_td'")
self.binned_template = self.binner.bin_cell(self.eb_template_unbinned[:self.Lmax+1])
self.cl_len = CMB(spec_cross.libdir, spec_cross.nside, beta=0.0).get_lensed_spectra(dl=False)
self.cl_diff_unbinned = 0.5 * (self.cl_len["ee"][:self.Lmax + 1] -
self.cl_len["bb"][:self.Lmax + 1])
if fix_alpha:
self.__pnames__ = ['A_EB']
self.__plabels__ = [r"A_{EB}"]
else:
if fit_per_split:
self.__pnames__ = [f"a_{t}" for t in self.maptags] + ['A_EB']
self.__plabels__ = [_format_alpha_label(t) for t in self.maptags] + [r"A_{EB}"]
else:
self.__pnames__ = [f"a_{f}" for f in self.freq_bases] + ['A_EB']
self.__plabels__ = [_format_alpha_label(f) for f in self.freq_bases] + [r"A_{EB}"]
self.alpha_lat_prior = alpha_lat_prior
assert self.alpha_lat_prior in ['gaussian','flat'], "alpha_lat_prior must be 'gaussian' or 'flat'"
if fit_per_split:
self.lat_alpha_mean = np.array([0.2]*4)
self.sat_alpha_mean = np.array([0.0]*4)
else:
self.lat_alpha_mean = np.array([0.2]*2)
self.sat_alpha_mean = np.array([0.0]*2)
self.lat_alpha_err = 0.01
self.sat_alpha_err = sat_err
def _calc_mean_std(self, num_sims: int = 100) -> Tuple[np.ndarray, np.ndarray]:
"""
Override parent method to use a specific simulation realization instead of mean.
Args:
num_sims: Number of simulations (used for std calculation only)
Returns:
Tuple of (single_realization_spectrum, std_spectrum) arrays
"""
if self.sim_idx is None:
fname = os.path.join(self.libdir, f'realization_spec_sim_mean_{num_sims}_sims.pkl')
else:
fname = os.path.join(self.libdir, f'realization_spec_sim{self.sim_idx}_{num_sims}_sims.pkl')
if os.path.exists(fname):
self.logger.log(f"Loading cached realization spectrum for sim_idx={self.sim_idx}...", level="info")
with open(fname, 'rb') as f:
return pl.load(f)
self.logger.log(f"Computing realization spectrum for sim_idx={self.sim_idx} and std over {num_sims} simulations...", level="info")
# Get the specific realization
# Compute std from multiple simulations for error bars
all_specs = [
self.spec_cross.data_matrix(i, which='EB',
sat_lrange=self.sat_lrange,
lat_lrange=self.lat_lrange,
avg_splits=False)
for i in tqdm(range(num_sims))
]
all_specs = np.array(all_specs)
std_spec = np.std(all_specs, axis=0)
if self.sim_idx is None:
single_spec = np.mean(all_specs, axis=0)
else:
single_spec = all_specs[self.sim_idx]
with open(fname, 'wb') as f:
pl.dump((single_spec, std_spec), f)
return single_spec, std_spec
def _get_alphas(self, theta: np.ndarray) -> np.ndarray:
"""
Extract alpha parameters from parameter vector.
Args:
theta: Full parameter vector
Returns:
Array of alpha parameters for each map tag
"""
if self.fix_alpha:
# Use mean values for fixed alphas
if self.fit_per_split:
# Combine LAT and SAT mean values in the correct order
alphas = np.zeros(len(self.maptags))
lat_count, sat_count = 0, 0
for i, tag in enumerate(self.maptags):
if tag.startswith('LAT'):
alphas[i] = self.lat_alpha_mean[lat_count]
lat_count += 1
else: # SAT
alphas[i] = self.sat_alpha_mean[sat_count]
sat_count += 1
return alphas
else:
# For frequency-based fitting
base_alphas = {}
lat_count, sat_count = 0, 0
for base in self.freq_bases:
if base.startswith('LAT'):
base_alphas[base] = self.lat_alpha_mean[lat_count]
lat_count += 1
else: # SAT
base_alphas[base] = self.sat_alpha_mean[sat_count]
sat_count += 1
return np.array([base_alphas[t.rsplit('-', 1)[0]] for t in self.maptags])
elif self.fit_per_split:
return theta[:-1]
else:
base_a = {b: theta[i] for i, b in enumerate(self.freq_bases)}
return np.array([base_a[t.rsplit('-', 1)[0]] for t in self.maptags])
[docs]
def theory(self, theta: np.ndarray) -> np.ndarray:
alphas = self._get_alphas(theta)
A_EB = theta[-1] if not self.fix_alpha else theta[0]
cl_ee = self.cl_len["ee"][:self.Lmax+1]
cl_bb = self.cl_len["bb"][:self.Lmax+1]
model = np.zeros_like(self.mean_spec)
for i in range(len(self.maptags)):
for j in range(len(self.maptags)):
ai = np.deg2rad(alphas[i])
aj = np.deg2rad(alphas[j])
# physical rotation part (unbinned)
term_unbinned = (
np.cos(2*ai) * np.sin(2*aj) * cl_ee
- np.sin(2*ai) * np.cos(2*aj) * cl_bb
)
# bin + add scaled template
model[i, j] = (
self.binner.bin_cell(term_unbinned)
+ (np.cos(2*ai + 2*aj) * self.binned_template / A_EB)
)
return model
[docs]
def lnprior(self, theta: np.ndarray) -> float:
if self.fix_alpha:
A_EB = theta[0]
if not (0 < A_EB < 2):
return -np.inf
return 0.0 # No prior on alphas since they're fixed to zero
alphas, A_EB = theta[:-1], theta[-1]
if np.any(np.abs(alphas) > 0.5) or not (-1 < A_EB < 3):
return -np.inf
# SAT prior
sat_idx = [i for i, t in enumerate(self.maptags if self.fit_per_split else self.freq_bases)
if t.startswith('SAT')]
sat_alpha_means = self.sat_alpha_mean if self.fit_per_split else self.sat_alpha_mean[:len([b for b in self.freq_bases if b.startswith('SAT')])]
sat_prior = -0.5 * np.sum((np.array(alphas)[sat_idx] - sat_alpha_means)** 2 / self.sat_alpha_err**2 -
np.log(self.sat_alpha_err * np.sqrt(2 * np.pi)))
if self.alpha_lat_prior == 'flat':
return float(sat_prior)
else:
lat_idx = [i for i, t in enumerate(self.maptags if self.fit_per_split else self.freq_bases)
if t.startswith('LAT')]
lat_alpha_means = self.lat_alpha_mean if self.fit_per_split else self.lat_alpha_mean[:len([b for b in self.freq_bases if b.startswith('LAT')])]
lat_prior = -0.5 * np.sum((np.array(alphas)[lat_idx] - lat_alpha_means)** 2 / self.lat_alpha_err**2 -
np.log(self.lat_alpha_err * np.sqrt(2 * np.pi)))
return float(sat_prior + lat_prior)
# ============================================================
# =============== SUBCLASS 3: LAT-ONLY BETA FIT ==============
# ============================================================
[docs]
class LatCross(BaseSat4LatCross):
"""
Calibration model fitting for birefringence angle β using LAT-only data.
This class implements a calibration analysis that fits calibration angles (alphas)
and the cosmic birefringence angle β using EB cross-spectra from LAT only.
Attributes:
cl_len: Lensed CMB power spectra dictionary
lat_err: LAT calibration error
"""
[docs]
def __init__(self,
spec_cross: 'SpectraCross',
lat_err: float,
beta_fid: float,
lat_lrange: Tuple[Optional[int], Optional[int]] = (None, None),
fit_per_split: bool = True,
spectra_selection: str = 'all',
num_sims: int = 100,
verbose: bool = False,
cov_mode: str = 'diagonal') -> None:
"""
Initialize LAT-only calibration analysis for birefringence angle fitting.
Args:
spec_cross: Cross-spectrum calculation object (must be in LAT-only mode)
lat_err: LAT calibration error
beta_fid: Fiducial birefringence angle for theory calculation
lat_lrange: LAT ell range for fitting (lmin, lmax)
fit_per_split: Whether to fit per split or per frequency
spectra_selection: Which spectra to include ('all', 'auto_only', 'cross_only')
num_sims: Number of simulations for mean/std calculation
verbose: Whether to enable verbose output
cov_mode: Covariance mode ('diagonal', 'block_diag', 'shrinkage', 'full')
"""
# Verify spec_cross is in LAT-only mode
assert hasattr(spec_cross, 'lat_only') and spec_cross.lat_only, \
"LatCross requires spec_cross to be initialized with sat=None (LAT-only mode)"
# For LAT-only, we use the same sat_err parameter as lat_err and ignore SAT ranges
super().__init__(spec_cross, lat_err,
sat_lrange=lat_lrange, # Use LAT range for both
lat_lrange=lat_lrange,
fit_per_split=fit_per_split,
spectra_selection=spectra_selection,
libdir_suffix='LatCalibrationAnalysis',
num_sims=num_sims,
verbose=verbose,
cov_mode=cov_mode)
self.lat_err = lat_err
self.cl_len = CMB(spec_cross.libdir, spec_cross.nside, beta=beta_fid).get_lensed_spectra(dl=False)
# Verify all maps are LAT maps (redundant check after lat_only assertion, but kept for clarity)
if not all(tag.startswith('LAT') for tag in self.maptags):
raise ValueError("LatCross requires all maps to be LAT maps. Found non-LAT maps in spec_cross.")
if fit_per_split:
self.__pnames__ = [f"a_{t}" for t in self.maptags] + ['beta']
self.__plabels__ = [_format_alpha_label(t) for t in self.maptags] + [r"\beta"]
else:
self.__pnames__ = [f"a_{f}" for f in self.freq_bases] + ['beta']
self.__plabels__ = [_format_alpha_label(f) for f in self.freq_bases] + [r"\beta"]
[docs]
def theory(self, theta: np.ndarray) -> np.ndarray:
"""
Compute theoretical EB spectra between all LAT map pairs
using the exact birefringence + miscalibration formula:
C_ell^{E_iB_j} = cos(2α_i+2β)sin(2α_j+2β)C_EE
- sin(2α_i+2β)cos(2α_j+2β)C_BB
"""
if self.fit_per_split:
alphas = theta[:-1]
else:
base_a = {b: theta[i] for i, b in enumerate(self.freq_bases)}
alphas = np.array([base_a[t.rsplit('-', 1)[0]] for t in self.maptags])
beta = theta[-1]
cl_ee = self.cl_len["ee"][:self.Lmax+1]
cl_bb = self.cl_len["bb"][:self.Lmax+1]
model = np.zeros_like(self.mean_spec)
for i in range(len(self.maptags)): # E_i
for j in range(len(self.maptags)): # B_j
ai = np.deg2rad(alphas[i])
aj = np.deg2rad(alphas[j])
b = np.deg2rad(beta)
term = (
np.cos(2*ai + 2*b) * np.sin(2*aj + 2*b) * cl_ee
- np.sin(2*ai + 2*b) * np.cos(2*aj + 2*b) * cl_bb
)
model[i, j] = self.binner.bin_cell(term)
return model
[docs]
def lnprior(self, theta: np.ndarray) -> float:
"""
Calculate log prior for LAT-only analysis.
Uses Gaussian prior on all alphas with LAT calibration error.
"""
alphas, beta = theta[:-1], theta[-1]
if np.any(np.abs(alphas) > 0.5) or not (-0.5 < beta < 0.5):
return -np.inf
# Gaussian prior on all LAT alphas
return float(-0.5 * np.sum(alphas ** 2 / self.lat_err**2 -
np.log(self.lat_err * np.sqrt(2 * np.pi))))
@property
def dof(self) -> int:
"""
Calculate degrees of freedom for chi-squared analysis.
Returns:
Number of data points minus number of fitted parameters
"""
n_data_points = np.sum(self.__likelihood_mask__)
n_parameters = len(self.__pnames__)
return n_data_points - n_parameters