From 9ab53537ee4bd8c9ddda75c9540ee2f348213fb0 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Wed, 1 Apr 2020 15:36:26 -0700 Subject: [PATCH 01/13] fix(psestimation): refactor generate and q_estimator --- drift/core/beamtransfer.py | 2 - drift/core/psestimation.py | 80 ++++++++++++++++++++++++++++---------- 2 files changed, 60 insertions(+), 22 deletions(-) diff --git a/drift/core/beamtransfer.py b/drift/core/beamtransfer.py index 37c0ba4a..df079c4b 100644 --- a/drift/core/beamtransfer.py +++ b/drift/core/beamtransfer.py @@ -1441,8 +1441,6 @@ def project_vector_svd_to_sky(self, mi, vec, temponly=False, conj=False): SVD vector to return. """ npol = 1 if temponly else self.telescope.num_pol_sky - print("temponly", temponly) - print("npol", npol) # if not conj: # raise Exception("Not implemented non conj yet.") diff --git a/drift/core/psestimation.py b/drift/core/psestimation.py index a16b4be2..5a825812 100644 --- a/drift/core/psestimation.py +++ b/drift/core/psestimation.py @@ -516,6 +516,12 @@ def generate(self, regen=False): self.fisher = mpiutil.allreduce(fisher_loc, op=MPI.SUM) self.bias = mpiutil.allreduce(bias_loc, op=MPI.SUM) + self.write_fisher_file() + + # =================================================== + + def write_fisher_file(self): + """Write PS estimation products into hdf5 file""" # Write out all the PS estimation products if mpiutil.rank0: et = time.time() @@ -570,8 +576,6 @@ def generate(self, regen=False): f.close() - # =================================================== - def fisher_file(self): """Fetch the h5py file handle for the Fisher matrix. @@ -592,31 +596,28 @@ def fisher_bias(self): # ====== Estimate the q-parameters from data ======== - def q_estimator(self, mi, vec1, vec2=None, noise=False): - """Estimate the q-parameters from given data (see paper). - + def project_vector_kl_to_sky(self, mi, vec1, vec2=None): + """ Parameters ---------- mi : integer - The m-mode we are calculating for. - vec : np.ndarray[num_kl, num_realisatons] - The vector(s) of data we are estimating from. These are KL-mode - coefficients. - noise : boolean, optional - Whether we should project against the noise matrix. Used for - estimating the bias by Monte-Carlo. Default is False. + M-mode index to fetch for. + vec : np.ndarrays + The vector(s) of data in the KL-basis packed as [num_kl, num_realisatons] Returns ------- - qa : np.ndarray[numbands] - Array of q-parameters. If noise=True then the array is one longer, - and the last parameter is the projection against the noise. + x2, y2 : np.ndarray + The vectors(s) of data in the sky basis packed as [nfreq, lmax+1, num_realisations] """ evals, evecs = self.kltrans.modes_m(mi) if evals is None: - return np.zeros((self.nbands + 1 if noise else self.nbands,)) + return ( + np.zeros((0,), dtype=np.complex128), + np.zeros((0,), dtype=np.complex128), + ) # Weight by C**-1 (transposes are to ensure broadcast works for 1 and 2d vecs) x0 = (vec1.T / (evals + 1.0)).T @@ -625,16 +626,55 @@ def q_estimator(self, mi, vec1, vec2=None, noise=False): x1 = np.dot(evecs.T.conj(), x0) # Project back into sky basis - x2 = self.kltrans.beamtransfer.project_vector_svd_to_sky(mi, x1, conj=True) + x2 = self.kltrans.beamtransfer.project_vector_svd_to_sky( + mi, x1, temponly=True, conj=True + ) + # Slice array for temponly + x2 = x2[:, 0] if vec2 is not None: y0 = (vec2.T / (evals + 1.0)).T y1 = np.dot(evecs.T.conj(), x0) - y2 = self.kltrans.beamtransfer.project_vector_svd_to_sky(mi, x1, conj=True) + y2 = self.kltrans.beamtransfer.project_vector_svd_to_sky( + mi, x1, temponly=True, conj=True + ) + # Slice array for temponly + y2 = y2[:, 0] + else: y0 = x0 y2 = x2 + return x2, y2 + + def q_estimator(self, mi, vec1, vec2=None, noise=False): + """Estimate the q-parameters from given data (see paper). + + Parameters + ---------- + mi : integer + The m-mode we are calculating for. + vec : np.ndarray[num_kl, num_realisatons] + The vector(s) of data we are estimating from. These are KL-mode + coefficients. + noise : boolean, optional + Whether we should project against the noise matrix. Used for + estimating the bias by Monte-Carlo. Default is False. + + Returns + ------- + qa : np.ndarray[numbands] + Array of q-parameters. If noise=True then the array is one longer, + and the last parameter is the projection against the noise. + """ + + evals, evecs = self.kltrans.modes_m(mi) + + x2, y2 = self.project_vector_kl_to_sky(mi, vec1, vec2=None) + + if (x2.shape[0], y2.shape[0]) == (0, 0): + return np.zeros((self.nbands + 1 if noise else self.nbands,)) + # Create empty q vector (length depends on if we're calculating the noise term too) qa = np.zeros((self.nbands + 1 if noise else self.nbands,) + vec1.shape[1:]) @@ -645,8 +685,8 @@ def q_estimator(self, mi, vec1, vec2=None, noise=False): for li in range(lside): - lxvec = x2[:, 0, li] - lyvec = y2[:, 0, li] + lxvec = x2[:, li] + lyvec = y2[:, li] qa[bi] += np.sum( lyvec.conj() From 5b8807483cc89c98d285f86b27f3553c4226d462 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Wed, 1 Apr 2020 19:08:25 -0700 Subject: [PATCH 02/13] feat(PSMonteCarloXLarge): Fisher matrix via Monte-Carlo simulations for large telescopes --- drift/core/psmc.py | 263 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 263 insertions(+) diff --git a/drift/core/psmc.py b/drift/core/psmc.py index 924c0b54..7e32ce5c 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -209,6 +209,269 @@ def _work_fisher_bias_m(self, mi): return fisher, bias +class PSMonteCarloXLarge(psestimation.PSEstimation, PSMonteCarlo): + """An extension of the PSEstimation class to support estimation of the + Fisher matrix via Monte-Carlo simulations for very large arrays. + + Attributes + ---------- + nsamples : integer + The number of samples to draw from each band. + """ + + nsamples = config.Property(proptype=int, default=500) + + # ==== Calculate the total Fisher matrix/bias ======= + + def make_clzz_array(self): + """Store clarray for power spectrum bands in parallel with MPIArray.""" + + nbands = self.nbands + nfreq = self.telescope.nfreq + lmax = self.telescope.lmax + + self.clarray = mpiarray.MPIArray( + (nbands, lmax + 1, nfreq, nfreq), + axis=0, + dtype=np.float64, + comm=MPI.COMM_WORLD, + ) + + self.clarray[:] = 0.0 + + for bl, bg in self.clarray.enumerate(axis=0): + self.clarray[bl] = self.make_clzz(self.band_pk[bg]) + + def generate(self, regen=False): + """Calculate the total Fisher matrix and bias and save to a file. + + Parameters + ---------- + regen : boolean, optional + Force regeneration if products already exist (default `False`). + """ + + if mpiutil.rank0: + st = time.time() + print("======== Starting PS calculation ========") + + ffile = self.psdir + "/fisher.hdf5" + + if os.path.exists(ffile) and not regen: + print("Fisher matrix file: %s exists. Skipping..." % ffile) + return + + mpiutil.barrier() + + # Pre-compute all the angular power spectra for the bands. + self.genbands() + + comm = MPI.COMM_WORLD + size = comm.Get_size() + rank = comm.Get_rank() + + st = time.time() + + m_chunks, low_bound, upp_bound = self.split_single( + (self.telescope.mmax + 1), size + ) + num_chunks = m_chunks.shape[0] + + if mpiutil.rank0: + print( + "M chunks, lower bounds, upper bounds", m_chunks, low_bound, upp_bound + ) + + n = self.nsamples + + # This is designed such that at each iteration one rank gets one m-mode. + for ci, num_m in enumerate(m_chunks): + if mpiutil.rank0: + print("Starting chunk %i of %i" % (ci, num_chunks)) + + loc_num, loc_start, loc_end = mpiutil.split_local(num_m) + mi = np.arange(low_bound[ci], upp_bound[ci])[loc_start:loc_end] + + # At last iteration of m's, it is necessary to assign m's to remaining ranks otherwise the MPI communication crashes. + if len(mi) != 0: + print("Processing m-mode %i on rank %i" % (mi, rank)) + + if len(mi) == 0: + mi = np.array([low_bound[ci] + rank]) + + nfreq = self.telescope.nfreq + lmax = self.telescope.lmax + + if loc_num > 0: + if self.num_evals(mi) > 0: + # Generate random KL data + x = self.gen_sample(mi, n) + vec1, vec2 = self.project_vector_kl_to_sky(mi, x) + # Make array contiguous + vec1 = np.ascontiguousarray( + vec1.reshape(loc_num, nfreq, lmax + 1, n) + ) + vec2 = np.ascontiguousarray( + vec2.reshape(loc_num, nfreq, lmax + 1, n) + ) + + # If I don't have evals - return zero vector + else: + vec1 = np.zeros((loc_num, nfreq, lmax + 1, n), dtype=np.complex128) + vec2 = vec1 + + else: + vec1 = np.zeros((1, nfreq, lmax + 1, n), dtype=np.complex128) + vec2 = vec1 + + self.qa = mpiarray.MPIArray( + (self.telescope.mmax + 1, self.nbands, n), + axis=1, + comm=MPI.COMM_WORLD, + dtype=np.float64, + ) + + self.qa[:] = 0.0 + + et = time.time() + + dsize = np.prod(vec1.shape) + + for ir in range(size): + st_ir = time.time() + # Only fill qa if we haven't reached mmax + if mi < (self.telescope.mmax + 1): + self.q_estimator(mi, vec1, vec2) + + etq = time.time() + print("Time needed for calculating qa one round", etq - st_ir) + + # We do the MPI communications only (size - 1) times + if ir == (size - 1): + break + + recv_buffer = self.recv_send_data(vec1, axis=-1) + vec1 = recv_buffer + + # Sent data to (rank + 1) % size, hence local mi must be updated to (mi -1) % size + mi = low_bound[ci] + (mi - 1) % size + + etallq = time.time() + print("Time needed for qa calculation all ranks ", etallq - et) + + em = time.time() + print("Time needed for qa calculation all m chunks ", em - et) + + # Once done with all the m's, redistribute qa array over m's + self.qa = self.qa.redistribute(axis=0) + + # Make an array for local fisher an bias + fisher_loc = np.zeros((self.nbands, self.nbands), dtype=np.float64) + bias_loc = np.zeros((self.nbands,), dtype=np.float64) + + # Calculate fisher for each m + for ml, mg in self.qa.enumerate(axis=0): + fisher_m = np.cov(self.qa[ml]) + bias_m = np.mean(self.qa[ml], axis=1) + # Sum over all local m-modes to get the overall fisher and bias per parallel process + fisher_loc += fisher_m.real # be careful with the real + bias_loc += bias_m.real + + self.fisher = mpiutil.allreduce(fisher_loc, op=MPI.SUM) + self.bias = mpiutil.allreduce(bias_loc, op=MPI.SUM) + + self.write_fisher_file() + + def q_estimator(self, mi, vec1, vec2, noise=False): + """Calculate the quadratic estimator for this mi with data vec1 and vec2""" + lside = self.telescope.lmax + 1 + + for bi, bg in self.qa.enumerate(axis=1): + for li in range(lside): + lxvec = vec1[:, :, li] + lyvec = vec2[:, :, li] + self.qa[mi, bi, :] += np.sum( + lyvec.conj() + * np.matmul(self.clarray[bi][li].astype(np.complex128), lxvec), + axis=1, + ).astype(np.float64) + + # Calculate q_a for noise power (x0^H N x0 = |x0|^2) + if noise: + # If calculating crosspower don't include instrumental noise + noisemodes = 0.0 if self.crosspower else 1.0 + evals, evecs = self.kltrans.modes_m(mi) + noisemodes = noisemodes + (evals if self.zero_mean else 0.0) + + qa.global_slice[mi, -1] = np.sum( + (vec1 * vec2.conj()).T.real * noisemodes, axis=-1 + ) + + def recv_send_data(self, data, axis): + + comm = MPI.COMM_WORLD + size = comm.Get_size() + rank = comm.Get_rank() + + shape = data.shape + dtype = data.dtype + + recv_rank = (rank - 1) % size + send_rank = (rank + 1) % size + + recv_buffer = np.zeros(shape, dtype=dtype) + + # Need to send in 4GB chunks due to some MPI library. + message_size = 4 * 2 ** 30.0 + dsize = np.prod(shape) * 16.0 + num_messages = int(np.ceil(dsize / message_size)) + print("Number of messages: %i" % num_messages) + # If dsize =0.0 you get 0 num_messages and it throws error. hack. + if num_messages == 0: + num_messages = 1 + + num, sm, em = mpiutil.split_m(shape[axis], num_messages) + + for i in range(num_messages): + slc = [slice(None)] * len(shape) + slc[axis] = slice(sm[i], em[i]) + + di = np.ascontiguousarray(data[slc], dtype=dtype) + bi = np.zeros(di.shape, dtype=dtype) + + # Initiate non-blocking receive + request = comm.Irecv( + [bi, MPI.DOUBLE_COMPLEX], source=recv_rank, tag=(i * size) + recv_rank + ) + # Initiate send + comm.Send([di, MPI.DOUBLE_COMPLEX], dest=send_rank, tag=(i * size) + rank) + # Wait for receive + request.Wait() + + # Fill recv buffer with messages + recv_buffer[slc] = bi + + return recv_buffer + + def split_single(self, m, size): + + quotient = m // size + rem = m % size + + if rem != 0: + m_chunks = np.append((size * np.ones(quotient, dtype=int)), rem) + else: + m_chunks = size * np.ones(quotient, dtype=int) + + bound = np.cumsum(np.insert(m_chunks, 0, 0)) + ms = m_chunks.shape[0] + + low_bound = bound[:ms] + upp_bound = bound[1 : (ms + 1)] + + return m_chunks, low_bound, upp_bound + + def sim_skyvec(trans, n): """Simulate a set of alm(\nu)'s for a given m. From 05ae3d42f74f2997c432f2e9787918b065069047 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Fri, 3 Apr 2020 15:50:48 -0700 Subject: [PATCH 03/13] bug fixes --- drift/core/manager.py | 1 + drift/core/psestimation.py | 7 ++++--- drift/core/psmc.py | 21 +++++++++++---------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/drift/core/manager.py b/drift/core/manager.py index 3966c51f..89a6837c 100644 --- a/drift/core/manager.py +++ b/drift/core/manager.py @@ -51,6 +51,7 @@ "Full": psestimation.PSExact, "MonteCarlo": psmc.PSMonteCarlo, "MonteCarloAlt": psmc.PSMonteCarloAlt, + "MonteCarloXLarge": psmc.PSMonteCarloXLarge, "Cross": crosspower.CrossPower, } diff --git a/drift/core/psestimation.py b/drift/core/psestimation.py index 5a825812..37108f96 100644 --- a/drift/core/psestimation.py +++ b/drift/core/psestimation.py @@ -516,6 +516,10 @@ def generate(self, regen=False): self.fisher = mpiutil.allreduce(fisher_loc, op=MPI.SUM) self.bias = mpiutil.allreduce(bias_loc, op=MPI.SUM) + if mpiutil.rank0: + et = time.time() + print("======== Ending PS calculation (time=%f) ========" % (et - st)) + self.write_fisher_file() # =================================================== @@ -524,9 +528,6 @@ def write_fisher_file(self): """Write PS estimation products into hdf5 file""" # Write out all the PS estimation products if mpiutil.rank0: - et = time.time() - print("======== Ending PS calculation (time=%f) ========" % (et - st)) - # Check to see ensure that Fisher matrix isn't all zeros. if not (self.fisher == 0).all(): # Generate derived quantities (covariance, errors..) diff --git a/drift/core/psmc.py b/drift/core/psmc.py index 7e32ce5c..3a020bb4 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -5,11 +5,15 @@ # === End Python 2/3 compatibility +import os +import time + import numpy as np +from mpi4py import MPI from cora.util import nputil -from caput import mpiutil, config +from caput import mpiutil, config, mpiarray from drift.core import psestimation @@ -209,7 +213,7 @@ def _work_fisher_bias_m(self, mi): return fisher, bias -class PSMonteCarloXLarge(psestimation.PSEstimation, PSMonteCarlo): +class PSMonteCarloXLarge(PSMonteCarlo): """An extension of the PSEstimation class to support estimation of the Fisher matrix via Monte-Carlo simulations for very large arrays. @@ -225,7 +229,6 @@ class PSMonteCarloXLarge(psestimation.PSEstimation, PSMonteCarlo): def make_clzz_array(self): """Store clarray for power spectrum bands in parallel with MPIArray.""" - nbands = self.nbands nfreq = self.telescope.nfreq lmax = self.telescope.lmax @@ -250,10 +253,9 @@ def generate(self, regen=False): regen : boolean, optional Force regeneration if products already exist (default `False`). """ - if mpiutil.rank0: st = time.time() - print("======== Starting PS calculation ========") + print("======== Starting PS calculation MC XLARGE ========") ffile = self.psdir + "/fisher.hdf5" @@ -314,7 +316,6 @@ def generate(self, regen=False): vec2 = np.ascontiguousarray( vec2.reshape(loc_num, nfreq, lmax + 1, n) ) - # If I don't have evals - return zero vector else: vec1 = np.zeros((loc_num, nfreq, lmax + 1, n), dtype=np.complex128) @@ -343,9 +344,6 @@ def generate(self, regen=False): if mi < (self.telescope.mmax + 1): self.q_estimator(mi, vec1, vec2) - etq = time.time() - print("Time needed for calculating qa one round", etq - st_ir) - # We do the MPI communications only (size - 1) times if ir == (size - 1): break @@ -361,7 +359,6 @@ def generate(self, regen=False): em = time.time() print("Time needed for qa calculation all m chunks ", em - et) - # Once done with all the m's, redistribute qa array over m's self.qa = self.qa.redistribute(axis=0) @@ -380,6 +377,10 @@ def generate(self, regen=False): self.fisher = mpiutil.allreduce(fisher_loc, op=MPI.SUM) self.bias = mpiutil.allreduce(bias_loc, op=MPI.SUM) + if mpiutil.rank0: + et = time.time() + print("======== Ending PS calculation (time=%f) ========" % (et - st)) + self.write_fisher_file() def q_estimator(self, mi, vec1, vec2, noise=False): From 091a1d6138ba9279ee63d80d14a5d01e0ea9c8f1 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Mon, 13 Apr 2020 12:26:03 -0700 Subject: [PATCH 04/13] blacken and some more bug fices --- drift/core/psmc.py | 69 ++++++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/drift/core/psmc.py b/drift/core/psmc.py index 3a020bb4..c48d51d5 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -286,6 +286,15 @@ def generate(self, regen=False): n = self.nsamples + self.qa = mpiarray.MPIArray( + (self.telescope.mmax + 1, self.nbands, n), + axis=1, + comm=MPI.COMM_WORLD, + dtype=np.float64, + ) + + self.qa[:] = 0.0 + # This is designed such that at each iteration one rank gets one m-mode. for ci, num_m in enumerate(m_chunks): if mpiutil.rank0: @@ -295,15 +304,16 @@ def generate(self, regen=False): mi = np.arange(low_bound[ci], upp_bound[ci])[loc_start:loc_end] # At last iteration of m's, it is necessary to assign m's to remaining ranks otherwise the MPI communication crashes. - if len(mi) != 0: - print("Processing m-mode %i on rank %i" % (mi, rank)) - if len(mi) == 0: mi = np.array([low_bound[ci] + rank]) + if len(mi) != 0: + print("Processing m-mode %i on rank %i" % (mi, rank)) + nfreq = self.telescope.nfreq lmax = self.telescope.lmax + # TO DO: can maybe dump the extra dimension loc num if loc_num > 0: if self.num_evals(mi) > 0: # Generate random KL data @@ -325,24 +335,14 @@ def generate(self, regen=False): vec1 = np.zeros((1, nfreq, lmax + 1, n), dtype=np.complex128) vec2 = vec1 - self.qa = mpiarray.MPIArray( - (self.telescope.mmax + 1, self.nbands, n), - axis=1, - comm=MPI.COMM_WORLD, - dtype=np.float64, - ) - - self.qa[:] = 0.0 - et = time.time() dsize = np.prod(vec1.shape) for ir in range(size): - st_ir = time.time() # Only fill qa if we haven't reached mmax if mi < (self.telescope.mmax + 1): - self.q_estimator(mi, vec1, vec2) + self.q_estimator(mi, vec1[0]) # We do the MPI communications only (size - 1) times if ir == (size - 1): @@ -355,10 +355,10 @@ def generate(self, regen=False): mi = low_bound[ci] + (mi - 1) % size etallq = time.time() - print("Time needed for qa calculation all ranks ", etallq - et) + print("Time needed for quadratic estimation on all ranks for this m-chunk", etallq - et) - em = time.time() - print("Time needed for qa calculation all m chunks ", em - et) + et_allm = time.time() + print("Time needed for quadratic estimation on all ranks for all m-chunks ", et_allm - et) # Once done with all the m's, redistribute qa array over m's self.qa = self.qa.redistribute(axis=0) @@ -383,19 +383,28 @@ def generate(self, regen=False): self.write_fisher_file() - def q_estimator(self, mi, vec1, vec2, noise=False): + def q_estimator(self, mi, vec1, vec2 = None, noise=False): """Calculate the quadratic estimator for this mi with data vec1 and vec2""" - lside = self.telescope.lmax + 1 - - for bi, bg in self.qa.enumerate(axis=1): - for li in range(lside): - lxvec = vec1[:, :, li] - lyvec = vec2[:, :, li] - self.qa[mi, bi, :] += np.sum( - lyvec.conj() - * np.matmul(self.clarray[bi][li].astype(np.complex128), lxvec), - axis=1, - ).astype(np.float64) + + # if data vector is filled with zeros, return q = 0.0 + if np.sum(vec1) == 0: + self.qa[:] = 0.0 + + # if data vector is empty, return q = 0.0 + if (vec1.shape[0], vec2.shape[0]) == (0, 0): + self.qa[:] = 0.0 + + else: + lside = self.telescope.lmax + 1 + for bi, bg in self.qa.enumerate(axis=1): + for li in range(lside): + lxvec = vec1[:, li] + lyvec = vec2[:, li] + self.qa[mi, bi] += np.sum( + lyvec.conj() + * np.matmul(self.clarray[bi][li].astype(np.complex128), lxvec), + axis=0, + ).astype(np.float64) # Calculate q_a for noise power (x0^H N x0 = |x0|^2) if noise: @@ -404,7 +413,7 @@ def q_estimator(self, mi, vec1, vec2, noise=False): evals, evecs = self.kltrans.modes_m(mi) noisemodes = noisemodes + (evals if self.zero_mean else 0.0) - qa.global_slice[mi, -1] = np.sum( + self.qa.global_slice[mi, -1] = np.sum( (vec1 * vec2.conj()).T.real * noisemodes, axis=-1 ) From dad3e71cf594d0f0c997e485a4317d3ede7f6566 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Mon, 13 Apr 2020 12:27:55 -0700 Subject: [PATCH 05/13] blacken --- drift/core/psmc.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/drift/core/psmc.py b/drift/core/psmc.py index c48d51d5..b27ffeef 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -355,10 +355,16 @@ def generate(self, regen=False): mi = low_bound[ci] + (mi - 1) % size etallq = time.time() - print("Time needed for quadratic estimation on all ranks for this m-chunk", etallq - et) + print( + "Time needed for quadratic estimation on all ranks for this m-chunk", + etallq - et, + ) et_allm = time.time() - print("Time needed for quadratic estimation on all ranks for all m-chunks ", et_allm - et) + print( + "Time needed for quadratic estimation on all ranks for all m-chunks ", + et_allm - et, + ) # Once done with all the m's, redistribute qa array over m's self.qa = self.qa.redistribute(axis=0) @@ -383,7 +389,7 @@ def generate(self, regen=False): self.write_fisher_file() - def q_estimator(self, mi, vec1, vec2 = None, noise=False): + def q_estimator(self, mi, vec1, vec2=None, noise=False): """Calculate the quadratic estimator for this mi with data vec1 and vec2""" # if data vector is filled with zeros, return q = 0.0 @@ -404,7 +410,7 @@ def q_estimator(self, mi, vec1, vec2 = None, noise=False): lyvec.conj() * np.matmul(self.clarray[bi][li].astype(np.complex128), lxvec), axis=0, - ).astype(np.float64) + ).astype(np.float64) # Calculate q_a for noise power (x0^H N x0 = |x0|^2) if noise: From 35809342596ed692d5e848d010cf1edb8259e813 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Wed, 6 May 2020 11:37:08 -0700 Subject: [PATCH 06/13] fix(psmc): cleaned up MonteCarloXlarge --- drift/core/psmc.py | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/drift/core/psmc.py b/drift/core/psmc.py index b27ffeef..065c2c92 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -313,7 +313,6 @@ def generate(self, regen=False): nfreq = self.telescope.nfreq lmax = self.telescope.lmax - # TO DO: can maybe dump the extra dimension loc num if loc_num > 0: if self.num_evals(mi) > 0: # Generate random KL data @@ -321,19 +320,15 @@ def generate(self, regen=False): vec1, vec2 = self.project_vector_kl_to_sky(mi, x) # Make array contiguous vec1 = np.ascontiguousarray( - vec1.reshape(loc_num, nfreq, lmax + 1, n) - ) - vec2 = np.ascontiguousarray( - vec2.reshape(loc_num, nfreq, lmax + 1, n) + vec1.reshape(nfreq, lmax + 1, n) ) + # If I don't have evals - return zero vector else: - vec1 = np.zeros((loc_num, nfreq, lmax + 1, n), dtype=np.complex128) - vec2 = vec1 + vec1 = np.zeros((nfreq, lmax + 1, n), dtype=np.complex128) else: - vec1 = np.zeros((1, nfreq, lmax + 1, n), dtype=np.complex128) - vec2 = vec1 + vec1 = np.zeros((nfreq, lmax + 1, n), dtype=np.complex128) et = time.time() @@ -342,7 +337,7 @@ def generate(self, regen=False): for ir in range(size): # Only fill qa if we haven't reached mmax if mi < (self.telescope.mmax + 1): - self.q_estimator(mi, vec1[0]) + self.q_estimator(mi, vec1, vec1) # We do the MPI communications only (size - 1) times if ir == (size - 1): @@ -389,16 +384,15 @@ def generate(self, regen=False): self.write_fisher_file() - def q_estimator(self, mi, vec1, vec2=None, noise=False): + def q_estimator(self, mi, vec1, vec2, noise=False): """Calculate the quadratic estimator for this mi with data vec1 and vec2""" + # if data vector is filled with zeros, return q = 0.0 for this m + if np.all(vec1 == 0) and np.all(vec2 ==0): + self.qa[mi, :] = 0.0 - # if data vector is filled with zeros, return q = 0.0 - if np.sum(vec1) == 0: - self.qa[:] = 0.0 - - # if data vector is empty, return q = 0.0 - if (vec1.shape[0], vec2.shape[0]) == (0, 0): - self.qa[:] = 0.0 + # if data vector is empty, return q = 0.0 for this m + elif (vec1.shape[0], vec2.shape[0]) == (0, 0): + self.qa[mi, :] = 0.0 else: lside = self.telescope.lmax + 1 @@ -441,7 +435,7 @@ def recv_send_data(self, data, axis): message_size = 4 * 2 ** 30.0 dsize = np.prod(shape) * 16.0 num_messages = int(np.ceil(dsize / message_size)) - print("Number of messages: %i" % num_messages) + # If dsize =0.0 you get 0 num_messages and it throws error. hack. if num_messages == 0: num_messages = 1 From 6e6ad1e03d8b970397b4536a9d0da69ae19706ba Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Wed, 6 May 2020 11:45:30 -0700 Subject: [PATCH 07/13] blacken --- drift/core/psmc.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/drift/core/psmc.py b/drift/core/psmc.py index 065c2c92..a0bec7c1 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -319,9 +319,7 @@ def generate(self, regen=False): x = self.gen_sample(mi, n) vec1, vec2 = self.project_vector_kl_to_sky(mi, x) # Make array contiguous - vec1 = np.ascontiguousarray( - vec1.reshape(nfreq, lmax + 1, n) - ) + vec1 = np.ascontiguousarray(vec1.reshape(nfreq, lmax + 1, n)) # If I don't have evals - return zero vector else: @@ -387,7 +385,7 @@ def generate(self, regen=False): def q_estimator(self, mi, vec1, vec2, noise=False): """Calculate the quadratic estimator for this mi with data vec1 and vec2""" # if data vector is filled with zeros, return q = 0.0 for this m - if np.all(vec1 == 0) and np.all(vec2 ==0): + if np.all(vec1 == 0) and np.all(vec2 == 0): self.qa[mi, :] = 0.0 # if data vector is empty, return q = 0.0 for this m From eac383f09a7f43cd58b4e5cefc73548dc3986061 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Sun, 6 Dec 2020 21:09:37 -0800 Subject: [PATCH 08/13] fix(psmc, psestimation, manager, kltransform): added docstrings and comments --- drift/core/manager.py | 2 +- drift/core/psestimation.py | 42 ++++++++------------- drift/core/psmc.py | 75 ++++++++++++++++++++++++++++++-------- 3 files changed, 75 insertions(+), 44 deletions(-) diff --git a/drift/core/manager.py b/drift/core/manager.py index 89a6837c..26bb9807 100644 --- a/drift/core/manager.py +++ b/drift/core/manager.py @@ -51,7 +51,7 @@ "Full": psestimation.PSExact, "MonteCarlo": psmc.PSMonteCarlo, "MonteCarloAlt": psmc.PSMonteCarloAlt, - "MonteCarloXLarge": psmc.PSMonteCarloXLarge, + "MonteCarloLarge": psmc.PSMonteCarloLarge, "Cross": crosspower.CrossPower, } diff --git a/drift/core/psestimation.py b/drift/core/psestimation.py index 37108f96..3e0e2510 100644 --- a/drift/core/psestimation.py +++ b/drift/core/psestimation.py @@ -597,28 +597,25 @@ def fisher_bias(self): # ====== Estimate the q-parameters from data ======== - def project_vector_kl_to_sky(self, mi, vec1, vec2=None): + def project_vector_kl_to_sky(self, mi, vec1): """ Parameters ---------- mi : integer M-mode index to fetch for. - vec : np.ndarrays - The vector(s) of data in the KL-basis packed as [num_kl, num_realisatons] + vec1 : np.ndarray + The vector of data in the KL-basis packed as [num_kl, num_realisatons]. Returns ------- - x2, y2 : np.ndarray - The vectors(s) of data in the sky basis packed as [nfreq, lmax+1, num_realisations] + x2 : np.ndarray + The vector of data in the sky basis packed as [nfreq, lmax+1, num_realisations] """ evals, evecs = self.kltrans.modes_m(mi) if evals is None: - return ( - np.zeros((0,), dtype=np.complex128), - np.zeros((0,), dtype=np.complex128), - ) + return np.zeros((0,), dtype=np.complex128) # Weight by C**-1 (transposes are to ensure broadcast works for 1 and 2d vecs) x0 = (vec1.T / (evals + 1.0)).T @@ -633,20 +630,7 @@ def project_vector_kl_to_sky(self, mi, vec1, vec2=None): # Slice array for temponly x2 = x2[:, 0] - if vec2 is not None: - y0 = (vec2.T / (evals + 1.0)).T - y1 = np.dot(evecs.T.conj(), x0) - y2 = self.kltrans.beamtransfer.project_vector_svd_to_sky( - mi, x1, temponly=True, conj=True - ) - # Slice array for temponly - y2 = y2[:, 0] - - else: - y0 = x0 - y2 = x2 - - return x2, y2 + return x2 def q_estimator(self, mi, vec1, vec2=None, noise=False): """Estimate the q-parameters from given data (see paper). @@ -655,9 +639,8 @@ def q_estimator(self, mi, vec1, vec2=None, noise=False): ---------- mi : integer The m-mode we are calculating for. - vec : np.ndarray[num_kl, num_realisatons] - The vector(s) of data we are estimating from. These are KL-mode - coefficients. + vec1, vec2 : np.ndarrays[num_kl, num_realisatons] + The vector(s) of data we are estimating from. These are KL-mode coefficients. Passing in `vec2` is optional and for cross power spectrum calculation. noise : boolean, optional Whether we should project against the noise matrix. Used for estimating the bias by Monte-Carlo. Default is False. @@ -671,7 +654,12 @@ def q_estimator(self, mi, vec1, vec2=None, noise=False): evals, evecs = self.kltrans.modes_m(mi) - x2, y2 = self.project_vector_kl_to_sky(mi, vec1, vec2=None) + x2 = self.project_vector_kl_to_sky(mi, vec1) + + if vec2 is not None: + y2 = self.project_vector_kl_to_sky(mi, vec2) + else: + y2 = x2 if (x2.shape[0], y2.shape[0]) == (0, 0): return np.zeros((self.nbands + 1 if noise else self.nbands,)) diff --git a/drift/core/psmc.py b/drift/core/psmc.py index a0bec7c1..f44e636b 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -213,7 +213,7 @@ def _work_fisher_bias_m(self, mi): return fisher, bias -class PSMonteCarloXLarge(PSMonteCarlo): +class PSMonteCarloLarge(PSMonteCarlo): """An extension of the PSEstimation class to support estimation of the Fisher matrix via Monte-Carlo simulations for very large arrays. @@ -274,16 +274,12 @@ def generate(self, regen=False): st = time.time() + # Split up all m-modes such that each MPI process receives one m-mode m_chunks, low_bound, upp_bound = self.split_single( (self.telescope.mmax + 1), size ) num_chunks = m_chunks.shape[0] - if mpiutil.rank0: - print( - "M chunks, lower bounds, upper bounds", m_chunks, low_bound, upp_bound - ) - n = self.nsamples self.qa = mpiarray.MPIArray( @@ -317,11 +313,11 @@ def generate(self, regen=False): if self.num_evals(mi) > 0: # Generate random KL data x = self.gen_sample(mi, n) - vec1, vec2 = self.project_vector_kl_to_sky(mi, x) + vec1 = self.project_vector_kl_to_sky(mi, x) # Make array contiguous vec1 = np.ascontiguousarray(vec1.reshape(nfreq, lmax + 1, n)) - # If I don't have evals - return zero vector + # If I don't have evals - return zero vector. Each MPI process must have some data to pass around. Otherwise send and receive won't work. else: vec1 = np.zeros((nfreq, lmax + 1, n), dtype=np.complex128) @@ -332,6 +328,7 @@ def generate(self, regen=False): dsize = np.prod(vec1.shape) + # Loop over total number of ranks given by `size`. for ir in range(size): # Only fill qa if we haven't reached mmax if mi < (self.telescope.mmax + 1): @@ -383,13 +380,30 @@ def generate(self, regen=False): self.write_fisher_file() def q_estimator(self, mi, vec1, vec2, noise=False): - """Calculate the quadratic estimator for this mi with data vec1 and vec2""" + """Estimate the q-parameters from given data (see paper). + + Parameters + ---------- + mi : integer + The m-mode we are calculating for. + vec1, vec2 : np.ndarrays[num_kl, num_realisatons] + The vector(s) of data we are estimating from. These are KL-mode coefficients. Passing in `vec2` different from `vec1` is for cross power spectrum calculation. + noise : boolean, optional + Whether we should project against the noise matrix. Used for + estimating the bias by Monte-Carlo. Default is False. + + Returns + ------- + qa : np.ndarray[numbands] + Array of q-parameters. If noise=True then the array is one longer, and the last parameter is the projection against the noise. + """ + # if data vector is filled with zeros, return q = 0.0 for this m - if np.all(vec1 == 0) and np.all(vec2 == 0): + if np.all(vec1 == 0) or np.all(vec2 == 0): self.qa[mi, :] = 0.0 - # if data vector is empty, return q = 0.0 for this m - elif (vec1.shape[0], vec2.shape[0]) == (0, 0): + # If one of the data vectors is empty, return q = 0.0 for this m + elif (vec1.shape[0] == 0) or (vec2.shape[0] == 0): self.qa[mi, :] = 0.0 else: @@ -416,6 +430,21 @@ def q_estimator(self, mi, vec1, vec2, noise=False): ) def recv_send_data(self, data, axis): + """Send data from one MPI process to the next by using a vector buffer. + + Parameters + ---------- + data : np.ndarray, complex + Data vector that is sent to the next rank. Expecting + a complex data type. + axis : int + Axis over which the data is devided into chunks of 4GB. + + Returns + ------- + recv_buffer : np.ndarray + The data vector received from the previous rank. + """ comm = MPI.COMM_WORLD size = comm.Get_size() @@ -434,10 +463,6 @@ def recv_send_data(self, data, axis): dsize = np.prod(shape) * 16.0 num_messages = int(np.ceil(dsize / message_size)) - # If dsize =0.0 you get 0 num_messages and it throws error. hack. - if num_messages == 0: - num_messages = 1 - num, sm, em = mpiutil.split_m(shape[axis], num_messages) for i in range(num_messages): @@ -462,6 +487,24 @@ def recv_send_data(self, data, axis): return recv_buffer def split_single(self, m, size): + """Split number of m elements into chunks of `size`. Also return the lower and upper bounds of `size` elements in m. + + Parameters + ----------- + m : int + Number of total elements. + size: size + Number of MPI processes running. + + Returns + ------- + m_chunks : list + List of chunk sizes. + low_bound : list + Lower bound of where chunk starts. + upp_bound : list + Upper bound where chunk stops. + """ quotient = m // size rem = m % size From c46609859ed7837e0090018c0649a8d36827cd4e Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Thu, 14 Jan 2021 19:33:17 -0800 Subject: [PATCH 09/13] docs(psestimation, psmc): update documentation in PSMonteCarloLarge --- drift/core/psestimation.py | 11 ++++++++++ drift/core/psmc.py | 41 +++++++++++++++++++++++++++++--------- 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/drift/core/psestimation.py b/drift/core/psestimation.py index 3e0e2510..432fbee3 100644 --- a/drift/core/psestimation.py +++ b/drift/core/psestimation.py @@ -687,11 +687,22 @@ def q_estimator(self, mi, vec1, vec2=None, noise=False): # Calculate q_a for noise power (x0^H N x0 = |x0|^2) if noise: + evals, evecs = self.kltrans.modes_m(mi) + + if evals is None: + return np.zeros((self.nbands + 1)) # If calculating crosspower don't include instrumental noise noisemodes = 0.0 if self.crosspower else 1.0 noisemodes = noisemodes + (evals if self.zero_mean else 0.0) + x0 = (vec1.T / (evals + 1.0)).T + + if vec2 is not None: + y0 = (vec2.T / (evals + 1.0)).T + else: + y0 = x0 + qa[-1] = np.sum((x0 * y0.conj()).T.real * noisemodes, axis=-1) return qa.real diff --git a/drift/core/psmc.py b/drift/core/psmc.py index f44e636b..ee554435 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -221,14 +221,20 @@ class PSMonteCarloLarge(PSMonteCarlo): ---------- nsamples : integer The number of samples to draw from each band. + noise : bool + Default : False. If set to True, calculate a q_a for the noise power. """ nsamples = config.Property(proptype=int, default=500) + noise = False # ==== Calculate the total Fisher matrix/bias ======= def make_clzz_array(self): - """Store clarray for power spectrum bands in parallel with MPIArray.""" + """Calculate the response of the angular powerspectrum to each power spectrum band using an MPI array distributed over the band axis. + + Uses the lmax and frequencies from the telescope object. + """ nbands = self.nbands nfreq = self.telescope.nfreq lmax = self.telescope.lmax @@ -255,7 +261,7 @@ def generate(self, regen=False): """ if mpiutil.rank0: st = time.time() - print("======== Starting PS calculation MC XLARGE ========") + print("======== Starting PS calculation LARGE ========") ffile = self.psdir + "/fisher.hdf5" @@ -282,8 +288,12 @@ def generate(self, regen=False): n = self.nsamples + # Option to add another dimension for a noise term + nbands = self.nbands + 1 if noise else self.nbands + + # Create an MPI array for the q-estimator distributed over bands self.qa = mpiarray.MPIArray( - (self.telescope.mmax + 1, self.nbands, n), + (self.telescope.mmax + 1, nbands, n), axis=1, comm=MPI.COMM_WORLD, dtype=np.float64, @@ -291,7 +301,8 @@ def generate(self, regen=False): self.qa[:] = 0.0 - # This is designed such that at each iteration one rank gets one m-mode. + # This loop is designed such that the total number of m's is split up in chunks. + # Each chunk contains as many m's as there are ranks such that each rank gets exactly one m-mode. for ci, num_m in enumerate(m_chunks): if mpiutil.rank0: print("Starting chunk %i of %i" % (ci, num_chunks)) @@ -334,14 +345,14 @@ def generate(self, regen=False): if mi < (self.telescope.mmax + 1): self.q_estimator(mi, vec1, vec1) - # We do the MPI communications only (size - 1) times + # We do the MPI communications between ranks only (size - 1) times if ir == (size - 1): break recv_buffer = self.recv_send_data(vec1, axis=-1) vec1 = recv_buffer - # Sent data to (rank + 1) % size, hence local mi must be updated to (mi -1) % size + # Send data to (rank + 1) % size, hence local mi must be updated to (mi -1) % size mi = low_bound[ci] + (mi - 1) % size etallq = time.time() @@ -420,13 +431,25 @@ def q_estimator(self, mi, vec1, vec2, noise=False): # Calculate q_a for noise power (x0^H N x0 = |x0|^2) if noise: - # If calculating crosspower don't include instrumental noise - noisemodes = 0.0 if self.crosspower else 1.0 evals, evecs = self.kltrans.modes_m(mi) + + if evals is None: + self.qa.global_slice[mi, -1] = 0.0 + + # If calculating crosspower don't include instrumental noise + # noisemodes = 0.0 if self.crosspower else 1.0 + noisemodes = 1.0 noisemodes = noisemodes + (evals if self.zero_mean else 0.0) + x0 = (vec1.T / (evals + 1.0)).T + + if vec2 is not None: + y0 = (vec2.T / (evals + 1.0)).T + else: + y0 = x0 + self.qa.global_slice[mi, -1] = np.sum( - (vec1 * vec2.conj()).T.real * noisemodes, axis=-1 + (x0 * y0.conj()).T.real * noisemodes, axis=-1 ) def recv_send_data(self, data, axis): From c18e4fc72d8076db4d565b4ccbbe20a33c6ff96e Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Sat, 30 Jan 2021 23:16:38 -0800 Subject: [PATCH 10/13] fix(psestimation,psmc): docstrings and variable names --- drift/core/psestimation.py | 2 -- drift/core/psmc.py | 69 ++++++++++++++++++++++++-------------- 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/drift/core/psestimation.py b/drift/core/psestimation.py index 432fbee3..b1f3b1f3 100644 --- a/drift/core/psestimation.py +++ b/drift/core/psestimation.py @@ -652,8 +652,6 @@ def q_estimator(self, mi, vec1, vec2=None, noise=False): and the last parameter is the projection against the noise. """ - evals, evecs = self.kltrans.modes_m(mi) - x2 = self.project_vector_kl_to_sky(mi, vec1) if vec2 is not None: diff --git a/drift/core/psmc.py b/drift/core/psmc.py index ee554435..b1a1849f 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -222,11 +222,12 @@ class PSMonteCarloLarge(PSMonteCarlo): nsamples : integer The number of samples to draw from each band. noise : bool - Default : False. If set to True, calculate a q_a for the noise power. + Default : False. If set to True, project against the noise matrix. Used for + estimating the bias by Monte-Carlo. """ nsamples = config.Property(proptype=int, default=500) - noise = False + noise = config.Property(proptype=bool, default=False) # ==== Calculate the total Fisher matrix/bias ======= @@ -289,7 +290,7 @@ def generate(self, regen=False): n = self.nsamples # Option to add another dimension for a noise term - nbands = self.nbands + 1 if noise else self.nbands + nbands = self.nbands + 1 if self.noise else self.nbands # Create an MPI array for the q-estimator distributed over bands self.qa = mpiarray.MPIArray( @@ -335,7 +336,7 @@ def generate(self, regen=False): else: vec1 = np.zeros((nfreq, lmax + 1, n), dtype=np.complex128) - et = time.time() + st_q = time.time() dsize = np.prod(vec1.shape) @@ -343,7 +344,13 @@ def generate(self, regen=False): for ir in range(size): # Only fill qa if we haven't reached mmax if mi < (self.telescope.mmax + 1): - self.q_estimator(mi, vec1, vec1) + self.q_estimator(mi, vec1) + + # TO DO: Calculate q_a for noise power (x0^H N x0 = |x0|^2) + # The issue right now is that the sky vector is passed from rank to rank + # and not the kl-vector, which we need for estimating the noise bias. + # if self.noise and loc_num > 0: + # self.noise_bias(mi, x) # We do the MPI communications between ranks only (size - 1) times if ir == (size - 1): @@ -355,16 +362,16 @@ def generate(self, regen=False): # Send data to (rank + 1) % size, hence local mi must be updated to (mi -1) % size mi = low_bound[ci] + (mi - 1) % size - etallq = time.time() + et_q = time.time() print( "Time needed for quadratic estimation on all ranks for this m-chunk", - etallq - et, + et_q - st_q, ) et_allm = time.time() print( "Time needed for quadratic estimation on all ranks for all m-chunks ", - et_allm - et, + et_allm - st, ) # Once done with all the m's, redistribute qa array over m's self.qa = self.qa.redistribute(axis=0) @@ -375,8 +382,13 @@ def generate(self, regen=False): # Calculate fisher for each m for ml, mg in self.qa.enumerate(axis=0): - fisher_m = np.cov(self.qa[ml]) - bias_m = np.mean(self.qa[ml], axis=1) + fisher_m = np.cov(self.qa[ml])[: self.nbands, : self.nbands] + + if self.noise: + bias_m = fisher_m[-1, : self.nbands] + else: + bias_m = np.mean(self.qa[ml], axis=1) + # Sum over all local m-modes to get the overall fisher and bias per parallel process fisher_loc += fisher_m.real # be careful with the real bias_loc += bias_m.real @@ -390,18 +402,16 @@ def generate(self, regen=False): self.write_fisher_file() - def q_estimator(self, mi, vec1, vec2, noise=False): + def q_estimator(self, mi, x2, y2=None): """Estimate the q-parameters from given data (see paper). Parameters ---------- mi : integer The m-mode we are calculating for. - vec1, vec2 : np.ndarrays[num_kl, num_realisatons] - The vector(s) of data we are estimating from. These are KL-mode coefficients. Passing in `vec2` different from `vec1` is for cross power spectrum calculation. - noise : boolean, optional - Whether we should project against the noise matrix. Used for - estimating the bias by Monte-Carlo. Default is False. + x2, y2 : np.ndarrays[nfreq, lmax+1, num_realisations] + The vector(s) of data we are estimating from in the sky basis, + Passing in `y2` different from `x2` is for cross power spectrum calculation. Returns ------- @@ -410,35 +420,42 @@ def q_estimator(self, mi, vec1, vec2, noise=False): """ # if data vector is filled with zeros, return q = 0.0 for this m - if np.all(vec1 == 0) or np.all(vec2 == 0): + if np.all(x2 == 0) or np.all(y2 == 0): self.qa[mi, :] = 0.0 # If one of the data vectors is empty, return q = 0.0 for this m - elif (vec1.shape[0] == 0) or (vec2.shape[0] == 0): + elif x2.shape[0] == 0: self.qa[mi, :] = 0.0 else: lside = self.telescope.lmax + 1 for bi, bg in self.qa.enumerate(axis=1): for li in range(lside): - lxvec = vec1[:, li] - lyvec = vec2[:, li] + lxvec = x2[:, li] + lyvec = y2[:, li] self.qa[mi, bi] += np.sum( lyvec.conj() * np.matmul(self.clarray[bi][li].astype(np.complex128), lxvec), axis=0, ).astype(np.float64) + def noise_bias(self, mi, vec1, vec2=None): + """A method to estimate the noise bias. If noise=True then the q-estimator MPI-array is one longer, and the last parameter is the projection against the noise. + + Parameters + ---------- + mi : integer + The m-mode we are calculating for. + vec1, vec2 : np.ndarrays[num_kl, num_realisatons] + The vector(s) of data we are estimating from. These are KL-mode coefficients. Passing in `vec2` is optional and for cross power spectrum calculation. + """ # Calculate q_a for noise power (x0^H N x0 = |x0|^2) - if noise: - evals, evecs = self.kltrans.modes_m(mi) + evals, evecs = self.kltrans.modes_m(mi) - if evals is None: - self.qa.global_slice[mi, -1] = 0.0 + if evals is not None: # If calculating crosspower don't include instrumental noise - # noisemodes = 0.0 if self.crosspower else 1.0 - noisemodes = 1.0 + noisemodes = 0.0 if self.crosspower else 1.0 noisemodes = noisemodes + (evals if self.zero_mean else 0.0) x0 = (vec1.T / (evals + 1.0)).T From 9f71ec642e4f45fa64cd27ab4d886c27edc13a11 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Mon, 1 Feb 2021 16:28:45 -0800 Subject: [PATCH 11/13] fix(psmc): check if y2 is set to None in method q_estimator --- drift/core/psmc.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/drift/core/psmc.py b/drift/core/psmc.py index 46216b43..487dd567 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -412,6 +412,10 @@ def q_estimator(self, mi, x2, y2=None): Array of q-parameters. If noise=True then the array is one longer, and the last parameter is the projection against the noise. """ + # If y2 is None set it to x2 + if y2 is None: + y2 = x2 + # if data vector is filled with zeros, return q = 0.0 for this m if np.all(x2 == 0) or np.all(y2 == 0): self.qa[mi, :] = 0.0 From c72303e736c2c70e6c780abf01afd3cd438fe86e Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Mon, 1 Feb 2021 18:41:05 -0800 Subject: [PATCH 12/13] fix(psmc): black and fixing lgtm alerts --- drift/core/psestimation.py | 1 - drift/core/psmc.py | 8 ++------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/drift/core/psestimation.py b/drift/core/psestimation.py index 63a4e7b7..c6a681f7 100644 --- a/drift/core/psestimation.py +++ b/drift/core/psestimation.py @@ -17,7 +17,6 @@ from drift.util import util from mpi4py import MPI -from future.utils import with_metaclass def uniform_band(k, kstart, kend): diff --git a/drift/core/psmc.py b/drift/core/psmc.py index 487dd567..e5ae85dc 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -85,7 +85,7 @@ def _work_fisher_bias_m(self, mi): x = self.gen_sample(mi, n) qa[:, s:e] = self.q_estimator(mi, x) - ft = np.cov(qa) + # ft = np.cov(qa) fisher = np.cov(qa) # ft[:self.nbands, :self.nbands] # bias = ft[-1, :self.nbands] @@ -272,8 +272,6 @@ def generate(self, regen=False): size = comm.Get_size() rank = comm.Get_rank() - st = time.time() - # Split up all m-modes such that each MPI process receives one m-mode m_chunks, low_bound, upp_bound = self.split_single( (self.telescope.mmax + 1), size @@ -331,8 +329,6 @@ def generate(self, regen=False): st_q = time.time() - dsize = np.prod(vec1.shape) - # Loop over total number of ranks given by `size`. for ir in range(size): # Only fill qa if we haven't reached mmax @@ -421,7 +417,7 @@ def q_estimator(self, mi, x2, y2=None): self.qa[mi, :] = 0.0 # If one of the data vectors is empty, return q = 0.0 for this m - elif (x2.shape[0] == 0): + elif x2.shape[0] == 0: self.qa[mi, :] = 0.0 else: From 894066036d50638dc0a1e274588affcb4c0f8178 Mon Sep 17 00:00:00 2001 From: Carolin Hofer Date: Sat, 19 Jun 2021 14:48:36 -0700 Subject: [PATCH 13/13] fix : code cleanup, option for noise bias estimate --- drift/core/psmc.py | 89 ++++++++++++---------------------------------- 1 file changed, 22 insertions(+), 67 deletions(-) diff --git a/drift/core/psmc.py b/drift/core/psmc.py index e5ae85dc..4d0b1eca 100644 --- a/drift/core/psmc.py +++ b/drift/core/psmc.py @@ -26,8 +26,9 @@ class PSMonteCarlo(psestimation.PSEstimation): """ nsamples = config.Property(proptype=int, default=500) + noiseonly = config.Property(proptype=bool, default=False) - def gen_sample(self, mi, nsamples=None, noiseonly=False): + def gen_sample(self, mi, nsamples=None): """Generate a random set of KL-data for this m-mode. Found by drawing from the eigenvalue distribution. @@ -49,7 +50,7 @@ def gen_sample(self, mi, nsamples=None, noiseonly=False): evals, evecs = self.kltrans.modes_m(mi) # Calculate C**(1/2), this is the weight to generate a draw from C - w = np.ones_like(evals) if noiseonly else (evals + 1.0) ** 0.5 + w = np.ones_like(evals) if self.noiseonly else (evals + 1.0) ** 0.5 # Calculate x x = nputil.complex_std_normal((evals.shape[0], nsamples)) * w[:, np.newaxis] @@ -85,11 +86,8 @@ def _work_fisher_bias_m(self, mi): x = self.gen_sample(mi, n) qa[:, s:e] = self.q_estimator(mi, x) - # ft = np.cov(qa) - - fisher = np.cov(qa) # ft[:self.nbands, :self.nbands] - # bias = ft[-1, :self.nbands] - bias = qa.mean(axis=1) # [:self.nbands] + fisher = np.cov(qa) + bias = qa.mean(axis=1) return fisher, bias @@ -214,13 +212,9 @@ class PSMonteCarloLarge(PSMonteCarlo): ---------- nsamples : integer The number of samples to draw from each band. - noise : bool - Default : False. If set to True, project against the noise matrix. Used for - estimating the bias by Monte-Carlo. """ nsamples = config.Property(proptype=int, default=500) - noise = config.Property(proptype=bool, default=False) # ==== Calculate the total Fisher matrix/bias ======= @@ -280,12 +274,9 @@ def generate(self, regen=False): n = self.nsamples - # Option to add another dimension for a noise term - nbands = self.nbands + 1 if self.noise else self.nbands - # Create an MPI array for the q-estimator distributed over bands self.qa = mpiarray.MPIArray( - (self.telescope.mmax + 1, nbands, n), + (self.telescope.mmax + 1, self.nbands, n), axis=1, comm=MPI.COMM_WORLD, dtype=np.float64, @@ -327,7 +318,8 @@ def generate(self, regen=False): else: vec1 = np.zeros((nfreq, lmax + 1, n), dtype=np.complex128) - st_q = time.time() + if mpiutil.rank0: + st_q = time.time() # Loop over total number of ranks given by `size`. for ir in range(size): @@ -335,12 +327,6 @@ def generate(self, regen=False): if mi < (self.telescope.mmax + 1): self.q_estimator(mi, vec1) - # TO DO: Calculate q_a for noise power (x0^H N x0 = |x0|^2) - # The issue right now is that the sky vector is passed from rank to rank - # and not the kl-vector, which we need for estimating the noise bias. - # if self.noise and loc_num > 0: - # self.noise_bias(mi, x) - # We do the MPI communications between ranks only (size - 1) times if ir == (size - 1): break @@ -351,17 +337,20 @@ def generate(self, regen=False): # Send data to (rank + 1) % size, hence local mi must be updated to (mi -1) % size mi = low_bound[ci] + (mi - 1) % size - et_q = time.time() + if mpiutil.rank0: + et_q = time.time() + print( + "Time needed for quadratic estimation on all ranks for this m-chunk", + et_q - st_q, + ) + + if mpiutil.rank0: + et_allm = time.time() print( - "Time needed for quadratic estimation on all ranks for this m-chunk", - et_q - st_q, - ) + "Time needed for quadratic estimation on all ranks for all m-chunks ", + et_allm - st, + ) - et_allm = time.time() - print( - "Time needed for quadratic estimation on all ranks for all m-chunks ", - et_allm - st, - ) # Once done with all the m's, redistribute qa array over m's self.qa = self.qa.redistribute(axis=0) @@ -371,12 +360,8 @@ def generate(self, regen=False): # Calculate fisher for each m for ml, mg in self.qa.enumerate(axis=0): - fisher_m = np.cov(self.qa[ml])[: self.nbands, : self.nbands] - - if self.noise: - bias_m = fisher_m[-1, : self.nbands] - else: - bias_m = np.mean(self.qa[ml], axis=1) + fisher_m = np.cov(self.qa[ml]) + bias_m = np.mean(self.qa[ml], axis=1) # Sum over all local m-modes to get the overall fisher and bias per parallel process fisher_loc += fisher_m.real # be careful with the real @@ -432,36 +417,6 @@ def q_estimator(self, mi, x2, y2=None): axis=0, ).astype(np.float64) - def noise_bias(self, mi, vec1, vec2=None): - """A method to estimate the noise bias. If noise=True then the q-estimator MPI-array is one longer, and the last parameter is the projection against the noise. - - Parameters - ---------- - mi : integer - The m-mode we are calculating for. - vec1, vec2 : np.ndarrays[num_kl, num_realisatons] - The vector(s) of data we are estimating from. These are KL-mode coefficients. Passing in `vec2` is optional and for cross power spectrum calculation. - """ - # Calculate q_a for noise power (x0^H N x0 = |x0|^2) - evals, evecs = self.kltrans.modes_m(mi) - - if evals is not None: - - # If calculating crosspower don't include instrumental noise - noisemodes = 0.0 if self.crosspower else 1.0 - noisemodes = noisemodes + (evals if self.zero_mean else 0.0) - - x0 = (vec1.T / (evals + 1.0)).T - - if vec2 is not None: - y0 = (vec2.T / (evals + 1.0)).T - else: - y0 = x0 - - self.qa.global_slice[mi, -1] = np.sum( - (x0 * y0.conj()).T.real * noisemodes, axis=-1 - ) - def recv_send_data(self, data, axis): """Send data from one MPI process to the next by using a vector buffer.