Source code for cobi.quest

"""
Quadratic Estimator for Cosmic Birefringence Module
====================================================

This module implements quadratic estimator (QE) methods for reconstructing
cosmic birefringence rotation angle fields from CMB polarization observations.

The module provides:
- Wiener filtering of E and B modes using C⁻¹ filters
- Quadratic estimator reconstruction of rotation angle alms
- N0 and RDN0 bias estimation
- Mean field subtraction
- Likelihood analysis for cosmic birefringence amplitude

Classes
-------
FilterEB
    Implements C⁻¹ Wiener filtering for E and B mode polarization using
    the curvedsky library. Handles component-separated maps with noise
    and beam deconvolution.

QE
    Quadratic estimator class for cosmic birefringence reconstruction.
    Computes rotation angle power spectra with various bias correction
    methods (analytical N0, realization-dependent N0, mean field).
    
    Simulation Index Structure (sim_config is required):
    
    From sim_config={'set1': 400, 'reuse_last': 100} and nsims_mf=100:
    - stat_index: 0-299 (set1 - nsims_mf) - Statistics and OCL computation
    - mf_index: 300-399 (last nsims_mf from set1) - Mean field simulations
    - vary_index: 300-399 (last reuse_last from set1) - Varying alpha (CMB mode='vary')
    - const_index: 400-499 (reuse_last sims) - Constant alpha (CMB mode='constant')
    - null_index: 500-599 (reuse_last sims) - Null alpha (CMB mode='null')
    
    Note: When nsims_mf equals reuse_last, mf_index and vary_index are identical.
    
    Bias Estimation:
    - MCN0('stat'): Uses stat_index (0-299)
    - MCN0('vary'): Uses vary_index (300-399)
    - MCN0('const'): Uses const_index (400-499)
    - N1 = MCN0('const') - MCN0('vary')
    - Nlens: Uses null_index (500-599) and MCN0('vary')

AcbLikelihood
    Likelihood analysis class for constraining the cosmic birefringence
    amplitude A_CB from reconstructed rotation angle power spectra.

Algorithm
---------
The quadratic estimator exploits EB correlations induced by cosmic birefringence:

1. Apply C⁻¹ Wiener filter to observed E and B maps
2. Compute quadratic combination: qlm ∝ Ẽlm B̃*lm
3. Apply normalization from Fisher information
4. Estimate and subtract biases (N0, mean field)
5. Compare to theoretical prediction for A_CB

Requirements
------------
Requires the curvedsky library for efficient curved-sky operations.
Install from: https://github.com/toshiyan/cmblensplus

Example
-------
    from cobi.quest import FilterEB, QE
    from cobi.simulation import LATsky, Mask
    
    # Set up sky and mask
    sky = LATsky(libdir, nside=512)
    mask = Mask(libdir, nside=512, mask_str='LAT')
    
    # Initialize Wiener filter
    filter_eb = FilterEB(
        sky=sky,
        mask=mask,
        lmax=3000,
        fwhm=2.0,  # arcmin
        sht_backend='ducc0'
    )
    
    # Create quadratic estimator
    qe = QE(
        filter=filter_eb,
        lmin=30,
        lmax=300,
        recon_lmax=100,
        norm_nsim=100
    )
    
    # Reconstruct rotation angle for simulation 0
    qlm = qe.qlm(idx=0)
    
    # Get bias-corrected power spectrum
    cl_aa = qe.qcl(idx=0, rm_bias=True, rdn0=True)
    
    # Run likelihood analysis
    likelihood = AcbLikelihood(qe, lmin=2, lmax=50)
    samples = likelihood.samples(nwalkers=100, nsamples=2000)

Notes
-----
The curvedsky library must be installed to use this module.
MPI parallelization is supported for RDN0 computation.
"""

import os
try:
    import curvedsky as cs
except ImportError:
    cs = None
    print("Install curvedsky to use the QE module.")
import numpy as np
import healpy as hp
import pickle as pl
import matplotlib.pyplot as plt
import pymaster as nmt
from tqdm.auto import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed


from typing import Dict, Optional, Any, Union, List

from cobi.simulation import LATsky, Mask
from cobi.utils import cli, slice_alms
from cobi import sht
from cobi import mpi
from time import time


Tcmb  = 2.726e6
NCPUS = os.cpu_count()

[docs] class FilterEB:
[docs] def __init__(self, sky: LATsky, mask: Mask, lmax: int, fwhm: float = 2, sht_backend: str = "healpy"): self.sky = sky self.nside = sky.nside self.lmax = min(lmax, 3*self.nside -1) self.mask = mask.mask self.fsky = mask.fsky self.ninv = np.reshape(np.array((self.mask,self.mask)),(2,1,hp.nside2npix(self.nside))) self.bl = hp.gauss_beam(fwhm=np.radians(fwhm/60), lmax=self.lmax) if sht_backend in ['ducc0', 'ducc', 'd']: self.hp = sht.HealpixDUCC(nside=self.nside) self.healpy = False else: self.hp = None self.healpy = True self.cl_len = sky.cmb.get_lensed_spectra(dl=False,dtype='a').T/ Tcmb**2 self.lib_dir = os.path.join(sky.libdir, 'cinv') if mpi.rank == 0: os.makedirs(self.lib_dir, exist_ok=True)
@property def Bl(self): return np.reshape(self.bl,(1,self.lmax+1))
[docs] def convolved_EB(self,idx,split=0): """ convolve the component separated map with the beam Parameters ---------- idx : int : index of the simulation """ E,B = self.sky.HILC_obsEB(idx,ret='alm',split=split) hp.almxfl(E,self.bl,inplace=True) hp.almxfl(B,self.bl,inplace=True) return E,B
[docs] def NL(self,idx,split=0): """ array manipulation of noise spectra obtained by ILC weight for the filtering process """ ne,nb = self.sky.HILC_obsEB(idx, ret='nl',split=split)/Tcmb**2 return np.reshape(np.array((cli(ne[:self.lmax+1]*self.bl**2), cli(nb[:self.lmax+1]*self.bl**2))),(2,1,self.lmax+1))
[docs] def QU(self, idx, split=0): """ deconvolve the beam from the QU map Parameters ---------- idx : int : index of the simulation """ E, B = self.convolved_EB(idx, split=split) E, B = slice_alms(np.array([E, B]), self.lmax) if self.healpy: QU = hp.alm2map([E*0,E,B], self.nside)[1:]/Tcmb else: QU = self.hp.alm2map([E, B], lmax=self.lmax,nthreads=NCPUS)/Tcmb QU = QU*self.mask QU[QU == -0] = 0 return QU
def __cinv_eb__(self, idx, fname, test=False, split=0): QU = self.QU(idx, split=split) QU = np.reshape(QU,(2,1,hp.nside2npix(self.nside))) iterations = [200] stat_file = '' if test: iterations = [10] stat_file = os.path.join('test_stat.txt') E,B = cs.cninv.cnfilter_freq(2,1,self.nside,self.lmax,self.cl_len[1:3,:self.lmax+1], self.Bl, self.ninv,QU,chn=1,itns=iterations,filter="", eps=[1e-5],ro=10,inl=self.NL(idx,split=split),stat=stat_file) if not test: pl.dump((E,B), open(fname,'wb')) return E, B
[docs] def cinv_EB(self,idx,split=0,test=False): """ C inv Filter for the component separated maps Parameters ---------- idx : int : index of the simulation test : bool : if True, run the filter for 10 iterations (not cached) """ fsky = f"{self.fsky:.2f}".replace('.','p') if split == 0: fname = os.path.join(self.lib_dir,f"cinv_EB_{idx:04d}_fsky_{fsky}.pkl") else: fname = os.path.join(self.lib_dir,f"cinv_EB_{idx:04d}_fsky_{fsky}_split{split}.pkl") if os.path.isfile(fname): try: E,B = pl.load(open(fname,'rb')) except: E,B = self.__cinv_eb__(idx,fname,test=test,split=split) else: E,B = self.__cinv_eb__(idx,fname,test=test,split=split) return E,B
[docs] def check_file_exist(self,nsims=600,split=0): missing = [] file_err = [] for idx in tqdm(range(nsims),desc='Checking cinv files'): fsky = f"{self.fsky:.2f}".replace('.','p') if split == 0: fname = os.path.join(self.lib_dir,f"cinv_EB_{idx:04d}_fsky_{fsky}.pkl") else: fname = os.path.join(self.lib_dir,f"cinv_EB_{idx:04d}_fsky_{fsky}_split{split}.pkl") if not os.path.isfile(fname): missing.append(idx) else: try: E,B = pl.load(open(fname,'rb')) except: file_err.append(idx) print(f"Total missing files: {len(missing)}") print(f"Total file errors: {len(file_err)}") return missing, file_err
[docs] def plot_cinv(self,idx,split=0,lmin=2,lmax=3000): """ plot the cinv filtered Cls for a given idx Parameters ---------- idx : int : index of the simulation """ E,_ = self.cinv_EB(idx, split=split) ne,_ = self.sky.HILC_obsEB(idx,split=split, ret='nl')/Tcmb**2 cle = cs.utils.alm2cl(self.lmax,E)/self.fsky plt.figure(figsize=(4,4)) plt.loglog(cle,label='Cinv E mode') plt.loglog(1/(self.cl_len[1,:len(ne)] + ne), label='1/(S+N)') plt.xlim(lmin,lmax) plt.legend()
[docs] class QE:
[docs] def __init__(self, filter: FilterEB, lmin: int, lmax: int, recon_lmax: int, nsims_mf=100, nlb=10, lmax_bin=1024): self.basedir = os.path.join(filter.sky.libdir, 'qe') self.recdir = os.path.join(self.basedir, f'reco_min{lmin}_max{lmax}_rmax{recon_lmax}') self.rdn0dir = os.path.join(self.basedir, f'rdn0_min{lmin}_max{lmax}_rmax{recon_lmax}') if mpi.rank == 0: os.makedirs(self.basedir, exist_ok=True) os.makedirs(self.recdir, exist_ok=True) os.makedirs(self.rdn0dir, exist_ok=True) self.filter = filter self.sim_config = filter.sky.cmb.sim_config self.lmin = lmin self.lmax = lmax self.recon_lmax = recon_lmax self.cl_len = filter.cl_len self.nsims_mf = nsims_mf # sim_config is required if self.sim_config is None: raise ValueError("sim_config must be set in CMB initialization. QE requires sim_config to define simulation ranges.") set1 = self.sim_config['set1'] reuse_last = self.sim_config['reuse_last'] # Statistics simulations: first (set1 - nsims_mf) self.stat_index = np.arange(0, set1 - nsims_mf) # Mean field simulations: last nsims_mf from set1 self.mf_index = np.arange(set1 - nsims_mf, set1) # Varying alpha range: last reuse_last from set1 self.vary_index = np.arange(set1 - reuse_last, set1) # Constant alpha range: set1 to set1+reuse_last self.const_index = np.arange(set1, set1 + reuse_last) # Null alpha range: set1+reuse_last to set1+2*reuse_last self.null_index = np.arange(set1 + reuse_last, set1 + 2 * reuse_last) self.norm = self.__norm__ self.lmax_bin = lmax_bin self.binner = nmt.NmtBin.from_lmax_linear(lmax_bin,nlb) self.b = self.binner.get_effective_ells() self.nlb = nlb
@property def __norm__(self): fname = os.path.join(self.basedir, f'norm_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}.pkl') if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: ocl = self.__ocl__ norm = cs.norm_quad.qeb('rot',self.recon_lmax,self.lmin,self.lmax,self.cl_len[1,:self.lmax+1],ocl[1,:self.lmax+1],ocl[2,:self.lmax+1])[0] pl.dump(norm, open(fname,'wb')) return norm @property def __ocl__(self): fname = os.path.join(self.basedir, f'ocl_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}.pkl') if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: ocl_len = self.cl_len.copy() ne,nb = [],[] # Use stat_index for computing OCL for i in tqdm(self.stat_index, desc='Computing OCL'): e,b = self.filter.sky.HILC_obsEB(i, ret='nl') ne.append(e[:self.lmax+1]/Tcmb**2) nb.append(b[:self.lmax+1]/Tcmb**2) ne, nb = np.array(ne).mean(axis=0), np.array(nb).mean(axis=0) ocl_len[1,:self.lmax+1] += ne ocl_len[2,:self.lmax+1] += nb pl.dump(ocl_len, open(fname,'wb')) return ocl_len @property def cl_aa(self): return self.filter.sky.cmb.cl_aa()[:self.recon_lmax+1]
[docs] def qlm(self, idx): fname = os.path.join(self.recdir, f'qlm_fsky{self.filter.fsky:.2f}_{idx:04d}.pkl') if os.path.isfile(fname): return pl.load(open(fname, 'rb')) else: E,B = self.filter.cinv_EB(idx) alm = cs.rec_rot.qeb(self.recon_lmax,self.lmin,self.lmax,self.cl_len[1,:self.lmax+1],E[:self.lmax+1,:self.lmax+1],B[:self.lmax+1,:self.lmax+1]) nalm = alm * self.norm[:,None] pl.dump(nalm, open(fname, 'wb')) return nalm
[docs] def check_file_exist(self): missing = [] for idx in range(300): fname = os.path.join(self.recdir, f'qlm_fsky{self.filter.fsky:.2f}_{idx:04d}.pkl') if not os.path.isfile(fname): missing.append(idx) return missing
def __qcl__(self,idx,n0=None, mf=False, nlens=False, n1=False): qcl = cs.utils.alm2cl(self.recon_lmax,self.qlm(idx))/self.filter.fsky if n0 is None: N0 = np.zeros_like(qcl) elif n0 == 'norm': N0 = self.norm elif n0 == 'mcn0': N0 = self.MCN0('stat') elif n0 == 'rdn0': N0 = self.RDN0(idx) else: raise ValueError("n0 must be 'norm', 'rdn0', or 'mcn0'") qcl = qcl - N0 if mf: qcl = qcl - self.mean_field_cl() if nlens: qcl = qcl - self.Nlens() if n1: qcl = qcl - self.N1() return qcl
[docs] def qcl(self,idx, n0=None, mf=False, nlens=False, n1=False, binned=False): cl = self.__qcl__(idx,n0=n0,mf=mf,nlens=nlens,n1=n1) if binned: bcl = self.binner.bin_cell(cl[:self.lmax_bin+1]) return bcl else: return cl
[docs] def mean_field(self): fname = os.path.join(self.basedir, f'mf{self.nsims_mf}_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): return pl.load(open(fname, 'rb')) else: mf = np.zeros_like(self.qlm(self.mf_index[0])) for i in tqdm(self.mf_index, desc='Computing mean field'): mf += self.qlm(i) mf /= self.nsims_mf pl.dump(mf, open(fname, 'wb')) return mf
[docs] def mean_field_cl(self): return cs.utils.alm2cl(self.recon_lmax, self.mean_field()) / self.filter.fsky
[docs] def RDN0(self,idx): fname = os.path.join(self.rdn0dir,f"RDN0_{self.filter.fsky:.2f}_{idx:04d}.pkl") if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: # Use stat_index for cycling myidx = self.stat_index.copy() E0,B0 = self.filter.cinv_EB(idx) mean_rdn0 = [] for i in tqdm(range(100),desc=f'RDN0 for simulation {idx}', unit='sim'): # Cycle through n0_index i1 = myidx[i % len(myidx)] i2 = myidx[(i + 1) % len(myidx)] E1,B1 = self.filter.cinv_EB(i1) E2,B2 = self.filter.cinv_EB(i2) # E_0,B_1 glm1 = cs.rec_rot.qeb(self.recon_lmax,self.lmin,self.lmax,self.filter.cl_len[1,:self.lmax+1], E0[:self.lmax+1,:self.lmax+1], B1[:self.lmax+1,:self.lmax+1]) # E_1,B_0 glm2 = cs.rec_rot.qeb(self.recon_lmax,self.lmin,self.lmax,self.filter.cl_len[1,:self.lmax+1], E1[:self.lmax+1,:self.lmax+1], B0[:self.lmax+1,:self.lmax+1]) # E_1,B_2 glm3 = cs.rec_rot.qeb(self.recon_lmax,self.lmin,self.lmax,self.filter.cl_len[1,:self.lmax+1], E1[:self.lmax+1,:self.lmax+1], B2[:self.lmax+1,:self.lmax+1]) # E_2,B_1 glm4 = cs.rec_rot.qeb(self.recon_lmax,self.lmin,self.lmax,self.filter.cl_len[1,:self.lmax+1], E2[:self.lmax+1,:self.lmax+1], B1[:self.lmax+1,:self.lmax+1]) del (E1,B1,E2,B2) glm1 *= self.norm[:,None] glm2 *= self.norm[:,None] glm3 *= self.norm[:,None] glm4 *= self.norm[:,None] first_four = cs.utils.alm2cl(self.recon_lmax, glm1 + glm2)/(self.filter.fsky) #type: ignore del (glm1,glm2) second_last = cs.utils.alm2cl(self.recon_lmax, glm3)/(self.filter.fsky) #type: ignore last = cs.utils.alm2cl(self.recon_lmax, glm3,glm4)/(self.filter.fsky) #type: ignore del (glm3,glm4) mean_rdn0.append(first_four - second_last - last) del (first_four,second_last,last) del (E0,B0) rdn0 = np.mean(mean_rdn0,axis=0) pl.dump(rdn0,open(fname,'wb')) return rdn0
[docs] def RDN0_mpi(self, idx): """ MPI-parallel version of RDN0(...) using mpi4py. Parallelizes the 100 Monte-Carlo iterations; safe for any number of ranks. Only rank 0 touches the on-disk cache. """ MPI = mpi.mpi comm = mpi.com rank = mpi.rank size = mpi.size fname = os.path.join(self.rdn0dir, f"RDN0_{self.filter.fsky:.2f}_{idx:04d}.pkl") # 1) Try to load cached result on rank 0, then broadcast. rdn0 = None if rank == 0 and os.path.isfile(fname): with open(fname, 'rb') as f: rdn0 = pl.load(f) rdn0 = comm.bcast(rdn0, root=0) if rdn0 is not None: return rdn0 # 2) Build the index cycling array on all ranks (cheap & deterministic). myidx = self.stat_index.copy() # 3) Compute (or broadcast) the fixed E0, B0 for this idx. if rank == 0: E0, B0 = self.filter.cinv_EB(idx) else: E0 = B0 = None E0 = comm.bcast(E0, root=0) B0 = comm.bcast(B0, root=0) # 4) Distribute the 100 iterations across ranks. tasks = np.arange(100, dtype=int) # chunking keeps i and i+1 local in the same process most of the time chunks = np.array_split(tasks, size) my_tasks = chunks[rank] # We know alm2cl returns an array of length (recon_lmax+1) L = self.recon_lmax + 1 local_sum = np.zeros(L, dtype=np.float64) # Helper to compute one contribution (vector length L) def _one_contrib(i): # Cycle through n0_index i1 = int(myidx[i % len(myidx)]) i2 = int(myidx[(i + 1) % len(myidx)]) E1, B1 = self.filter.cinv_EB(i1) E2, B2 = self.filter.cinv_EB(i2) # Quadratic estimators glm1 = cs.rec_rot.qeb( self.recon_lmax, self.lmin, self.lmax, self.filter.cl_len[1, :self.lmax+1], E0[:self.lmax+1, :self.lmax+1], B1[:self.lmax+1, :self.lmax+1], ) glm2 = cs.rec_rot.qeb( self.recon_lmax, self.lmin, self.lmax, self.filter.cl_len[1, :self.lmax+1], E1[:self.lmax+1, :self.lmax+1], B0[:self.lmax+1, :self.lmax+1], ) glm3 = cs.rec_rot.qeb( self.recon_lmax, self.lmin, self.lmax, self.filter.cl_len[1, :self.lmax+1], E1[:self.lmax+1, :self.lmax+1], B2[:self.lmax+1, :self.lmax+1], ) glm4 = cs.rec_rot.qeb( self.recon_lmax, self.lmin, self.lmax, self.filter.cl_len[1, :self.lmax+1], E2[:self.lmax+1, :self.lmax+1], B1[:self.lmax+1, :self.lmax+1], ) # Normalization per-ℓ glm1 *= self.norm[:, None] glm2 *= self.norm[:, None] glm3 *= self.norm[:, None] glm4 *= self.norm[:, None] first_four = cs.utils.alm2cl(self.recon_lmax, glm1 + glm2) / (self.filter.fsky) # type: ignore second_last = cs.utils.alm2cl(self.recon_lmax, glm3) / (self.filter.fsky) # type: ignore last = cs.utils.alm2cl(self.recon_lmax, glm3, glm4) / (self.filter.fsky) # type: ignore return first_four - second_last - last # 5) Local accumulation with an optional progress bar on rank 0. iterator = tqdm(my_tasks, desc=f'RDN0 (rank {rank}) for simulation {idx}', unit='sim') if rank == 0 else my_tasks for i in iterator: local_sum += _one_contrib(i) # 6) Global reduction (sum over ranks), then average over the 100 draws. global_sum = np.zeros_like(local_sum) comm.Reduce(local_sum, global_sum, op=MPI.SUM, root=0) if rank == 0: rdn0 = global_sum / float(len(tasks)) # divide by 100 # Cache to disk os.makedirs(self.rdn0dir, exist_ok=True) with open(fname, 'wb') as f: pl.dump(rdn0, f) # 7) Broadcast the final result so all ranks return the same array. rdn0 = comm.bcast(rdn0, root=0) return rdn0
[docs] def N0_sim(self,idx,which='vary',debug=False): """ Calculate the N0 bias from the Reconstructed potential using filtered Fields with different CMB fields. If E modes is from ith simulation then B modes is from (i+1)th simulation idx: int : index of the N0 which: str : 'stat', 'vary', or 'const' to select which index range to use debug: bool : if True, print the indices used for computation and return None Index ranges and wrapping: - 'stat': stat_index, wraps within stat range - 'vary': vary_index, wraps within vary range - 'const': const_index, wraps within const range Requires sim_config to be set, or manually set the corresponding index array. """ if which == 'stat': index_range = self.stat_index label = 'stat' elif which == 'vary': index_range = self.vary_index label = 'vary' elif which == 'const': index_range = self.const_index label = 'const' else: raise ValueError("which must be 'stat', 'vary', or 'const'") assert idx in index_range, f"The requested idx {idx} is not in the {which} index range" # Simple increment with wrapping within the index_range bounds idx1 = idx min_idx = min(index_range) max_idx = max(index_range) # If at the end of range, wrap to beginning of range if idx == max_idx: idx2 = min_idx else: idx2 = idx + 1 if debug: print(f"N0_sim debug mode:") print(f" which: {which}") print(f" index_range: [{min_idx}, {max_idx}]") print(f" idx1 (E1B1): {idx1}") print(f" idx2 (E2B2): {idx2}") return None fname = os.path.join(self.rdn0dir,f"N0_{label}_{self.filter.fsky:.2f}_{idx:04d}.pkl") if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: E1,B1 = self.filter.cinv_EB(idx1) E2,B2 = self.filter.cinv_EB(idx2) glm1 = cs.rec_rot.qeb(self.recon_lmax,self.lmin,self.lmax, self.cl_len[1,:self.lmax+1], E1[:self.lmax+1,:self.lmax+1], B2[:self.lmax+1,:self.lmax+1]) glm2 = cs.rec_rot.qeb(self.recon_lmax,self.lmin,self.lmax, self.cl_len[1,:self.lmax+1], E2[:self.lmax+1,:self.lmax+1], B1[:self.lmax+1,:self.lmax+1]) glm1 *= self.norm[:,None] glm2 *= self.norm[:,None] glm = glm1 + glm2 n0cl = cs.utils.alm2cl(self.recon_lmax,glm)/(2*self.filter.fsky) # type: ignore pl.dump(n0cl,open(fname,'wb')) return n0cl
[docs] def MCN0(self, which='vary'): """ Monte Carlo average of N0_sim over specified index range which: str : 'stat', 'vary', or 'const' to select which index range to use 'stat' uses stat_index 'vary' uses vary_index 'const' uses const_index Requires sim_config to be set, or manually set the corresponding index arrays. Note: vary_index overlaps with mf_index only when nsims_mf equals reuse_last. """ fname = os.path.join(self.basedir, f'MCN0_{which}_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: if which == 'stat': index = self.stat_index elif which == 'vary': index = self.vary_index elif which == 'const': index = self.const_index else: raise ValueError("which must be 'stat', 'vary', or 'const'") n0_list = [] for idx in tqdm(index, desc=f'Computing MCN0 ({which})'): n0_list.append(self.N0_sim(idx, which=which)) mcn0 = np.array(n0_list).mean(axis=0) pl.dump(mcn0, open(fname,'wb')) return mcn0
[docs] def N1(self,binned=False): """ N1 bias: difference between MCN0 for constant and varying alpha N1 = MCN0('const') - MCN0('vary') """ fname = os.path.join(self.basedir, f'N1_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): n1 = pl.load(open(fname,'rb')) else: n1 = self.MCN0('const') - self.MCN0('vary') pl.dump(n1, open(fname,'wb')) if binned: bn1 = self.binner.bin_cell(n1[:self.lmax_bin+1]) return bn1 else: return n1
[docs] def Nlens(self,MCN0=True,binned=False): """ Lensing bias: average qcl on null_index minus MCN0('vary') Nlens = <qcl(null_alpha)> - MCN0('vary') """ fname = os.path.join(self.basedir, f'Nlens_fsky{self.filter.fsky:.2f}_mcn0{MCN0}.pkl') if os.path.isfile(fname): nlens = pl.load(open(fname,'rb')) else: qcl_list = [] for idx in tqdm(self.null_index, desc='Computing Nlens'): # Get qcl without any bias subtraction qcl_list.append(cs.utils.alm2cl(self.recon_lmax, self.qlm(idx))/self.filter.fsky) avg_qcl = np.array(qcl_list).mean(axis=0) if MCN0: nlens = avg_qcl - self.MCN0('vary') else: nlens = avg_qcl - self.norm pl.dump(nlens, open(fname,'wb')) if binned: bnlen = self.binner.bin_cell(nlens[:self.lmax_bin+1]) return bnlen else: return nlens
[docs] def RDN0_stat(self): """ RDN0 for all stat_index simulations """ fname = os.path.join(self.basedir, f'RDN0_stat_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: rdn0_list = [] for i in tqdm(self.stat_index, desc='Computing RDN0 statistics'): rdn0_list.append(self.RDN0(i)) rdn0_array = np.array(rdn0_list).mean(axis=0) pl.dump(rdn0_array, open(fname,'wb')) return rdn0_array
[docs] def qcl_stat(self, n0=None, mf=False, nlens=False, n1=False,binned=True): st = '' if n0 is None: st += '_noN0' elif n0 == 'norm': st += '_n0anal' elif n0 == 'rdn0': st += '_n0rdn0' elif n0 == 'mcn0': st += '_n0mcn0' else: raise ValueError("n0 must be 'norm', 'rdn0', or 'mcn0'") if mf: st += '_mf' if nlens: st += '_nlens' if n1: st += '_n1' fname = os.path.join(self.basedir, f'qcl_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}_nlb{self.nlb}_{st}_{binned}.pkl') if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: cl = [] for i in tqdm(self.stat_index,desc='Computing cl statistics',unit='sim'): cl.append(self.qcl(i,n0=n0,mf=mf,nlens=nlens,n1=n1,binned=binned)) cl = np.array(cl) pl.dump(cl,open(fname,'wb')) return cl
[docs] class CrossQE:
[docs] def __init__(self, filter: FilterEB, lmin: int, lmax: int, recon_lmax: int,nsims_mf=100, nlb=10, lmax_bin=1024): assert filter.sky.nsplits == 4, "CrossQE requires 4 splits in the FilterEB sky object." self.basedir = os.path.join(filter.sky.libdir, 'qecross') self.recdir = os.path.join(self.basedir, f'reco_min{lmin}_max{lmax}_rmax{recon_lmax}') self.n0dir = os.path.join(self.basedir, f'rdn0_min{lmin}_max{lmax}_rmax{recon_lmax}') self.mdir = os.path.join(self.basedir, f'misc_min{lmin}_max{lmax}_rmax{recon_lmax}') if mpi.rank == 0: os.makedirs(self.basedir, exist_ok=True) os.makedirs(self.recdir, exist_ok=True) os.makedirs(self.n0dir, exist_ok=True) os.makedirs(self.mdir, exist_ok=True) self.filter = filter self.sim_config = filter.sky.cmb.sim_config self.lmin = lmin self.lmax = lmax self.recon_lmax = recon_lmax self.cl_len = filter.cl_len self.lmax_bin = lmax_bin self.sim_config = filter.sky.cmb.sim_config if self.sim_config is None: raise ValueError("sim_config must be set in CMB initialization. QE requires sim_config to define simulation ranges.") # Default simulation ranges set1 = self.sim_config['set1'] reuse_last = self.sim_config['reuse_last'] # Statistics simulations: first (set1 - nsims_mf) self.stat_index = np.arange(0, set1 - nsims_mf) # Mean field simulations: last nsims_mf from set1 self.mf_index = np.arange(set1 - nsims_mf, set1) # Varying alpha range: last reuse_last from set1 self.vary_index = np.arange(set1 - reuse_last, set1) # Constant alpha range: set1 to set1+reuse_last self.const_index = np.arange(set1, set1 + reuse_last) # Null alpha range: set1+reuse_last to set1+2*reuse_last self.null_index = np.arange(set1 + reuse_last, set1 + 2 * reuse_last) self.binner = nmt.NmtBin.from_lmax_linear(lmax_bin,nlb) self.b = self.binner.get_effective_ells() self.nlb = nlb
@property def cl_aa(self): return self.filter.sky.cmb.cl_aa()[:self.recon_lmax+1]
[docs] def precompute_split_ocl(self, splits=(1,2,3,4)): """ Computes split-level total spectra: oclE[s] = ClEE_lensed + <N_EE_split_s> oclB[s] = ClBB_lensed + <N_BB_split_s> Saves dict {s: (oclE, oclB)}. """ fsky_tag = f"{self.filter.fsky:.2f}".replace('.', 'p') fname = os.path.join( self.basedir, f"ocl_splits_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}_fsky_{fsky_tag}.pkl" ) if os.path.isfile(fname): return pl.load(open(fname, "rb")) # Start from theory (lensed) spectra clE_th = self.cl_len[1, :self.lmax+1].copy() clB_th = self.cl_len[2, :self.lmax+1].copy() ocl = {} for s in splits: ne_acc = np.zeros(self.lmax+1, dtype=np.float64) nb_acc = np.zeros(self.lmax+1, dtype=np.float64) ncount = 0 for i in tqdm(self.stat_index, desc=f"Split OCL s={s}"): ne, nb = self.filter.sky.HILC_obsEB(i, ret="nl", split=s) ne_acc += ne[:self.lmax+1] / Tcmb**2 nb_acc += nb[:self.lmax+1] / Tcmb**2 ncount += 1 ne_mean = ne_acc / ncount nb_mean = nb_acc / ncount oclE = clE_th + ne_mean oclB = clB_th + nb_mean ocl[s] = (oclE, oclB) pl.dump(ocl, open(fname, "wb")) return ocl
[docs] def precompute_pair_norms(self, pairs=((1,2),(3,4),(1,3),(2,4),(1,4),(2,3))): """ Precomputes normalization arrays for each split pair. Returns dict: norms[(i,j)] = {"EiBj": norm_ij, "EjBi": norm_ji} where norm_ij normalizes Q(E^i, B^j). """ fsky_tag = f"{self.filter.fsky:.2f}".replace('.', 'p') fname = os.path.join( self.basedir, f"norm_pairs_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}_fsky_{fsky_tag}.pkl" ) if os.path.isfile(fname): return pl.load(open(fname, "rb")) ocl = self.precompute_split_ocl(splits=(1,2,3,4)) clE_th = self.cl_len[1, :self.lmax+1] norms = {} for (i, j) in pairs: oclE_i, oclB_i = ocl[i] oclE_j, oclB_j = ocl[j] # norm for Q(E^i, B^j) norm_ij = cs.norm_quad.qeb( 'rot', self.recon_lmax, self.lmin, self.lmax, clE_th, oclE_i, # total EE for E leg from split i oclB_j # total BB for B leg from split j )[0] # norm for Q(E^j, B^i) norm_ji = cs.norm_quad.qeb( 'rot', self.recon_lmax, self.lmin, self.lmax, clE_th, oclE_j, oclB_i )[0] norms[(i, j)] = {"EiBj": norm_ij, "EjBi": norm_ji} pl.dump(norms, open(fname, "wb")) return norms
[docs] def qlm_pair(self, idx, si: int, sj: int): assert si != sj i, j = (si, sj) if si < sj else (sj, si) norms = self.precompute_pair_norms( pairs=((1,2),(3,4),(1,3),(2,4),(1,4),(2,3)) ) n_ij = norms[(i, j)]["EiBj"] if (si, sj) == (i, j) else norms[(i, j)]["EjBi"] n_ji = norms[(i, j)]["EjBi"] if (si, sj) == (i, j) else norms[(i, j)]["EiBj"] Ei, Bi = self.filter.cinv_EB(idx, split=si) Ej, Bj = self.filter.cinv_EB(idx, split=sj) alm_EiBj = cs.rec_rot.qeb( self.recon_lmax, self.lmin, self.lmax, self.cl_len[1, :self.lmax+1], Ei[:self.lmax+1, :self.lmax+1], Bj[:self.lmax+1, :self.lmax+1] ) alm_EjBi = cs.rec_rot.qeb( self.recon_lmax, self.lmin, self.lmax, self.cl_len[1, :self.lmax+1], Ej[:self.lmax+1, :self.lmax+1], Bi[:self.lmax+1, :self.lmax+1] ) # Apply pair-direction-specific norms, then symmetrize phi = 0.5 * (alm_EiBj * n_ij[:, None] + alm_EjBi * n_ji[:, None]) return phi
[docs] def mean_field_sim(self, idx, splits=(1,2,3,4)): """ Cross-only lensing power estimate: average over cross-spectra of reconstructions from disjoint split pairs. For 4 splits, uses the 3 disjoint pairings. """ fname = os.path.join( self.mdir, f"mean_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}_fsky{self.filter.fsky:.2f}_{idx:04d}.pkl" ) if os.path.isfile(fname): return pl.load(open(fname, "rb")) else: s1, s2, s3, s4 = splits # three disjoint pairings pairings = [ ((s1, s2), (s3, s4)), ((s1, s3), (s2, s4)), ((s1, s4), (s2, s3)), ] m = None for (i, j), (k, l) in pairings: phi_ij = self.qlm_pair(idx, i, j) phi_kl = self.qlm_pair(idx, k, l) phi = 0.5 * (phi_ij + phi_kl) if m is None: m = phi else: m += phi m /= 3.0 pl.dump(m, open(fname, "wb")) return m
[docs] def mean_field(self): m = [] for idx in tqdm(self.mf_index, desc='Computing cross-only mean field'): self.mean_field_sim(idx) m.append(self.mean_field_sim(idx)) return np.mean(m, axis=0)
[docs] def mean_field_cl(self): fname = os.path.join( self.mdir, f"meanfield_cl_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}_fsky{self.filter.fsky:.2f}.pkl" ) if os.path.isfile(fname): return pl.load(open(fname, "rb")) else: mf = self.mean_field() mf_cl = cs.utils.alm2cl(self.recon_lmax, mf) / self.filter.fsky pl.dump(mf_cl, open(fname, "wb")) return mf_cl
[docs] def qcl_cross_only(self, idx, splits=(1,2,3,4)): """ Cross-only lensing power estimate: average over cross-spectra of reconstructions from disjoint split pairs. For 4 splits, uses the 3 disjoint pairings. """ fname = os.path.join( self.recdir, f"qcl_crossonly_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}_fsky{self.filter.fsky:.2f}_{idx:04d}.pkl" ) if os.path.isfile(fname): return pl.load(open(fname, "rb")) else: s1, s2, s3, s4 = splits # three disjoint pairings pairings = [ ((s1, s2), (s3, s4)), ((s1, s3), (s2, s4)), ((s1, s4), (s2, s3)), ] cls = [] for (i, j), (k, l) in pairings: phi_ij = self.qlm_pair(idx, i, j) phi_kl = self.qlm_pair(idx, k, l) # cross-spectrum of the two phi maps cl = cs.utils.alm2cl(self.recon_lmax, phi_ij, phi_kl) / self.filter.fsky cls.append(cl) pl.dump(np.mean(cls, axis=0), open(fname, "wb")) return np.mean(cls, axis=0)
[docs] def Nlens(self,binned=False): """ Lensing bias: average qcl on null_index minus MCN0('vary') Nlens = <qcl(null_alpha)> - MCN0('vary') """ fname = os.path.join(self.basedir, f'Nlens_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): nlens = pl.load(open(fname,'rb')) else: qcl_list = [] for idx in tqdm(self.null_index, desc='Computing Nlens'): # Get qcl without any bias subtraction qcl_list.append(self.qcl_cross_only(idx)) avg_qcl = np.array(qcl_list).mean(axis=0) nlens = avg_qcl - self.MCN0('stat') pl.dump(nlens, open(fname,'wb')) if binned: bnlen = self.binner.bin_cell(nlens[:self.lmax_bin+1]) return bnlen else: return nlens
[docs] def N1(self,binned=False): """ N1 bias: difference between MCN0 for constant and varying alpha N1 = MCN0('const') - MCN0('vary') """ fname = os.path.join(self.basedir, f'N1_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): n1 = pl.load(open(fname,'rb')) else: n1 = self.MCN0('const') - self.MCN0('vary') pl.dump(n1, open(fname,'wb')) if binned: bn1 = self.binner.bin_cell(n1[:self.lmax_bin+1]) return bn1 else: return n1
[docs] def qcl_stat(self, n0=None, n1=False, mf=False, nlens=False, binned=False): """ Compute statistics of qcl over stat_sims with various bias corrections. Parameters ---------- n0 : str or None N0 bias correction: None (no correction), 'mcn0', or 'rdn0' mf : bool If True, subtract mean field bias nlens : bool If True, subtract lensing bias binned : bool If True, return binned spectra Returns ------- cl : array Array of shape (nsims, nlmax+1) or (nsims, nbins) if binned """ st = '' if n0 is None: st += '_noN0' elif n0 == 'mcn0': st += '_n0mcn0' elif n0 == 'rdn0': st += '_n0rdn0' else: raise ValueError("n0 must be None, 'mcn0', or 'rdn0'") if n1: st += '_n1' if mf: st += '_mf' if nlens: st += '_nlens' fname = os.path.join( self.basedir, f'qcl_crossonly_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}_nlb{self.nlb}_{st}_{binned}.pkl' ) if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: # Determine bias corrections if n0 is None: N0 = np.zeros(self.recon_lmax + 1) elif n0 == 'mcn0': N0 = self.MCN0() elif n0 == 'rdn0': # For cross-only, RDN0 should be computed per simulation # This will be handled in the loop below N0 = None if n1: N1 = self.N1(binned=False) if mf: MF = self.mean_field_cl() else: MF = np.zeros(self.recon_lmax + 1) if nlens: NLENS = self.Nlens(binned=False) else: NLENS = np.zeros(self.recon_lmax + 1) cl = [] for i in tqdm(self.stat_index, desc='Computing cross-only cl statistics', unit='sim'): qcl_i = self.qcl_cross_only(i, splits=(1,2,3,4)) # Apply RDN0 correction if requested if n0 == 'rdn0': N0_i = self.RDN0(i) else: N0_i = N0 # Apply all bias corrections qcl_i = qcl_i - N0_i - MF - NLENS if binned: qcl_i = self.binner.bin_cell(qcl_i[:self.lmax_bin+1]) cl.append(qcl_i) cl = np.array(cl) pl.dump(cl, open(fname, 'wb')) return cl
def _cl_cross_only_between(self, A: dict, B: dict): """ Cross-only spectrum for 4 splits: (12×34 + 13×24 + 14×23)/3 where A[(i,j)] are recon alms from split pair (i,j). """ c12_34 = cs.utils.alm2cl(self.recon_lmax, A[(1,2)], B[(3,4)]) / self.filter.fsky c13_24 = cs.utils.alm2cl(self.recon_lmax, A[(1,3)], B[(2,4)]) / self.filter.fsky c14_23 = cs.utils.alm2cl(self.recon_lmax, A[(1,4)], B[(2,3)]) / self.filter.fsky return (c12_34 + c13_24 + c14_23) / 3.0 def _qlm_pair_tag_sym(self, d_idx: int, s_idx: int, s0_idx: int, si: int, sj: int, tagE: str, tagB: str): """ Symmetric EB estimator (lensing-like), compatible with qlm_pair(): 0.5 * [ Q(E^si, B^sj) + Q(E^sj, B^si) ] with direction-dependent norms. tagE/tagB in {'d','s','s0'} select which realization supplies E and B legs. """ assert si != sj def pick(tag: str) -> int: if tag == "d": return d_idx if tag == "s": return s_idx if tag == "s0": return s0_idx raise ValueError("tag must be one of {'d','s','s0'}") idxE = pick(tagE) idxB = pick(tagB) # direction-dependent norms (same logic as qlm_pair) norms = self.precompute_pair_norms(pairs=((1,2),(3,4),(1,3),(2,4),(1,4),(2,3))) i, j = (si, sj) if si < sj else (sj, si) if (si, sj) == (i, j): n_EiBj = norms[(i, j)]["EiBj"] n_EjBi = norms[(i, j)]["EjBi"] else: n_EiBj = norms[(i, j)]["EjBi"] n_EjBi = norms[(i, j)]["EiBj"] # Pull E from idxE, B from idxB, with the requested split indices E_si, _ = self.filter.cinv_EB(idxE, split=si) E_sj, _ = self.filter.cinv_EB(idxE, split=sj) _, B_si = self.filter.cinv_EB(idxB, split=si) _, B_sj = self.filter.cinv_EB(idxB, split=sj) # Q(E_si, B_sj) alm_ij = cs.rec_rot.qeb( self.recon_lmax, self.lmin, self.lmax, self.cl_len[1, :self.lmax+1], E_si[:self.lmax+1, :self.lmax+1], B_sj[:self.lmax+1, :self.lmax+1] ) # Q(E_sj, B_si) alm_ji = cs.rec_rot.qeb( self.recon_lmax, self.lmin, self.lmax, self.cl_len[1, :self.lmax+1], E_sj[:self.lmax+1, :self.lmax+1], B_si[:self.lmax+1, :self.lmax+1] ) # SYM: symmetric combination + direction norms phi = 0.5 * (alm_ij * n_EiBj[:, None] + alm_ji * n_EjBi[:, None]) return phi def _build_six_sym(self, d_idx: int, s_idx: int, s0_idx: int, tagE: str, tagB: str): """ Build the six split-pair reconstructions (12,34,13,24,14,23) using the symmetric estimator. """ pairs = [(1,2),(3,4),(1,3),(2,4),(1,4),(2,3)] out = {} for (i, j) in pairs: out[(i, j)] = self._qlm_pair_tag_sym(d_idx, s_idx, s0_idx, i, j, tagE, tagB) return out
[docs] def N0_sim(self,idx,which='vary',debug=False): """ Calculate the N0 bias from the Reconstructed potential using filtered Fields with different CMB fields. If E modes is from ith simulation then B modes is from (i+1)th simulation idx: int : index of the N0 which: str : 'stat', 'vary', or 'const' to select which index range to use debug: bool : if True, print the indices used for computation and return None Index ranges and wrapping: - 'stat': stat_index, wraps within stat range - 'vary': vary_index, wraps within vary range - 'const': const_index, wraps within const range Requires sim_config to be set, or manually set the corresponding index array. """ if which == 'stat': index_range = self.stat_index label = 'stat' elif which == 'vary': index_range = self.vary_index label = 'vary' elif which == 'const': index_range = self.const_index label = 'const' else: raise ValueError("which must be 'stat', 'vary', or 'const'") assert idx in index_range, f"The requested idx {idx} is not in the {which} index range" # Simple increment with wrapping within the index_range bounds idx1 = idx min_idx = min(index_range) max_idx = max(index_range) # If at the end of range, wrap to beginning of range if idx == max_idx: idx2 = min_idx else: idx2 = idx + 1 if debug: print(f"N0_sim debug mode:") print(f" which: {which}") print(f" index_range: [{min_idx}, {max_idx}]") print(f" idx1 (E1B1): {idx1}") print(f" idx2 (E2B2): {idx2}") return None # choose (a,b) as consecutive sims, like your lensing codeims[0],self.stat_sims[1]), (0,1), mode="wrap") # pad/wrap to avoid idx+1 overflo a = idx1 b = idx2 fname = os.path.join(self.n0dir, f"N0_{label}_{self.filter.fsky:.2f}_{idx:04d}.pkl") if os.path.isfile(fname): return pl.load(open(fname, "rb")) # Build six recon alms for each split-pair: # ab = Q(E_a, B_b), ba = Q(E_b, B_a) # We can use your (d_idx, s_idx, s0_idx) interface with tags: # tagE="s", tagB="s0" -> E from s_idx, B from s0_idx # tagE="s0", tagB="s" -> E from s0_idx, B from s_idx ab = self._build_six_sym(d_idx=0, s_idx=a, s0_idx=b, tagE="s", tagB="s0") ba = self._build_six_sym(d_idx=0, s_idx=a, s0_idx=b, tagE="s0", tagB="s") # Cross-only operator already divides by fsky internally. n0 = self._cl_cross_only_between(ab, ab) + self._cl_cross_only_between(ab, ba) pl.dump(n0, open(fname, "wb")) return n0
[docs] def MCN0(self, which='vary'): fname = os.path.join(self.basedir, f'MCN0_{which}_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: if which == 'stat': index = self.stat_index elif which == 'vary': index = self.vary_index elif which == 'const': index = self.const_index else: raise ValueError("which must be 'stat', 'vary', or 'const'") n0_list = [] for idx in tqdm(index, desc=f'Computing MCN0 ({which})'): n0_list.append(self.N0_sim(idx, which=which)) mcn0 = np.array(n0_list).mean(axis=0) pl.dump(mcn0, open(fname,'wb')) return mcn0
[docs] def RDN0(self, idx: int, navg: int = 100): """ Realization-dependent N0 (RDN0) compatible with qcl_cross_only() (symmetric estimator). Uses the same Eq.(43)-style structure: RDN0 = Cx(ds,ds) + Cx(ds,sd) + Cx(sd,ds) + Cx(sd,sd) - Cx(ss0,ss0) - Cx(ss0,s0s) where Cx is the cross-only spectrum operator (12×34+13×24+14×23)/3. Returns: array C_L[0..recon_lmax] """ fsky_tag = f"{self.filter.fsky:.2f}".replace('.', 'p') fname = os.path.join(self.n0dir, f"RDN0x_{fsky_tag}_{idx:04d}_navg{navg}.pkl") if os.path.isfile(fname): return pl.load(open(fname, "rb")) # pool of sim indices excluding idx myidx = np.append(np.arange(100), np.arange(2)) myidx = np.delete(myidx, np.where(myidx == idx)[0]) d_idx = idx acc = np.zeros(self.recon_lmax + 1, dtype=np.float64) nused = 0 for i in tqdm(range(navg), desc=f"RDN0x for sim {idx}", unit="mc"): s_idx = int(myidx[i]) s0_idx = int(myidx[i+1]) # Build the needed recon sets ds = self._build_six_sym(d_idx, s_idx, s0_idx, tagE="d", tagB="s") sd = self._build_six_sym(d_idx, s_idx, s0_idx, tagE="s", tagB="d") ss0 = self._build_six_sym(d_idx, s_idx, s0_idx, tagE="s", tagB="s0") s0s = self._build_six_sym(d_idx, s_idx, s0_idx, tagE="s0", tagB="s") cl_ds_ds = self._cl_cross_only_between(ds, ds) cl_ds_sd = self._cl_cross_only_between(ds, sd) cl_sd_ds = self._cl_cross_only_between(sd, ds) cl_sd_sd = self._cl_cross_only_between(sd, sd) cl_ss0_ss0 = self._cl_cross_only_between(ss0, ss0) cl_ss0_s0s = self._cl_cross_only_between(ss0, s0s) rdn0_i = (cl_ds_ds + cl_ds_sd + cl_sd_ds + cl_sd_sd - cl_ss0_ss0 - cl_ss0_s0s) acc += rdn0_i nused += 1 rdn0 = acc / max(nused, 1) pl.dump(rdn0, open(fname, "wb")) return rdn0
[docs] def RDN0_mpi(self, idx: int, navg: int = 100): """ MPI-parallel cross-only RDN0 for SYM estimator (compatible with qcl_cross_only()). Implements the Eq.(43)-style combination (same algebra as your rot version), but with SYM estimator: RDN0 = Cx(ds,ds)+Cx(ds,sd)+Cx(sd,ds)+Cx(sd,sd) - Cx(ss0,ss0) - Cx(ss0,s0s) Parallelizes navg Monte-Carlo draws over (s, s0) pairs. Only rank 0 reads/writes the cache. """ MPI = mpi.mpi comm = mpi.com rank = mpi.rank size = mpi.size fsky_tag = f"{self.filter.fsky:.2f}".replace('.', 'p') fname = os.path.join(self.n0dir, f"RDN0x_{fsky_tag}_{idx:04d}_navg{navg}.pkl") # 1) Try to load cached result on rank 0, then broadcast. rdn0 = None if rank == 0 and os.path.isfile(fname): with open(fname, "rb") as f: rdn0 = pl.load(f) rdn0 = comm.bcast(rdn0, root=0) if rdn0 is not None: return rdn0 # 2) Build deterministic index pool excluding idx (same trick). nsim = 100 # or self.sim_config.nsim if available and consistent myidx = np.append(np.arange(nsim), np.arange(2)) myidx = np.delete(myidx, np.where(myidx == idx)[0]) # 3) Distribute tasks. tasks = np.arange(navg, dtype=int) chunks = np.array_split(tasks, size) my_tasks = chunks[rank] L = self.recon_lmax + 1 local_sum = np.zeros(L, dtype=np.float64) # 4) One MC draw. def _one_draw(i: int): s_idx = int(myidx[i % len(myidx)]) s0_idx = int(myidx[(i + 1) % len(myidx)]) d_idx = idx # Build the mixed estimators needed in Eq.(43) ds = self._build_six_sym(d_idx, s_idx, s0_idx, tagE="d", tagB="s") sd = self._build_six_sym(d_idx, s_idx, s0_idx, tagE="s", tagB="d") ss0 = self._build_six_sym(d_idx, s_idx, s0_idx, tagE="s", tagB="s0") s0s = self._build_six_sym(d_idx, s_idx, s0_idx, tagE="s0", tagB="s") cl_ds_ds = self._cl_cross_only_between(ds, ds) cl_ds_sd = self._cl_cross_only_between(ds, sd) cl_sd_ds = self._cl_cross_only_between(sd, ds) cl_sd_sd = self._cl_cross_only_between(sd, sd) cl_ss0_ss0 = self._cl_cross_only_between(ss0, ss0) cl_ss0_s0s = self._cl_cross_only_between(ss0, s0s) return (cl_ds_ds + cl_ds_sd + cl_sd_ds + cl_sd_sd - cl_ss0_ss0 - cl_ss0_s0s) # 5) Local accumulation (progress only on rank 0). iterator = tqdm(my_tasks, desc=f'RDN0x(sym) rank {rank} for sim {idx}', unit='mc') if rank == 0 else my_tasks for t in iterator: local_sum += _one_draw(int(t)) # 6) Reduce to rank 0 and normalize by navg. global_sum = np.zeros_like(local_sum) comm.Reduce(local_sum, global_sum, op=MPI.SUM, root=0) if rank == 0: rdn0 = global_sum / float(len(tasks)) os.makedirs(self.n0dir, exist_ok=True) with open(fname, "wb") as f: pl.dump(rdn0, f) # 7) Broadcast to all ranks. rdn0 = comm.bcast(rdn0, root=0) return rdn0
[docs] def RDN0_stat(self,array=False): """ RDN0 for all stat_index simulations """ if array: fname = os.path.join(self.basedir, f'RDN0_stat_fsky{self.filter.fsky:.2f}_array{len(self.stat_index)}.pkl') else: fname = os.path.join(self.basedir, f'RDN0_stat_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): return pl.load(open(fname,'rb')) else: rdn0_list = [] for i in tqdm(self.stat_index, desc='Computing RDN0 statistics'): rdn0_list.append(self.RDN0(i)) if array: rdn0_array = np.array(rdn0_list) pl.dump(rdn0_array, open(fname,'wb')) return rdn0_array else: rdn0_array = np.array(rdn0_list).mean(axis=0) pl.dump(rdn0_array, open(fname,'wb')) return rdn0_array
[docs] class CrossQEv1: """ Cross-only quadratic estimator for cosmic birefringence, implementing Eq. 38 of Madhavacheril et al. (2020). Requires 4 splits with independent noise. Uses the O(m^2) algorithm from Section 3.2 of the paper. """
[docs] def __init__(self, filter, lmin: int, lmax: int, recon_lmax: int, nsims_mf=100, nlb=10, lmax_bin=1024): assert filter.sky.nsplits == 4, "CrossQE requires 4 splits." self.basedir = os.path.join(filter.sky.libdir, 'qecross_v1') self.recdir = os.path.join(self.basedir, f'reco_min{lmin}_max{lmax}_rmax{recon_lmax}') self.n0dir = os.path.join(self.basedir, f'rdn0_min{lmin}_max{lmax}_rmax{recon_lmax}') self.mdir = os.path.join(self.basedir, f'misc_min{lmin}_max{lmax}_rmax{recon_lmax}') if mpi.rank == 0: os.makedirs(self.basedir, exist_ok=True) os.makedirs(self.recdir, exist_ok=True) os.makedirs(self.n0dir, exist_ok=True) os.makedirs(self.mdir, exist_ok=True) self.filter = filter self.lmin = lmin self.lmax = lmax self.recon_lmax = recon_lmax self.cl_len = filter.cl_len self.lmax_bin = lmax_bin self.m = 4 # number of splits self.sim_config = filter.sky.cmb.sim_config if self.sim_config is None: raise ValueError("sim_config must be set.") set1 = self.sim_config['set1'] reuse_last = self.sim_config['reuse_last'] self.stat_index = np.arange(0, set1 - nsims_mf) self.mf_index = np.arange(set1 - nsims_mf, set1) self.vary_index = np.arange(set1 - reuse_last, set1) self.const_index = np.arange(set1, set1 + reuse_last) self.null_index = np.arange(set1 + reuse_last, set1 + 2 * reuse_last) self.binner = nmt.NmtBin.from_lmax_linear(lmax_bin, nlb) self.b = self.binner.get_effective_ells() self.nlb = nlb # CHANGED: Use a SINGLE normalization for all pairs. # Since splits are filtered with per-split noise (Option B), we compute # the norm from the average of per-split observed spectra across all 4 splits. # This is an approximation — the exact paper prescription (Eq. 21) would # require re-filtering all splits with the coadd noise. # If split noise levels are roughly comparable, this is a good approximation. self.norm = self._compute_avg_split_norm()
@property def cl_aa(self): return self.filter.sky.cmb.cl_aa()[:self.recon_lmax + 1] def _compute_avg_split_norm(self): """ CHANGED: Compute a single normalization by averaging the per-split noise levels across all 4 splits. Since cinv_EB filters each split with its own noise, the effective total power seen by the filter for split s is: ocl_E^s = Cl_EE_lensed + <N_EE_split_s> ocl_B^s = Cl_BB_lensed + <N_BB_split_s> We average these across splits to get a single representative total power, then compute one normalization from that. This is the best single-norm approximation when per-split filtering is used but split noise levels are similar. """ fname = os.path.join( self.basedir, f'norm_avgsplit_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}.pkl' ) if os.path.isfile(fname): return pl.load(open(fname, 'rb')) splits = [1, 2, 3, 4] # Average noise over sims AND over splits ne_avg = np.zeros(self.lmax + 1, dtype=np.float64) nb_avg = np.zeros(self.lmax + 1, dtype=np.float64) count = 0 for s in splits: for i in tqdm(self.stat_index, desc=f'Computing OCL split={s}'): ne, nb = self.filter.sky.HILC_obsEB(i, ret='nl', split=s) ne_avg += ne[:self.lmax + 1] / Tcmb**2 nb_avg += nb[:self.lmax + 1] / Tcmb**2 count += 1 ne_avg /= count nb_avg /= count # Total observed spectra = lensed theory + average split noise oclE = self.cl_len[1, :self.lmax + 1] + ne_avg oclB = self.cl_len[2, :self.lmax + 1] + nb_avg norm = cs.norm_quad.qeb( 'rot', self.recon_lmax, self.lmin, self.lmax, self.cl_len[1, :self.lmax + 1], oclE, oclB )[0] pl.dump(norm, open(fname, 'wb')) return norm def _raw_qlm(self, E, B): """ Unnormalized QE reconstruction from a pair of filtered E, B alms. """ return cs.rec_rot.qeb( self.recon_lmax, self.lmin, self.lmax, self.cl_len[1, :self.lmax + 1], E[:self.lmax + 1, :self.lmax + 1], B[:self.lmax + 1, :self.lmax + 1] ) def _get_split_EB(self, idx, split): """Get C-inv filtered E, B for a given sim index and split number.""" return self.filter.cinv_EB(idx, split=split) def _phi_ij(self, idx, si, sj): """ CHANGED: Compute the symmetrized split-pair reconstruction phi^(ij) using Eq. 24 of the paper, with the SINGLE coadd normalization. phi^(ij) = (1/2) * A * [g(X_i, Y_j) + g(X_j, Y_i)] For birefringence EB estimator: phi^(ij) = (1/2) * norm * [Q(E_i, B_j) + Q(E_j, B_i)] For diagonal terms (si == sj): phi^(ii) = norm * Q(E_i, B_i) [the symmetrization is trivial] The original code used per-pair normalizations computed from per-split noise levels. This broke the combinatorial identity. """ Ei, Bi = self._get_split_EB(idx, si) if si == sj: alm = self._raw_qlm(Ei, Bi) return alm * self.norm[:, None] else: Ej, Bj = self._get_split_EB(idx, sj) alm_EiBj = self._raw_qlm(Ei, Bj) alm_EjBi = self._raw_qlm(Ej, Bi) return 0.5 * (alm_EiBj + alm_EjBi) * self.norm[:, None]
[docs] def four_split_phi(self, idx): """ CHANGED: Completely rewritten to follow the so-lenspipe implementation. Computes all the intermediate quantities needed for Eq. 38: - phi^(ij) for all i,j (including diagonals) - phi_hat = (1/m^2) sum_{ij} phi^(ij) (full coadd reconstruction) - phi^x = phi_hat - (1/m^2) sum_i phi^(ii) (cross coadd, no diags) - phi^(i) = (1/m) sum_j phi^(ij) (per-split marginals) - phi^(i)x = phi^(i) - (1/m) phi^(ii) (per-split cross) Returns a dict with all quantities needed by split_phi_to_cl. The original code only computed phi for disjoint pairs and averaged them at the map level, which does not implement Eq. 38. """ fname = os.path.join( self.recdir, f'four_split_phi_{self.filter.fsky:.2f}_{idx:04d}.pkl' ) if os.path.isfile(fname): return pl.load(open(fname, 'rb')) m = self.m splits = [1, 2, 3, 4] # Compute all 16 phi^(ij) — but use symmetry: phi^(ij) = phi^(ji) phi = {} for i in splits: for j in splits: if j < i: phi[(i, j)] = phi[(j, i)] # symmetry else: phi[(i, j)] = self._phi_ij(idx, i, j) # phi_hat = (1/m^2) * sum_{ij} phi^(ij) — the full coadd estimator phi_hat = sum(phi[(i, j)] for i in splits for j in splits) / m**2 # phi^x = phi_hat - (1/m^2) sum_i phi^(ii) — cross coadd phi_diag_sum = sum(phi[(i, i)] for i in splits) phi_X = phi_hat - phi_diag_sum / m**2 # phi^(i) = (1/m) sum_j phi^(ij) — per-split marginal phi_i = {} for i in splits: phi_i[i] = sum(phi[(i, j)] for j in splits) / m # phi^(i)x = phi^(i) - (1/m) phi^(ii) — per-split cross phi_ix = {} for i in splits: phi_ix[i] = phi_i[i] - phi[(i, i)] / m # Collect the unique cross-pair reconstructions for the tg3 term cross_pairs = {} for i in splits: for j in splits: if i < j: cross_pairs[(i, j)] = phi[(i, j)] result = { 'phi_X': phi_X, # cross coadd 'phi_ix': phi_ix, # dict: split -> per-split cross 'cross_pairs': cross_pairs, # dict: (i,j) -> phi^(ij) for i<j } pl.dump(result, open(fname, 'wb')) return result
[docs] def split_phi_to_cl(self, xy, uv=None): """ CHANGED: Completely new method implementing Eq. 38. Computes the cross-only power spectrum: C_L^x(XY, UV) = 1/[m(m-1)(m-2)(m-3)] * [ m^4 * C_L(phi_X, phi_X') - 4*m^2 * sum_i C_L(phi_ix, phi_ix') + 4 * sum_{i<j} C_L(phi_ij, phi_ij') ] This is the O(m^2) algorithm from Section 3.2 of the paper. Follows the so-lenspipe split_phi_to_cl function exactly. Args: xy: dict from four_split_phi (first reconstruction, e.g. data) uv: dict from four_split_phi (second reconstruction, for cross). If None, compute auto-spectrum. """ if uv is None: uv = xy m = self.m fsky = self.filter.fsky # Term 1: m^4 * C_L(phi^x, phi'^x) tg1 = m**4 * cs.utils.alm2cl(self.recon_lmax, xy['phi_X'], uv['phi_X']) / fsky # Term 2: -4*m^2 * sum_i C_L(phi^(i)x, phi'^(i)x) tg2 = np.zeros(self.recon_lmax + 1) for i in [1, 2, 3, 4]: tg2 += cs.utils.alm2cl(self.recon_lmax, xy['phi_ix'][i], uv['phi_ix'][i]) / fsky tg2 *= -4 * m**2 # Term 3: +4 * sum_{i<j} C_L(phi^(ij), phi'^(ij)) tg3 = np.zeros(self.recon_lmax + 1) for (i, j) in xy['cross_pairs']: tg3 += cs.utils.alm2cl(self.recon_lmax, xy['cross_pairs'][(i, j)], uv['cross_pairs'][(i, j)]) / fsky tg3 *= 4 # Eq. 38 cl_cross = (tg1 + tg2 + tg3) / (m * (m - 1) * (m - 2) * (m - 3)) return cl_cross
[docs] def qlm(self, idx): """ CHANGED: This now returns the cross-coadd map phi^x for mean-field etc. Note: this is biased by (1 - 1/m), but that's fine for mean-field estimation where we average and subtract. """ result = self.four_split_phi(idx) return result['phi_X']
[docs] def qcl(self, idx, n0=None, mf=False, nlens=False, n1=False, binned=False): """ CHANGED: The power spectrum is now computed via split_phi_to_cl (Eq. 38) rather than by taking the auto-spectrum of an averaged map. """ result = self.four_split_phi(idx) cl = self.split_phi_to_cl(result) if n0 is None: N0 = np.zeros_like(cl) elif n0 == 'norm': N0 = self.norm elif n0 == 'mcn0': N0 = self.MCN0('stat') elif n0 == 'rdn0': N0 = self.RDN0(idx) else: raise ValueError("n0 must be 'norm', 'rdn0', or 'mcn0'") cl = cl - N0 if mf: cl = cl - self.mean_field_cl() if nlens: cl = cl - self.Nlens() if n1: cl = cl - self.N1() if binned: return self.binner.bin_cell(cl[:self.lmax_bin + 1]) return cl
[docs] def mean_field_sim(self, idx): """ CHANGED: Mean field per-sim is now just the cross-coadd map phi^x. The original averaged phi over 3 disjoint pairings at the map level, which is not the correct cross-only mean field. """ fname = os.path.join( self.mdir, f"mean_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}" f"_fsky{self.filter.fsky:.2f}_{idx:04d}.pkl" ) if os.path.isfile(fname): return pl.load(open(fname, "rb")) phi_X = self.qlm(idx) pl.dump(phi_X, open(fname, "wb")) return phi_X
[docs] def mean_field(self): """Mean field: average of phi^x over mf_index sims.""" m = [] for idx in tqdm(self.mf_index, desc='Computing cross-only mean field'): m.append(self.mean_field_sim(idx)) return np.mean(m, axis=0)
[docs] def mean_field_cl(self): fname = os.path.join( self.mdir, f"meanfield_cl_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}" f"_fsky{self.filter.fsky:.2f}.pkl" ) if os.path.isfile(fname): return pl.load(open(fname, 'rb')) mf = self.mean_field() mfcl = cs.utils.alm2cl(self.recon_lmax, mf) / self.filter.fsky pl.dump(mfcl, open(fname, 'wb')) return mfcl
[docs] def N0_sim(self, idx, which='vary'): """ CHANGED: N0 for the cross-only estimator. Uses Eq. 38 applied to reconstructions where E and B come from DIFFERENT simulations (to get disconnected/Gaussian piece only). For each split pair (i,j), the symmetrized estimator is: phi^(ij) = (1/2) * norm * [Q(E_i^sim1, B_j^sim2) + Q(E_j^sim1, B_i^sim2)] Note: for i != j in the data case, we had 4 terms for full symmetrization over both split indices AND E/B leg assignment. For N0 with two sims, the symmetrization is: swap which sim provides E vs B for each split. """ if which == 'stat': index_range = self.stat_index label = 'stat' elif which == 'vary': index_range = self.vary_index label = 'vary' elif which == 'const': index_range = self.const_index label = 'const' else: raise ValueError("which must be 'stat', 'vary', or 'const'") assert idx in index_range min_idx, max_idx = min(index_range), max(index_range) idx1 = idx idx2 = min_idx if idx == max_idx else idx + 1 fname = os.path.join(self.n0dir, f"N0_{label}_{self.filter.fsky:.2f}_{idx:04d}.pkl") if os.path.isfile(fname): return pl.load(open(fname, 'rb')) m = self.m splits = [1, 2, 3, 4] # Pre-cache all split E,B for both sims EB1 = {} # sim idx1 EB2 = {} # sim idx2 for s in splits: EB1[s] = self._get_split_EB(idx1, s) # (E, B) tuple EB2[s] = self._get_split_EB(idx2, s) # Build phi^(ij) from E(sim1) x B(sim2) symmetrized over E<->B assignment # phi^(ij) = 0.5 * norm * [Q(E_i^1, B_j^2) + Q(E_j^2, B_i^1)] # For i != j, also need the (i,j) swap: # phi^(ij) = 0.25 * norm * [Q(E_i^1, B_j^2) + Q(E_j^2, B_i^1) # + Q(E_j^1, B_i^2) + Q(E_i^2, B_j^1)] phi = {} for i in splits: for j in splits: if j < i: phi[(i, j)] = phi[(j, i)] # symmetry continue Ei_1, Bi_1 = EB1[i] Ej_2, Bj_2 = EB2[j] alm_a = self._raw_qlm(Ei_1, Bj_2) # E_i^sim1, B_j^sim2 alm_b = self._raw_qlm(Ej_2, Bi_1) # E_j^sim2, B_i^sim1 if i == j: phi[(i, j)] = 0.5 * (alm_a + alm_b) * self.norm[:, None] else: Ej_1, Bj_1 = EB1[j] Ei_2, Bi_2 = EB2[i] alm_c = self._raw_qlm(Ej_1, Bi_2) # E_j^sim1, B_i^sim2 alm_d = self._raw_qlm(Ei_2, Bj_1) # E_i^sim2, B_j^sim1 phi[(i, j)] = 0.25 * (alm_a + alm_b + alm_c + alm_d) * self.norm[:, None] # Assemble intermediate quantities (same as four_split_phi) phi_hat = sum(phi[(i, j)] for i in splits for j in splits) / m**2 phi_diag_sum = sum(phi[(i, i)] for i in splits) phi_X = phi_hat - phi_diag_sum / m**2 phi_ix = {} for i in splits: phi_i = sum(phi[(i, j)] for j in splits) / m phi_ix[i] = phi_i - phi[(i, i)] / m cross_pairs = {} for i in splits: for j in splits: if i < j: cross_pairs[(i, j)] = phi[(i, j)] n0_result = { 'phi_X': phi_X, 'phi_ix': phi_ix, 'cross_pairs': cross_pairs, } n0cl = self.split_phi_to_cl(n0_result) pl.dump(n0cl, open(fname, 'wb')) return n0cl
[docs] def MCN0(self, which='vary'): """Monte Carlo N0: average N0_sim over the relevant index range.""" fname = os.path.join(self.basedir, f'MCN0_cross_{which}_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): return pl.load(open(fname, 'rb')) if which == 'stat': index = self.stat_index elif which == 'vary': index = self.vary_index elif which == 'const': index = self.const_index else: raise ValueError("which must be 'stat', 'vary', or 'const'") n0_list = [] for idx in tqdm(index, desc=f'Computing cross MCN0 ({which})'): n0_list.append(self.N0_sim(idx, which=which)) mcn0 = np.array(n0_list).mean(axis=0) pl.dump(mcn0, open(fname, 'wb')) return mcn0
[docs] def N1(self, binned=False): """N1 = MCN0('const') - MCN0('vary')""" fname = os.path.join(self.basedir, f'N1_cross_fsky{self.filter.fsky:.2f}.pkl') if os.path.isfile(fname): n1 = pl.load(open(fname, 'rb')) else: n1 = self.MCN0('const') - self.MCN0('vary') pl.dump(n1, open(fname, 'wb')) if binned: return self.binner.bin_cell(n1[:self.lmax_bin + 1]) return n1
[docs] def Nlens(self, MCN0=True, binned=False): """Lensing bias from null sims.""" fname = os.path.join(self.basedir, f'Nlens_cross_fsky{self.filter.fsky:.2f}_mcn0{MCN0}.pkl') if os.path.isfile(fname): nlens = pl.load(open(fname, 'rb')) else: qcl_list = [] for idx in tqdm(self.null_index, desc='Computing cross Nlens'): result = self.four_split_phi(idx) qcl_list.append(self.split_phi_to_cl(result)) avg_qcl = np.array(qcl_list).mean(axis=0) if MCN0: nlens = avg_qcl - self.MCN0('vary') else: nlens = avg_qcl - self.norm pl.dump(nlens, open(fname, 'wb')) if binned: return self.binner.bin_cell(nlens[:self.lmax_bin + 1]) return nlens
[docs] def RDN0(self, idx): """ CHANGED: Realization-dependent N0 for the cross-only estimator. Implements Eq. 43 of the paper: apply the C^x algorithm (Eq. 38) to each of the 4-point data/sim combinations from the standard RDN0 formula (Eq. 17), then average over sim realizations. For each MC iteration with sims s1, s2: RDN0 = <C^x(d,s1) + C^x(s1,d) - C^x(s1,s2) - C^x(s2,s1)> where C^x(a,b) means: E legs from 'a', B legs from 'b'. """ fname = os.path.join(self.n0dir, f"RDN0_cross_{self.filter.fsky:.2f}_{idx:04d}.pkl") if os.path.isfile(fname): return pl.load(open(fname, 'rb')) myidx = self.stat_index.copy() m = self.m splits = [1, 2, 3, 4] # Get data splits data_EB = {} for s in splits: data_EB[s] = self._get_split_EB(idx, s) mean_rdn0 = [] for mc in tqdm(range(100), desc=f'RDN0 cross for sim {idx}'): i1 = myidx[mc % len(myidx)] i2 = myidx[(mc + 1) % len(myidx)] sim1_EB = {} sim2_EB = {} for s in splits: sim1_EB[s] = self._get_split_EB(i1, s) sim2_EB[s] = self._get_split_EB(i2, s) # Build 4 sets of phi^(ij): (d,s1), (s1,d), (s1,s2), (s2,s1) # For each, we need E from first, B from second def build_split_result(E_source, B_source): phi = {} for si in splits: Ei = E_source[si][0] # E alm for sj in splits: if sj < si: phi[(si, sj)] = phi[(sj, si)] continue Bj = B_source[sj][1] # B alm alm_a = self._raw_qlm(Ei, Bj) if si == sj: Ej = E_source[sj][0] Bi = B_source[si][1] alm_b = self._raw_qlm(Ej, Bi) phi[(si, sj)] = 0.5 * (alm_a + alm_b) * self.norm[:, None] else: Ej = E_source[sj][0] Bi = B_source[si][1] alm_b = self._raw_qlm(Ej, Bi) phi[(si, sj)] = 0.5 * (alm_a + alm_b) * self.norm[:, None] # Assemble phi_hat = sum(phi[(si, sj)] for si in splits for sj in splits) / m**2 phi_diag = sum(phi[(si, si)] for si in splits) phi_X = phi_hat - phi_diag / m**2 phi_ix = {} for si in splits: phi_si = sum(phi[(si, sj)] for sj in splits) / m phi_ix[si] = phi_si - phi[(si, si)] / m cross_pairs = {} for si in splits: for sj in splits: if si < sj: cross_pairs[(si, sj)] = phi[(si, sj)] return {'phi_X': phi_X, 'phi_ix': phi_ix, 'cross_pairs': cross_pairs} # (d, s1): E from data, B from sim1 res_ds1 = build_split_result( {s: (data_EB[s][0], data_EB[s][1]) for s in splits}, {s: (sim1_EB[s][0], sim1_EB[s][1]) for s in splits} ) # (s1, d): E from sim1, B from data res_s1d = build_split_result( {s: (sim1_EB[s][0], sim1_EB[s][1]) for s in splits}, {s: (data_EB[s][0], data_EB[s][1]) for s in splits} ) # (s1, s2): E from sim1, B from sim2 res_s1s2 = build_split_result( {s: (sim1_EB[s][0], sim1_EB[s][1]) for s in splits}, {s: (sim2_EB[s][0], sim2_EB[s][1]) for s in splits} ) # (s2, s1): E from sim2, B from sim1 res_s2s1 = build_split_result( {s: (sim2_EB[s][0], sim2_EB[s][1]) for s in splits}, {s: (sim1_EB[s][0], sim1_EB[s][1]) for s in splits} ) # RDN0 per iteration: Eq. 43 cl_ds1 = self.split_phi_to_cl(res_ds1) cl_s1d = self.split_phi_to_cl(res_s1d) cl_s1s2 = self.split_phi_to_cl(res_s1s2) cl_s2s1 = self.split_phi_to_cl(res_s2s1) mean_rdn0.append(cl_ds1 + cl_s1d - cl_s1s2 - cl_s2s1) rdn0 = np.mean(mean_rdn0, axis=0) pl.dump(rdn0, open(fname, 'wb')) return rdn0
[docs] def qcl_stat(self, n0=None, mf=False, nlens=False, n1=False, binned=True): """Power spectra for all stat_index simulations.""" st = '' if n0 is None: st += '_noN0' elif n0 == 'norm': st += '_n0anal' elif n0 == 'rdn0': st += '_n0rdn0' elif n0 == 'mcn0': st += '_n0mcn0' else: raise ValueError("n0 must be 'norm', 'rdn0', or 'mcn0'") if mf: st += '_mf' if nlens: st += '_nlens' if n1: st += '_n1' fname = os.path.join( self.basedir, f'qcl_cross_min{self.lmin}_max{self.lmax}_rmax{self.recon_lmax}' f'_nlb{self.nlb}_{st}_{binned}.pkl' ) if os.path.isfile(fname): return pl.load(open(fname, 'rb')) cl = [] for i in tqdm(self.stat_index, desc='Computing cross cl statistics'): cl.append(self.qcl(i, n0=n0, mf=mf, nlens=nlens, n1=n1, binned=binned)) cl = np.array(cl) pl.dump(cl, open(fname, 'wb')) return cl