import numpy as np
import healpy as hp
import os
import pickle as pl
from cobi.simulation import CMB
from cobi import mpi
from cobi.utils import Logger
from typing import Dict, Any, Tuple,Optional, List
RAD2ARCMIN = 180 * 60 / np.pi
[docs]
def bin_from_edges(start: np.ndarray, end: np.ndarray) -> Tuple:
lmax = np.amax(end) - 1 if len(end) > 0 else 0
ells, bpws, weights = [], [], []
for ib, (li, le) in enumerate(zip(start, end)):
nlb = int(le - li)
ells.extend(range(li, le))
bpws.extend([ib] * nlb)
weights.extend([1.0 / nlb] * nlb)
ells, bpws, weights = np.array(ells), np.array(bpws), np.array(weights)
n_bands = bpws[-1] + 1 if len(bpws) > 0 else 0
nell_array = np.zeros(n_bands, dtype=int)
valid_indices = (ells <= lmax) & (bpws >= 0)
unique_bpws, counts = np.unique(bpws[valid_indices], return_counts=True)
if len(unique_bpws) > 0:
nell_array[unique_bpws] = counts
ell_list = [np.zeros(n, dtype=int) for n in nell_array]
w_list = [np.zeros(n, dtype=np.float64) for n in nell_array]
counts_so_far = np.zeros(n_bands, dtype=int)
for i in range(len(ells)):
if valid_indices[i]:
b = bpws[i]
idx = counts_so_far[b]
ell_list[b][idx] = ells[i]
w_list[b][idx] = weights[i]
counts_so_far[b] += 1
for i in range(n_bands):
norm = np.sum(w_list[i])
if norm > 0:
w_list[i] /= norm
return n_bands, nell_array, ell_list, w_list
[docs]
def bin_configuration(info: Tuple) -> Tuple:
n_bands, nell_array, ell_list, w_list = info
max_nell = np.max(nell_array) if len(nell_array) > 0 else 0
ib_grid, il_grid = np.meshgrid(np.arange(n_bands,dtype=int), np.arange(max_nell,dtype=int), indexing='ij')
w_array = np.array([np.pad(l, (0, max_nell - len(l))) for l in w_list])
ell_array = np.array([np.pad(l, (0, max_nell - len(l)), 'constant', constant_values=-1) for l in ell_list], dtype=int)
return ib_grid, il_grid, w_array, ell_array
[docs]
def bin_spec_matrix(spec: np.ndarray, info: Tuple) -> np.ndarray:
(ib_grid, il_grid, w_array, ell_array) = info
n_bands = ib_grid.shape[0]
binned_spec = np.zeros((spec.shape[0], spec.shape[1], n_bands))
for b in range(n_bands):
valid_ells_mask = ell_array[b, :] >= 0
ells_in_bin = ell_array[b, valid_ells_mask]
weights_in_bin = w_array[b, valid_ells_mask]
if len(ells_in_bin) > 0:
binned_spec[:, :, b] = np.sum(weights_in_bin * spec[:, :, ells_in_bin], axis=2)
return binned_spec
[docs]
def bin_cov_matrix(cov: np.ndarray, info: Tuple) -> np.ndarray:
(ib_grid, il_grid, w_array, ell_array) = info
n_bands = ib_grid.shape[0]
binned_cov = np.zeros((cov.shape[0], cov.shape[1], cov.shape[2], n_bands))
for b in range(n_bands):
valid_ells_mask = ell_array[b, :] >= 0
ells_in_bin = ell_array[b, valid_ells_mask]
weights_in_bin = w_array[b, valid_ells_mask]
if len(ells_in_bin) > 0:
binned_cov[:, :, :, b] = np.sum(weights_in_bin**2 * cov[:, :, :, ells_in_bin], axis=3)
return binned_cov
[docs]
class _Result:
_PARAM_CONFIG = {
"alpha": {}, "Ad + alpha": {'Ad': 1.0}, "beta + alpha": {'beta': 0.0},
"As + Ad + alpha": {'As': 1.0, 'Ad': 1.0}, "Ad + beta + alpha": {'Ad': 1.0, 'beta': 0.0},
"As + Ad + beta + alpha": {'As': 1.0, 'Ad': 1.0, 'beta': 0.0},
"As + Asd + Ad + alpha": {'As': 1.0, 'Asd': 1.0, 'Ad': 1.0},
"As + Asd + Ad + beta + alpha": {'As': 1.0, 'Asd': 1.0, 'Ad': 1.0, 'beta': 0.0}
}
[docs]
def __init__(self, spec, fit, sim_idx, alpha_per_split, rm_same_tube,
binwidth, bmin, bmax, bands, freqs):
self.specdir, self.nside, self.fsky = spec.lat.libdir, spec.nside, spec.fsky
self.temp_bp, self.aposcale, self.CO, self.PS, self.pureB = spec.temp_bp, spec.aposcale, spec.CO, spec.PS, spec.pureB
self.sim_idx = sim_idx
self.nlb, self.bmin, self.bmax = binwidth, bmin, bmax
self.fit, self.rm_same_tube, self.alpha_per_split = fit, rm_same_tube, alpha_per_split
self.ml, self.std_fisher, self.cov_fisher = {}, {}, {"Iter 0": None}
initial_params = self._PARAM_CONFIG.get(fit, {})
self.ml["Iter 0"] = initial_params.copy()
self.std_fisher["Iter 0"] = {key: None for key in initial_params}
self.variables = list(initial_params.keys())
if alpha_per_split:
self.Nalpha, self.alpha_keys = len(bands), bands
else:
self.Nalpha, self.alpha_keys = len(freqs), freqs
for key in self.alpha_keys:
self.ml["Iter 0"][str(key)] = 0.0
self.std_fisher["Iter 0"][str(key)] = None
self.variables.extend([str(k) for k in self.alpha_keys])
self.variables = ", ".join(self.variables)
self.ext_par = len(initial_params)
self.Nvar = self.Nalpha + self.ext_par
[docs]
class MLE:
FIT_OPTIONS = ["alpha", "Ad + alpha", "As + Ad + alpha", "As + Asd + Ad + alpha",
"beta + alpha", "Ad + beta + alpha", "As + Ad + beta + alpha", "As + Asd + Ad + beta + alpha"]
[docs]
def __init__(self, libdir, spec_lib, fit, alpha_per_split=False, rm_same_tube=False,
binwidth=20, bmin=51, bmax=1000, verbose=True,
avoid_spectra: Optional[List[str]] = None):
if fit not in self.FIT_OPTIONS: raise ValueError(f"Invalid fit option. Choose from: {self.FIT_OPTIONS}")
self.logger = Logger(self.__class__.__name__, verbose)
self.niter_max = 100; self.tol = 0.5; self.spec = spec_lib; self.fit = fit
self.alpha_per_split = alpha_per_split; self.rm_same_tube = rm_same_tube
self.nside = self.spec.nside; self.fsky = self.spec.fsky
self.avoid_spectra = [str(s) for s in avoid_spectra] if avoid_spectra else None
if self.avoid_spectra:
avoid_set = set(self.avoid_spectra)
self.logger.log(f"Avoiding frequency channels: {avoid_set}", 'info')
# The spec_lib.bands are strings like '27-1', freqs are numbers like 27
self.bands = [b for b in self.spec.bands if b.split('-')[0] not in avoid_set]
self.freqs = [f for f in self.spec.freqs if str(f) not in avoid_set]
else:
self.bands = self.spec.bands
self.freqs = self.spec.freqs
self.Nbands = len(self.bands)
self.Nfreq = len(self.freqs)
if self.Nbands == 0: raise ValueError("All frequency channels have been filtered out.")
fld_ext = f"{spec_lib.dust_model}{spec_lib.sync_model}" if spec_lib.lat.dust_model != spec_lib.dust_model else ""
self.libdir = os.path.join(spec_lib.lat.libdir, 'mle' + fld_ext)
if mpi.rank == 0: os.makedirs(self.libdir, exist_ok=True)
mpi.barrier()
self.cmb = CMB(libdir, self.nside, beta=0, model='iso')
self.cmb_cls = self.cmb.get_lensed_spectra(dl=False, dtype='d')
assert bmax <= self.spec.lmax, "bmax must be <= lmax"
self.nlb, self.bmin, self.bmax = binwidth, bmin, bmax
lower = np.arange(self.bmin, self.bmax - self.nlb, self.nlb)
upper = np.arange(self.bmin + self.nlb, self.bmax, self.nlb)
bin_def = bin_from_edges(lower, upper)
self.bin_conf = bin_configuration(bin_def); self.Nbins = bin_def[0]
self.inst = {b:{"fwhm":spec_lib.lat.config[b]['fwhm'],"opt. tube":spec_lib.lat.config[b]['opt. tube'],"cl idx":i} for i,b in enumerate(self.bands)}
if alpha_per_split:
for i, band in enumerate(self.bands): self.inst[band]["alpha idx"] = i
else:
# Use a dictionary to handle non-contiguous frequency numbers
freq_map = {freq: i for i, freq in enumerate(self.freqs)}
for freq_val, idx in freq_map.items():
matching_bands = [b for b in self.bands if b.startswith(str(freq_val))]
for band_name in matching_bands:
self.inst[band_name]["alpha idx"] = idx
self._setup_indexing()
[docs]
def calculate(self, idx: int, return_result: bool = False) -> Any:
res = _Result(self.spec, self.fit, idx, self.alpha_per_split, self.rm_same_tube,
self.nlb, self.bmin, self.bmax, self.bands, self.freqs)
try:
input_cls = self.spec.get_spectra(idx, sync='As' in self.fit, avoid_bands=self.avoid_spectra)
except TypeError:
self.spec.compute(idx, sync='As' in self.fit)
input_cls = self.spec.get_spectra(idx, sync='As' in self.fit, avoid_bands=self.avoid_spectra)
self._process_cls(input_cls); del input_cls
converged = False; niter = 0
while not converged and niter < self.niter_max:
try:
cov = self._build_cov(niter, res)
invcov = np.linalg.pinv(cov / self.fsky)
self._solve_linear_system(invcov, niter, res)
ang_now = self._get_ml_alphas(niter+1, res, add_beta='beta' in self.fit)
ang_prev = self._get_ml_alphas(niter, res, add_beta='beta' in self.fit)
diff = np.abs(ang_now - ang_prev) * RAD2ARCMIN
if np.all(diff < self.tol) or np.sum(diff >= self.tol) <= 1: converged = True
niter += 1
except (np.linalg.LinAlgError, StopIteration, KeyError, IndexError) as e:
self.logger.log(f"Iteration {niter} failed: {e}. Stopping.", 'error'); break
with open(self.result_name(idx), "wb") as f: pl.dump(res, f, protocol=pl.HIGHEST_PROTOCOL)
if return_result: return res
[docs]
def estimate_angles(self, idx: int, overwrite: bool = False, Niter: int = -1, to_degrees: bool = True) -> Dict:
file = self.result_name(idx)
if (not os.path.isfile(file)) or overwrite:
res = self.calculate(idx, return_result=True)
if res is None: return {}
else:
res = pl.load(open(file, "rb"))
max_iter = len(res.ml.keys())
if max_iter <= 1:
self.logger.log(f"Calculation for index {idx} did not converge or failed.", 'warning'); return {}
iter_key = f"Iter {max_iter - 1 if Niter == -1 else Niter}"
final_params = res.ml[iter_key]
if to_degrees:
return {k: (np.rad2deg(v) if k not in ["As","Ad","Asd"] else v) for k,v in final_params.items()}
return final_params
[docs]
def _setup_indexing(self):
self.avoid = 4 if self.rm_same_tube else 1
IJidx = []
for i, band_i in enumerate(self.bands):
for j, band_j in enumerate(self.bands):
if self.rm_same_tube:
if self.inst[band_i]["opt. tube"] != self.inst[band_j]["opt. tube"]: IJidx.append((i, j))
elif i != j: IJidx.append((i, j))
self.IJidx = np.array(IJidx, dtype=np.uint8); self.num_pairs = len(self.IJidx)
m, n = np.meshgrid(range(self.num_pairs), range(self.num_pairs), indexing='ij')
self.MNi, self.MNj, self.MNp, self.MNq = self.IJidx[m,0], self.IJidx[m,1], self.IJidx[n,0], self.IJidx[n,1]
[docs]
def _process_cls(self, incls: Dict):
lmax = self.spec.lmax
self.bin_terms, self.cov_terms = {}, {}
raw = {}
cl_shape = (self.Nbands, self.Nbands, lmax + 1)
raw['EEo_ij']=incls['oxo'][:,:,0,:lmax+1]; raw['BBo_ij']=incls['oxo'][:,:,1,:lmax+1]
raw['EBo_ij']=incls['oxo'][:,:,2,:lmax+1]; raw['EiEj_o']=incls['oxo'][:,:,0,:lmax+1]
raw['BiBj_o']=incls['oxo'][:,:,1,:lmax+1]; raw['EiBj_o']=incls['oxo'][:,:,2,:lmax+1]
raw['BiEj_o']=incls['oxo'][:,:,2,:lmax+1].transpose(1,0,2)
str_freqs = [str(f) for f in self.freqs]
for ii, band_i in enumerate(self.bands):
freq_i_str = band_i.split('-')[0]
freq_i = str_freqs.index(freq_i_str)
for jj, band_j in enumerate(self.bands):
freq_j_str = band_j.split('-')[0]
freq_j = str_freqs.index(freq_j_str)
if 'beta' in self.fit:
if ii==0 and jj==0:
raw['EEcmb_ij'] = np.zeros(cl_shape)
raw['BBcmb_ij'] = np.zeros(cl_shape)
fwhm_i, fwhm_j = self.inst[band_i]['fwhm'], self.inst[band_j]['fwhm']
raw['EEcmb_ij'][ii,jj,:] = self._convolve_gaussBeams_pwf("ee", fwhm_i, fwhm_j, lmax)
raw['BBcmb_ij'][ii,jj,:] = self._convolve_gaussBeams_pwf("bb", fwhm_i, fwhm_j, lmax)
if 'Ad' in self.fit:
if ii==0 and jj==0:
for k in ['EBd_ij','EiEj_d','BiBj_d','EiBj_d','BiEj_d','Eid_Ejo','Bid_Bjo','Eid_Bjo','Bid_Ejo']:
raw[k]=np.zeros(cl_shape)
raw['EBd_ij'][ii,jj,:]=incls['dxd'][freq_i,freq_j,2,:lmax+1]
raw['EiEj_d'][ii,jj,:]=incls['dxd'][freq_i,freq_j,0,:lmax+1]
raw['BiBj_d'][ii,jj,:]=incls['dxd'][freq_i,freq_j,1,:lmax+1]
raw['EiBj_d'][ii,jj,:]=incls['dxd'][freq_i,freq_j,2,:lmax+1]
raw['BiEj_d'][ii,jj,:]=incls['dxd'][freq_j,freq_i,2,:lmax+1]
raw['Eid_Ejo'][ii,jj,:]=incls['dxo'][freq_i,jj,0,:lmax+1]
raw['Bid_Bjo'][ii,jj,:]=incls['dxo'][freq_i,jj,1,:lmax+1]
raw['Eid_Bjo'][ii,jj,:]=incls['dxo'][freq_i,jj,2,:lmax+1]
raw['Bid_Ejo'][ii,jj,:]=incls['dxo'][freq_i,jj,3,:lmax+1]
for k, v in raw.items(): self.bin_terms[k + "_b"] = bin_spec_matrix(v, self.bin_conf)
self.cov_terms["C_oxo"] = self._C_oxo(raw['EiEj_o'],raw['BiBj_o'],raw['EiBj_o'],raw['BiEj_o'])
if 'beta' in self.fit: self.cov_terms["C_cmb"] = self._C_cmb()
if 'Ad' in self.fit:
self.cov_terms["C_dxd"] = self._C_fgxfg(raw['EiEj_d'],raw['BiBj_d'],raw['EiBj_d'],raw['BiEj_d'])
self.cov_terms["C_dxo"] = self._C_fgxo(raw['Eid_Ejo'],raw['Bid_Bjo'],raw['Eid_Bjo'],raw['Bid_Ejo'])
[docs]
def _build_cov(self, niter, res):
p=res.ml[f"Iter {niter}"]; ai,aj,ap,aq=self._get_alpha_blocks(niter,res)
with np.errstate(divide='ignore',invalid='ignore'):
c4ij=np.cos(4*ai)+np.cos(4*aj); c4pq=np.cos(4*ap)+np.cos(4*aq)
Aij=np.sin(4*aj)/c4ij; Apq=np.sin(4*aq)/c4pq; Bij=np.sin(4*ai)/c4ij; Bpq=np.sin(4*ap)/c4pq
Dij=2*np.cos(2*ai)*np.cos(2*aj)/c4ij; Dpq=2*np.cos(2*ap)*np.cos(2*aq)/c4pq
Eij=2*np.sin(2*ai)*np.sin(2*aj)/c4ij; Epq=2*np.sin(2*ap)*np.sin(2*aq)/c4pq
cov=self.cov_terms['C_oxo'][0]+Apq*Aij*self.cov_terms['C_oxo'][1]+Bpq*Bij*self.cov_terms['C_oxo'][2]
if "beta" in self.fit:
with np.errstate(divide='ignore',invalid='ignore'): Cij=np.sin(4*p["beta"])/(2*np.cos(2*ai+2*aj)); Cpq=np.sin(4*p["beta"])/(2*np.cos(2*ap+2*aq))
cov+=-2*Cij*Cpq*(self.cov_terms['C_cmb'][0]+self.cov_terms['C_cmb'][1])
if "Ad" in self.fit:
Ad=p["Ad"]; Td=self.cov_terms['C_dxd']; Tdo=self.cov_terms['C_dxo']
cov+=Dij*Dpq*Ad**2*Td[0]+Eij*Epq*Ad**2*Td[1]+Dij*Epq*Ad**2*Td[2]+Eij*Dpq*Ad**2*Td[3]
cov+=-Dij*Ad*Tdo[0]-Dpq*Ad*Tdo[1]-Eij*Ad*Tdo[2]-Epq*Ad*Tdo[3]
return np.nan_to_num(cov)
[docs]
def _solve_linear_system(self, invcov, niter, res):
terms=self._compute_linear_system_terms(invcov); ext_params=[p for p in ['As','Ad','Asd','beta'] if p in self.fit]
sys_mat=np.zeros((res.Nvar,res.Nvar)); ind_term=np.zeros(res.Nvar)
for i,p1 in enumerate(ext_params):
sys_mat[i,i]=self._SYSTEM_MATRIX_CONFIG[(p1,p1)](terms); ind_term[i]=self._IND_TERM_CONFIG[p1](terms)
for j,p2 in enumerate(ext_params[i+1:],start=i+1):
key=(p1,p2) if (p1,p2) in self._SYSTEM_MATRIX_CONFIG else (p2,p1)
val=self._SYSTEM_MATRIX_CONFIG.get(key,lambda t:0)(terms); sys_mat[i,j]=sys_mat[j,i]=val
for k in range(self.Nbands):
idx_a=self.inst[self.bands[k]]['alpha idx']+res.ext_par
val=self._SYSTEM_MATRIX_CONFIG[(p1,'alpha')](terms,k); sys_mat[i,idx_a]+=val; sys_mat[idx_a,i]+=val
for i_band in range(self.Nbands):
idx_i=self.inst[self.bands[i_band]]['alpha idx']+res.ext_par
ind_term[idx_i]+=self._IND_TERM_CONFIG['alpha'](terms,i_band)
for j_band in range(self.Nbands):
idx_j=self.inst[self.bands[j_band]]['alpha idx']+res.ext_par
sys_mat[idx_i,idx_j]+=self._SYSTEM_MATRIX_CONFIG[('alpha','alpha')](terms,i_band,j_band)
solution=np.linalg.solve(sys_mat,ind_term); cov_now=np.linalg.pinv(sys_mat); std_now=np.sqrt(np.diagonal(cov_now))
if np.any(np.isnan(std_now)): raise StopIteration("NaN in std")
iter_key=f"Iter {niter+1}"; res.ml[iter_key],res.std_fisher[iter_key]={},{}
res.cov_fisher[iter_key]=cov_now
for i,p in enumerate(ext_params): res.ml[iter_key][p]=solution[i]; res.std_fisher[iter_key][p]=std_now[i]
for i,key in enumerate(res.alpha_keys): res.ml[iter_key][str(key)]=solution[i+res.ext_par]; res.std_fisher[iter_key][str(key)]=std_now[i+res.ext_par]
[docs]
def _summation(self, v_ij, v_pq, invcov_T):
v1=v_ij[self.MNi,self.MNj,:]; v2=v_pq[self.MNp,self.MNq,:]
return np.sum(v1*invcov_T*v2,axis=2).reshape(self.MNi.shape)
[docs]
def _compute_linear_system_terms(self, invcov):
terms={}; invcov_T=np.moveaxis(invcov,0,2)
recipes={'B_ijpq':('ijpq','BBo_ij_b','BBo_ij_b'),'E_ijpq':('ijpq','EEo_ij_b','EEo_ij_b'),'I_ijpq':('ijpq','BBo_ij_b','EEo_ij_b'),
'D_ij':('ij','EEo_ij_b','EBo_ij_b'),'H_ij':('ij','BBo_ij_b','EBo_ij_b'),'A':('_','EBo_ij_b','EBo_ij_b'),
'tau_ij':('ij','EEo_ij_b','EEcmb_ij_b'),'varphi_ij':('ij','EEo_ij_b','BBcmb_ij_b'),'ene_ij':('ij','BBo_ij_b','EEcmb_ij_b'),
'epsilon_ij':('ij','BBo_ij_b','BBcmb_ij_b'),'C':('_','EEcmb_ij_b','BBcmb_ij_b'),'F':('_','EEcmb_ij_b','EEcmb_ij_b'),
'G':('_','BBcmb_ij_b','BBcmb_ij_b'),'O':('_','EBo_ij_b','EEcmb_ij_b'),'P':('_','EBo_ij_b','BBcmb_ij_b'),
'sigma_ij':('ij','EEo_ij_b','EBd_ij_b'),'omega_ij':('ij','BBo_ij_b','EBd_ij_b'),'R':('_','EBd_ij_b','EBd_ij_b'),'N':('_','EBo_ij_b','EBd_ij_b'),
'LAMBDA':('_','EBd_ij_b','EEcmb_ij_b'),'mu':('_','EBd_ij_b','BBcmb_ij_b')} # etc.
for name,(shape,s1,s2) in recipes.items():
if s1 not in self.bin_terms or s2 not in self.bin_terms: continue
summed=self._summation(self.bin_terms[s1],self.bin_terms[s2],invcov_T)
if shape=='_': terms[name]=np.atleast_1d(np.sum(summed))
elif shape=='ijpq':
r=np.zeros((self.Nbands,self.Nbands,self.Nbands,self.Nbands)); r[self.MNi.ravel(),self.MNj.ravel(),self.MNp.ravel(),self.MNq.ravel()]=summed.ravel(); terms[name]=r
elif shape=='ij':
r=np.zeros((self.Nbands,self.Nbands)); np.add.at(r,(self.IJidx[:,0],self.IJidx[:,1]),np.sum(summed,axis=1)); terms[name]=r
return terms
_IND_TERM_CONFIG={'Ad':lambda t:t.get('N',[0])[0],'beta':lambda t:2*(t.get('O',[0])[0]-t.get('P',[0])[0]),'alpha':lambda t,i:2*(np.sum(t.get('D_ij',0)[:,i])-np.sum(t.get('H_ij',0)[i,:]))}
_SYSTEM_MATRIX_CONFIG={
('Ad','Ad'):lambda t:t.get('R',[0])[0],('beta','beta'):lambda t:4*(t.get('G',[0])[0]+t.get('F',[0])[0]-2*t.get('C',[0])[0]),
('Ad','beta'):lambda t:2*(t.get('LAMBDA',[0])[0]-t.get('mu',[0])[0]),
('Ad','alpha'):lambda t,i:2*(np.sum(t.get('sigma_ij',0)[:,i])-np.sum(t.get('omega_ij',0)[i,:])),
('beta','alpha'):lambda t,i:4*(np.sum(t.get('tau_ij',0)[:,i])+np.sum(t.get('epsilon_ij',0)[i,:])-np.sum(t.get('varphi_ij',0)[:,i])-np.sum(t.get('ene_ij',0)[i,:])),
('alpha','alpha'):lambda t,i,j:2*(np.sum(t.get('E_ijpq',0)[:,j,:,i])+np.sum(t.get('E_ijpq',0)[:,i,:,j])+np.sum(t.get('B_ijpq',0)[j,:,i,:])+np.sum(t.get('B_ijpq',0)[i,:,j,:])-2*(np.sum(t.get('I_ijpq',0)[j,:,:,i])+np.sum(t.get('I_ijpq',0)[i,:,:,j])))}
[docs]
def _convolve_gaussBeams_pwf(self, mode, fwhm1, fwhm2, lmax):
_,pwf=hp.pixwin(self.nside,pol=True,lmax=lmax); b1=hp.gauss_beam(fwhm1/RAD2ARCMIN,lmax=lmax,pol=True); b2=hp.gauss_beam(fwhm2/RAD2ARCMIN,lmax=lmax,pol=True)
bl=b1[:,1]*b2[:,1] if mode=='ee' else b1[:,2]*b2[:,2]
return self.cmb_cls[mode][:lmax+1]*bl*pwf**2
[docs]
def _C_cmb(self):
lmax=self.spec.lmax; ell=np.arange(lmax+1); bl_EE=np.zeros((self.num_pairs,self.num_pairs,lmax+1)); bl_BB=np.zeros_like(bl_EE)
for m in range(self.num_pairs):
for n in range(self.num_pairs):
b=[hp.gauss_beam(self.inst[self.bands[idx]]['fwhm']/RAD2ARCMIN,lmax=lmax,pol=True) for idx in [self.MNi[m,n],self.MNj[m,n],self.MNp[m,n],self.MNq[m,n]]]
bl_EE[m,n,:]=b[0][:,1]*b[1][:,1]*b[2][:,1]*b[3][:,1]; bl_BB[m,n,:]=b[0][:,2]*b[1][:,2]*b[2][:,2]*b[3][:,2]
_,pwf=hp.pixwin(self.nside,pol=True,lmax=lmax); Tcmb=np.zeros((2,self.num_pairs,self.num_pairs,lmax+1))
with np.errstate(divide='ignore',invalid='ignore'): Tcmb[0]=pwf**4*bl_EE*self.cmb_cls['ee'][:lmax+1]**2/(2*ell+1); Tcmb[1]=pwf**4*bl_BB*self.cmb_cls['bb'][:lmax+1]**2/(2*ell+1)
Tcmb[np.isnan(Tcmb)]=0; return np.moveaxis(bin_cov_matrix(Tcmb,self.bin_conf),3,1)
[docs]
def _C_oxo(self,EiEjo,BiBjo,EiBjo,BiEjo):
lmax=self.spec.lmax; ell=np.arange(lmax+1); C=np.zeros((3,self.num_pairs,self.num_pairs,lmax+1))
with np.errstate(divide='ignore',invalid='ignore'):
C[0]=(EiEjo[self.MNi,self.MNp,:]*BiBjo[self.MNj,self.MNq,:]+EiBjo[self.MNi,self.MNq,:]*BiEjo[self.MNj,self.MNp,:])/(2*ell+1)
C[1]=(EiEjo[self.MNi,self.MNp,:]*EiEjo[self.MNj,self.MNq,:]+EiEjo[self.MNi,self.MNq,:]*EiEjo[self.MNj,self.MNp,:])/(2*ell+1)
C[2]=(BiBjo[self.MNi,self.MNp,:]*BiBjo[self.MNj,self.MNq,:]+BiBjo[self.MNi,self.MNq,:]*BiBjo[self.MNj,self.MNp,:])/(2*ell+1)
C[np.isnan(C)]=0; return np.moveaxis(bin_cov_matrix(C,self.bin_conf),3,1)
[docs]
def _C_fgxfg(self,EiEj,BiBj,EiBj,BiEj):
lmax=self.spec.lmax; ell=np.arange(lmax+1); C=np.zeros((4,self.num_pairs,self.num_pairs,lmax+1))
with np.errstate(divide='ignore',invalid='ignore'):
C[0]=(EiEj[self.MNi,self.MNp,:]*BiBj[self.MNj,self.MNq,:]+EiBj[self.MNi,self.MNq,:]*BiEj[self.MNj,self.MNp,:])/(2*ell+1)
C[1]=BiBj[self.MNi,self.MNp,:]*EiEj[self.MNj,self.MNq,:]/(2*ell+1)
C[2]=EiEj[self.MNi,self.MNq,:]*BiBj[self.MNj,self.MNp,:]/(2*ell+1)
C[3]=BiBj[self.MNi,self.MNq,:]*EiEj[self.MNj,self.MNp,:]/(2*ell+1)
C[np.isnan(C)]=0; return np.moveaxis(bin_cov_matrix(C,self.bin_conf),3,1)
[docs]
def _C_fgxo(self,Eifg_Ejo,Bifg_Bjo,Eifg_Bjo,Bifg_Ejo):
lmax=self.spec.lmax; ell=np.arange(lmax+1); C=np.zeros((4,self.num_pairs,self.num_pairs,lmax+1))
with np.errstate(divide='ignore',invalid='ignore'):
C[0]=(Eifg_Ejo[self.MNi,self.MNp,:]*Bifg_Bjo[self.MNj,self.MNq,:]+Eifg_Bjo[self.MNi,self.MNq,:]*Bifg_Ejo[self.MNj,self.MNp,:])/(2*ell+1)
C[1]=(Eifg_Ejo[self.MNp,self.MNi,:]*Bifg_Bjo[self.MNq,self.MNj,:]+Eifg_Bjo[self.MNp,self.MNj,:]*Bifg_Ejo[self.MNq,self.MNi,:])/(2*ell+1)
C[2]=Bifg_Bjo[self.MNi,self.MNq,:]*Eifg_Ejo[self.MNj,self.MNp,:]/(2*ell+1)
C[3]=Bifg_Bjo[self.MNp,self.MNj,:]*Eifg_Ejo[self.MNq,self.MNi,:]/(2*ell+1)
C[np.isnan(C)]=0; return np.moveaxis(bin_cov_matrix(C,self.bin_conf),3,1)
[docs]
def _get_alpha_blocks(self, niter, res):
alphas=np.zeros(self.Nbands); iter_key=f"Iter {niter}"
for band in self.bands:
key=str(band) if self.alpha_per_split else str(band[:-2])
alphas[self.inst[band]['cl idx']]=res.ml[iter_key][key]
return alphas[self.MNi],alphas[self.MNj],alphas[self.MNp],alphas[self.MNq]
[docs]
def _get_ml_alphas(self, niter, res, add_beta=False):
iter_key=f"Iter {niter}"; alphas=np.zeros(res.Nalpha)
for i,key in enumerate(res.alpha_keys):
alphas[i]=res.ml[iter_key][str(key)]
if add_beta: alphas+=res.ml[iter_key].get('beta',0.0)
return alphas
[docs]
def result_name(self, idx):
fit_tag = f"{self.fit.replace(' + ','_')}{'_sameAlphaPerSplit' if self.alpha_per_split else '_diffAlphaPerSplit'}{'_rmSameTube' if self.rm_same_tube else ''}{'_tempBP'if self.spec.temp_bp else ''}"
bin_tag = f"Nb{self.nlb}_bmin{self.bmin}_bmax{self.bmax}"
spec_tag = f"aposcale{str(self.spec.aposcale).replace('.','p')}{'_CO' if self.spec.CO else ''}{'_PS' if self.spec.PS else ''}{'_pureB' if self.spec.pureB else ''}_N{self.nside}"
fname = f"ml_params_{fit_tag}_{bin_tag}_{spec_tag}_{idx:03d}.pkl"
return os.path.join(self.libdir, fname)