From aa0106cfe8a1c153ed927dea79febe36d01c5962 Mon Sep 17 00:00:00 2001 From: Ralph Kube Date: Tue, 26 May 2020 13:10:19 -0700 Subject: [PATCH] Added cached loader and updated middleman --- README.md | 1 - analysis/kernels_spectral.py | 3 +- analysis/tasks_mpi.py | 9 +- configs/config-middle.json | 25 +++ configs/test_3node_all.json | 101 ++++++++++++ configs/test_all.json | 10 +- generator.py | 18 +-- middleman.py | 25 ++- processor_mpi_tasklist.py | 11 +- sources/loader_ecei_cached.py | 124 +++++++++++++++ streaming/adios_helpers.py | 5 + streaming/reader_mpi.py | 6 +- streaming/reader_nompi.py | 9 +- streaming/writer_nompi.py | 4 +- streaming/writers.py | 4 +- tests_performance/performance_coherence.py | 171 ++++++++++++--------- tests_performance/run_kernel_benchmark.sh | 9 +- 17 files changed, 410 insertions(+), 125 deletions(-) create mode 100644 configs/config-middle.json create mode 100644 configs/test_3node_all.json create mode 100644 sources/loader_ecei_cached.py diff --git a/README.md b/README.md index f8cfd12..821f156 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,6 @@ needs to take another hop: - # Configuration diff --git a/analysis/kernels_spectral.py b/analysis/kernels_spectral.py index 535cab0..0731917 100644 --- a/analysis/kernels_spectral.py +++ b/analysis/kernels_spectral.py @@ -56,7 +56,7 @@ def kernel_crosspower(fft_data, ch_it, fft_config): res = np.zeros([len(ch_it), fft_data.shape[1]], dtype=fft_data.dtype) for idx, ch_pair in enumerate(ch_it): - res[idx, :] = (fft_data[ch_pair.ch1.idx(), :, :] * fft_data[ch_pair.ch2.idx(), :, :].conj()).mean(axis=1) / fft_config["win_factor"] + res[idx, :] = (fft_data[ch_pair.ch1.idx(), :, :] * fft_data[ch_pair.ch2.idx(), :, :].conj()).mean(axis=1) / fft_config["fft_params"]["win_factor"] return(np.abs(res).real) @@ -115,7 +115,6 @@ def kernel_crosscorr(fft_data, ch_it, fft_params): #_tmp = np.fft.ifft(X * Y.conj(), n=fft_params['nfft'], axis=0) * fft_params['nfft'] / fft_params['win_factor'] - _tmp = np.fft.ifft(X * Y.conj(), axis=0).mean(axis=1) / fft_params['win_factor'] res[idx, :] = np.fft.fftshift(_tmp.real) diff --git a/analysis/tasks_mpi.py b/analysis/tasks_mpi.py index 104b958..c4627fd 100644 --- a/analysis/tasks_mpi.py +++ b/analysis/tasks_mpi.py @@ -168,13 +168,9 @@ def calc_and_store(self, fft_data, fft_params, ch_it, info_dict): dt = t2 - t1 with open(f"/global/homes/r/rkube/repos/delta/outfile_{(comm.rank):03d}.txt", "a") as df: - df.write(f" rank {comm.rank:03d}/{comm.size:03d}: tidx={tidx} {an_name} start {t1:%H:%M:%S} end {t2:%H:%M:%S} on {hostname} \n") + df.write(f" rank {comm.rank:03d}/{comm.size:03d}: tidx={tidx} {an_name} start " + t1.isoformat(sep=" ") + " end " + t2.isoformat(sep=" ") + "\n") df.flush() - - - - # Zero out the result once it has been written result = None @@ -254,6 +250,9 @@ def submit(self, data, tidx): toc_fft = timeit.default_timer() self.logger.info(f"tidx {tidx}: FFT took {(toc_fft - tic_fft):6.4f}s") + if tidx == 1: + np.savez(f"test_data/fft_array_s{tidx:04d}.npz", fft_data = fft_data) + for task in self.task_list: task.submit(self.executor_anl, fft_data, tidx) diff --git a/configs/config-middle.json b/configs/config-middle.json new file mode 100644 index 0000000..0fa12c6 --- /dev/null +++ b/configs/config-middle.json @@ -0,0 +1,25 @@ +{ + "Comment": "This is a configuration for the middle-man.", + "transport_rx": + { + "comment": "Receiver part of the middle-man. 
Read using BP4 for the time being.", + "chunk_size": 10000, + "nstep": 5, + "datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/", + "engine": "BP4", + "channel_range": ["L0101-2408"], + "params": + { + "This section": "IsEmpty" + } + }, + "transport_tx": + { + "engine": "SST", + "channel_range": ["L0101-2408"], + "params": + { + "RegistrationMethod": "File" + } + } +} \ No newline at end of file diff --git a/configs/test_3node_all.json b/configs/test_3node_all.json new file mode 100644 index 0000000..c244b73 --- /dev/null +++ b/configs/test_3node_all.json @@ -0,0 +1,101 @@ +{ + "shotnr": 18431, + "source": + { + "source_file": "/global/homes/r/rkube/repos/delta/ECEI.018431.LFS.h5", + "chunk_size": 10000, + "num_chunks": 500, + "channel_range": ["L0101-2408"] + }, + "params_bad": + { + "IPAddress": "128.55.205.18", + "DataTransport": "WAN", + "Timeout": "300", + "Port": "12400", + "OneToOneMode": "TRUE", + "AlwaysProvideLatestTimestep": "FALSE", + "OpenTimeoutSecs": "300" + }, + "transport_kstar": + { + "chunk_size": 10000, + "nstep": 500, + "datapath": "/global/cscratch1/sd/rkube/delta/tmp_kstar", + "channel_range": ["L0101-2408"], + "engine": "BP4", + "params": + { + "DataTransport": "WAN", + "OpenTimeoutSecs": "60", + "IPAddress": "128.55.205.19", + "Timeout": "60", + "Port": "50555" + } + }, + "transport_nersc": + { + "chunk_size": 10000, + "nstep": 500, + "datapath": "/global/cscratch1/sd/rkube/delta/tmp_nersc", + "channel_range": ["L0101-2408"], + "engine": "BP4", + "params": + { + "DataTransport": "WAN", + "OpenTimeoutSecs": "60", + "IPAddress": "128.55.205.19", + "Timeout": "60", + "Port": "50555" + } + }, + "storage_mongo": + { + "backend": "mongo", + "datastore": "gridfs", + "datadir": "/global/cscratch1/sd/rkube/delta" + }, + "storage": + { + "backend": "null" + }, + "ECEI_cfg": { + "TriggerTime": [-0.12, 61.2, 60], + "t_norm": [-0.119, -0.109], + "SampleRate": 500, + "TFcurrent": 23000.0, + "Mode": "O", + "LoFreq": 81, + "LensFocus": 80, + "LensZoom": 340}, + "fft_params" : {"nfft": 512, "window": "hann", "overlap": 0.5, "detrend": "constant", "full": true}, + "task_list": [{ + "task_description" : "cross_correlation", + "analysis": "cross_correlation", + "channel_chunk_size": 32768, + "ref_channels" : "L0101-2408", + "cmp_channels" : "L0101-2408" + }, + { + "task_description" : "cross_power", + "analysis": "cross_power", + "channel_chunk_size": 32768, + "ref_channels" : "L0101-2408", + "cmp_channels" : "L0101-2408" + }, + { + "task_description" : "cross_phase", + "analysis": "cross_phase", + "channel_chunk_size": 32768, + "ref_channels" : "L0101-2408", + "cmp_channels" : "L0101-2408" + }, + { + "task_description" : "coherence", + "analysis": "coherence", + "channel_chunk_size": 32768, + "ref_channels" : "L0101-2408", + "cmp_channels" : "L0101-2408" + }] + } + \ No newline at end of file diff --git a/configs/test_all.json b/configs/test_all.json index 0ef5999..229b93b 100644 --- a/configs/test_all.json +++ b/configs/test_all.json @@ -4,6 +4,7 @@ { "source_file": "/global/homes/r/rkube/repos/delta/ECEI.018431.LFS.h5", "chunk_size": 10000, + "num_chunks": 500, "channel_range": ["L0101-2408"] }, "params_bad": @@ -19,14 +20,17 @@ "transport": { "chunk_size": 10000, - "nstep": 100, + "nstep": 500, "datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/", - "engine": "SST", "channel_range": ["L0101-2408"], + "engine": "BP4", "params": { "DataTransport": "WAN", - "OpenTimeoutSecs": "60" + "OpenTimeoutSecs": "60", + "IPAddress": "128.55.205.19", + 
"Timeout": "60", + "Port": "50555" } }, "storage_mongo": diff --git a/generator.py b/generator.py index 82d727a..d05265c 100644 --- a/generator.py +++ b/generator.py @@ -15,7 +15,7 @@ from analysis.channels import channel_range from streaming.writers import writer_gen from streaming.adios_helpers import gen_channel_name_v2 -from sources.loader_ecei import loader_ecei +from sources.loader_ecei_cached import loader_ecei """ Distributes time-chunked ECEI data via ADIOS2. @@ -39,14 +39,14 @@ logger = logging.getLogger("simple") -datapath = cfg["transport"]["datapath"] -nstep = cfg["transport"]["nstep"] +datapath = cfg["transport_kstar"]["datapath"] +nstep = cfg["transport_kstar"]["nstep"] shotnr = cfg["shotnr"] # Enforce 1:1 mapping of channels and tasks -assert(len(cfg["transport"]["channel_range"]) == size) +assert(len(cfg["transport_kstar"]["channel_range"]) == size) # Channels this process is reading -ch_rg = channel_range.from_str(cfg["transport"]["channel_range"][rank]) +ch_rg = channel_range.from_str(cfg["transport_kstar"]["channel_range"][rank]) # Hard-code the total number of data points data_pts = int(5e6) @@ -58,11 +58,12 @@ # Get a data_loader logger.info("Loading h5 data into memory") dl = loader_ecei(cfg) +dl.cache() batch_gen = dl.batch_generator() -logger.info(f"Creating writer_gen: shotnr={shotnr}, engine={cfg['transport']['engine']}") +logger.info(f"Creating writer_gen: shotnr={shotnr}, engine={cfg['transport_kstar']['engine']}") -writer = writer_gen(cfg["transport"]) +writer = writer_gen(cfg["transport_kstar"]) # Pass data layout to writer and reset generator for data in batch_gen: @@ -83,9 +84,6 @@ writer.EndStep() nstep += 1 - if nstep >= 200: - break - toc = time.time() writer.writer.Close() diff --git a/middleman.py b/middleman.py index 59b4e0a..16b7aa3 100644 --- a/middleman.py +++ b/middleman.py @@ -29,14 +29,18 @@ class AdiosMessage: def forward(Q, cfg): """To be executed by a local thread. Pops items from the queue and forwards them.""" logger = logging.getLogger("simple") - writer = writer_gen(cfg["transport_tx"]) - dummy_data = np.zeros( (192, cfg["transport_rx"]["chunk_size"]), dtype=np.float64) - writer.DefineVariable(cfg["transport_tx"]["channel_range"][0], dummy_data) + writer = writer_gen(cfg["transport_nersc"]) + dummy_data = np.zeros( (192, cfg["transport_nersc"]["chunk_size"]), dtype=np.float64) + writer.DefineVariable(cfg["transport_nersc"]["channel_range"][0], dummy_data) writer.Open() logger.info("Starting reader process") while True: - msg = Q.get() + try: + msg = Q.get(timeout=30.0) + except queue.Empty: + logger.info("Empty queue after waiting until time-out. Exiting") + if msg.tstep_idx == None: Q.task_done() logger.info("Received hangup signal") @@ -66,7 +70,7 @@ def main(): # of the config file. Therefore some keys are duplicated, such as channel_range. 
Make sure that these # items are the same in both sections - assert(cfg["transport_rx"]["channel_range"] == cfg["transport_tx"]["channel_range"]) + assert(cfg["transport_kstar"]["channel_range"] == cfg["transport_nersc"]["channel_range"]) with open("configs/logger.yaml", "r") as f: log_cfg = yaml.safe_load(f.read()) @@ -74,7 +78,7 @@ def main(): logger = logging.getLogger('simple') # Create ADIOS reader object - reader = reader_gen(cfg["transport_rx"]) + reader = reader_gen(cfg["transport_kstar"]) reader.Open() dq = queue.Queue() @@ -90,8 +94,8 @@ def main(): if stepStatus: # Read data logger.info(f"stepStatus == True") - stream_data = reader.Get(channel_range.from_str(cfg["transport_rx"]["channel_range"][0]), - save=True) + stream_data = reader.Get(channel_range.from_str(cfg["transport_kstar"]["channel_range"][0]), + save=False) # Generate message id and publish is msg = AdiosMessage(tstep_idx=reader.CurrentStep(), data=stream_data) @@ -105,11 +109,6 @@ def main(): last_step = reader.CurrentStep() - if last_step > 10: - logger.info(f"Exiting after 1s0 steps") - dq.put_nowait(AdiosMessage(tstep_idx=None, data=None)) - break - logger.info("Exiting main loop") worker.join() logger.info("Workers have joined") diff --git a/processor_mpi_tasklist.py b/processor_mpi_tasklist.py index 7a4aac9..a9819d5 100644 --- a/processor_mpi_tasklist.py +++ b/processor_mpi_tasklist.py @@ -137,6 +137,9 @@ def consume(Q, task_list): Q.task_done() break + #if(msg.tidx == 1): + # np.savez(f"test_data/io_array_tr_s{msg.tidx:04d}.npz", msg.data) + logger.info(f"Rank {rank}: Consumed tidx={msg.tstep_idx}") task_list.submit(msg.data, msg.tstep_idx) @@ -169,10 +172,10 @@ def main(): # Create a global executor #executor = concurrent.futures.ThreadPoolExecutor(max_workers=60) - executor_fft = MPIPoolExecutor(max_workers=4, mpi_info={"host": "nid00152"}) - executor_anl = MPIPoolExecutor(max_workers=120, mpi_info={"hostfile": "nid00153"}) + executor_fft = MPIPoolExecutor(max_workers=4) + executor_anl = MPIPoolExecutor(max_workers=120) - adios2_varname = channel_range.from_str(cfg["transport"]["channel_range"][0]) + adios2_varname = channel_range.from_str(cfg["transport_nersc"]["channel_range"][0]) #with MPICommExecutor(MPI.COMM_WORLD) as executor: # if executor is not None: @@ -194,7 +197,7 @@ def main(): store_backend.store_one({"run_id": cfg['run_id'], "run_config": cfg}) # Create ADIOS reader object - reader = reader_gen(cfg["transport"]) + reader = reader_gen(cfg["transport_nersc"]) task_list = task_list_spectral(executor_anl, executor_fft, cfg["task_list"], cfg["fft_params"], cfg["ECEI_cfg"], cfg["storage"]) dq = queue.Queue() diff --git a/sources/loader_ecei_cached.py b/sources/loader_ecei_cached.py new file mode 100644 index 0000000..14b8a05 --- /dev/null +++ b/sources/loader_ecei_cached.py @@ -0,0 +1,124 @@ +#-*- Coding: UTF-8 -*- + +import h5py +#from mpi4py import MPI +import numpy as np + +from analysis.channels import channel_range + +class loader_ecei(): + """Loads KSTAR ECEi data time-chunk wise for a specified channel range from an HDF5 file""" + + + def __init__(self, cfg): + """ + + Old: filename: str, ch_range: channel_range, chunk_size:int): + Inputs: + ======= + cfg: Delta configuration with loader and ECEI section. 
+ + """ + + self.ch_range = channel_range.from_str(cfg["source"]["channel_range"][0]) + # Create a list of paths in the HDF5 file, corresponding to the specified channels + self.filename = cfg["source"]["source_file"] + self.chunk_size = cfg["source"]["chunk_size"] + self.ecei_cfg = cfg["ECEI_cfg"] + self.num_chunks = cfg["source"]["num_chunks"] + self.current_chunk = 0 + + self.tnorm = cfg["ECEI_cfg"]["t_norm"] + + self.got_normalization = False + + self.chunk_list = None + + def gen_timebase(self, chunk_num): + """Create a dummy time base for chunk last read. + + Parameters: + =========== + chunk_num: Number of the chunk to generate a time-base for + """ + + # Unpack the trigger time, plasma time and end time from TriggerTime + tt0, pl, tt1 = self.ecei_cfg["TriggerTime"] + # Get the sampling frequency, in units of Hz + fs = self.ecei_cfg["SampleRate"] * 1e3 + + # The time base starts at t0. Assume samples are streaming in continuously with sampling frequency fs. + # Offset of the first sample in current chunk + offset = self.chunk_size * chunk_num + t_sample = 1. / fs + # Use integers in arange to avoid round-off errors. We want exactly chunk_size elements in this array + tb = np.arange(offset, offset + self.chunk_size, dtype=np.float64) + # Scale timebase with t_sample and shift to tt0 + tb = (tb * t_sample) + tt0 + return(tb) + + + def cache(self): + """Pre-loads all data from HDF5, calculates normalization and + generates a list of arrays.""" + + self.cache = np.zeros([self.ch_range.length(), self.chunk_size * self.num_chunks]) + + # Cache the data in memory + with h5py.File(self.filename, "r",) as df: + for ch in self.ch_range: + chname_h5 = f"/ECEI/ECEI_{ch}/Voltage" + self.cache[ch.idx(), :] = df[chname_h5][:self.chunk_size * self.num_chunks].astype(np.float64) + + + def batch_generator(self): + """Loads the next time-chunk from the data file. + This implementation works as a generator. + In production use this as + + The ECEI data is usually normalized to a fixed offset, calculated using data + at the beginning of the stream. + + The time interval where the data we normalize to is taken from is given in ECEI_config, t_norm. + As long as this data is not seen by the reader, raw data is returned. + + Once the data we normalize to is seen, the normalization values are calculated. + After that, the data from the current and all subsequent chunks is normalized. + + The flag self.is_data_normalized is set to false if raw data is returned. + It is set to true if normalized data is returned. + + >>> batch_gen = loader.batch_generator() + >>> for batch in batch_gen(): + >>> ... + + yields + ======= + time_chunk : ndarray, data from current time chumk + """ + + for current_chunk in range(self.num_chunks): + # Load current time-chunk from HDF5 file + time_chunk = self.cache[:, current_chunk * self.chunk_size:(current_chunk + 1) * self.chunk_size] + + # Scale, see get_data in kstarecei.py + time_chunk = time_chunk * 1e-4 + + # See if we can calculate the normalization. 
+ if self.got_normalization: + # This corresponds to norm == 1 in kstarecei.py + time_chunk = (time_chunk - self.offset_lvl) / time_chunk.mean(axis=1, keepdims=True) - 1.0 + elif self.got_normalization == False: + tb = self.gen_timebase(current_chunk) + tnorm_idx = (tb > self.tnorm[0]) & (tb < self.tnorm[1]) + + if tnorm_idx.sum() > 100: + self.offset_lvl = np.median(time_chunk[:, tnorm_idx], axis=1, keepdims=True) + self.offset_std = time_chunk[:, tnorm_idx].std(axis=1) + self.got_normalization = True + + + yield time_chunk + + +# End of file \ No newline at end of file diff --git a/streaming/adios_helpers.py b/streaming/adios_helpers.py index 8fbe158..7962d96 100644 --- a/streaming/adios_helpers.py +++ b/streaming/adios_helpers.py @@ -24,4 +24,9 @@ def gen_channel_name_v2(shotnr: int, channel_rg: str): return f"{shotnr:05d}_ch{channel_rg:s}" +def gen_channel_name_v3(prefix: str, shotnr: int, channel_rg: str): + + return f"{prefix}/{shotnr:05d}_ch{channel_rg:s}" + + # End of file adios_helpers.py \ No newline at end of file diff --git a/streaming/reader_mpi.py b/streaming/reader_mpi.py index e3ac559..67035c0 100644 --- a/streaming/reader_mpi.py +++ b/streaming/reader_mpi.py @@ -9,7 +9,7 @@ import numpy as np from analysis.channels import channel, channel_range -from streaming.adios_helpers import gen_io_name, gen_channel_name_v2 +from streaming.adios_helpers import gen_io_name, gen_channel_name_v3 """ Author: Ralph Kube @@ -39,7 +39,7 @@ def __init__(self, cfg: dict, shotnr: int=18431): self.reader = None # Generate a descriptive channel name self.chrg = channel_range.from_str(cfg["channel_range"][self.rank]) - self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str()) + self.channel_name = gen_channel_name_v3(cfg["datapath"], self.shotnr, self.chrg.to_str()) self.logger.info(f"reader_base: channel_name = {self.channel_name}") @@ -106,7 +106,6 @@ def Get(self, ch_rg: channel_range, save: bool=False): # elif isinstance(channels, type(None)): self.logger.info(f"Reading varname {ch_rg.to_str()}. Step no. {self.CurrentStep():d}") var = self.IO.InquireVariable(ch_rg.to_str()) - self.logger.info(f"Inquired variables: {var}") time_chunk = np.zeros(var.Shape(), dtype=np.float64) self.reader.Get(var, time_chunk, adios2.Mode.Sync) self.logger.info(f"Got data") @@ -129,7 +128,6 @@ def __init__(self, cfg: dict, shotnr: int=18431): super().__init__(cfg, shotnr) self.IO.SetEngine(cfg["engine"]) self.IO.SetParameters(cfg["params"]) - self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str()) self.reader = None # End of file \ No newline at end of file diff --git a/streaming/reader_nompi.py b/streaming/reader_nompi.py index 1c77955..ff28d76 100644 --- a/streaming/reader_nompi.py +++ b/streaming/reader_nompi.py @@ -8,7 +8,7 @@ import numpy as np from analysis.channels import channel, channel_range -from streaming.adios_helpers import gen_io_name, gen_channel_name_v2 +from streaming.adios_helpers import gen_io_name, gen_channel_name_v3 """ Author: Ralph Kube @@ -31,13 +31,13 @@ def __init__(self, cfg: dict, shotnr: int=18431): self.reader = None # Generate a descriptive channel name - if len(cfg["channel_range"]) > 0: + if len(cfg["channel_range"]) > 1: self.logger.error("reader_base is not using MPI. 
The following channel_ranges are ignored:") for crg in cfg["channel_range"][1:]: self.logger.error(f"Ignoring channel range {crg}") self.chrg = channel_range.from_str(cfg["channel_range"][0]) - self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str()) + self.channel_name = gen_channel_name_v3(cfg["datapath"], self.shotnr, self.chrg.to_str()) self.logger.info(f"reader_base: channel_name = {self.channel_name}") @@ -106,7 +106,7 @@ def Get(self, ch_rg: channel_range, save: bool=False): var = self.IO.InquireVariable(ch_rg.to_str()) time_chunk = np.zeros(var.Shape(), dtype=np.float64) self.reader.Get(var, time_chunk, adios2.Mode.Sync) - self.logger.info(f"Got data: {time_chunk.shape}, mean = {time_chunk.mean()}") + self.logger.info(f"Got data") if save: np.savez(f"test_data/time_chunk_tr_s{self.CurrentStep():04d}.npz", time_chunk=time_chunk) @@ -126,7 +126,6 @@ def __init__(self, cfg: dict, shotnr: int=18431): super().__init__(cfg, shotnr) self.IO.SetEngine(cfg["engine"]) self.IO.SetParameters(cfg["params"]) - self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str()) self.reader = None # End of file \ No newline at end of file diff --git a/streaming/writer_nompi.py b/streaming/writer_nompi.py index 6a46c8f..8e2c1d9 100644 --- a/streaming/writer_nompi.py +++ b/streaming/writer_nompi.py @@ -7,7 +7,7 @@ import logging -from streaming.adios_helpers import gen_channel_name_v2, gen_io_name +from streaming.adios_helpers import gen_channel_name_v3, gen_io_name from analysis.channels import channel_range class writer_base(): @@ -26,7 +26,7 @@ def __init__(self, cfg: dict, shotnr: int=18431): # Generate a descriptive channel name self.chrg = channel_range.from_str(cfg["channel_range"][0]) - self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str()) + self.channel_name = gen_channel_name_v3(cfg["datapath"], self.shotnr, self.chrg.to_str()) self.logger.info(f"writer_base: channel_name = {self.channel_name}") diff --git a/streaming/writers.py b/streaming/writers.py index f46eb67..20b65d9 100644 --- a/streaming/writers.py +++ b/streaming/writers.py @@ -9,7 +9,7 @@ import logging -from streaming.adios_helpers import gen_channel_name_v2, gen_io_name +from streaming.adios_helpers import gen_channel_name_v3, gen_io_name from analysis.channels import channel_range class writer_base(): @@ -31,7 +31,7 @@ def __init__(self, cfg: dict, shotnr: int=18431): # Generate a descriptive channel name self.chrg = channel_range.from_str(cfg["channel_range"][self.rank]) - self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str()) + self.channel_name = gen_channel_name_v3(cfg["datapath"], self.shotnr, self.chrg.to_str()) self.logger.info(f"writer_base: channel_name = {self.channel_name}") diff --git a/tests_performance/performance_coherence.py b/tests_performance/performance_coherence.py index a1a81ac..db31d67 100644 --- a/tests_performance/performance_coherence.py +++ b/tests_performance/performance_coherence.py @@ -13,14 +13,15 @@ from analysis.channels import channel, channel_pair, channel_range from analysis.kernels_spectral_cy import kernel_coherence_64_cy, kernel_coherence_64_v2 -from analysis.kernels_spectral_cy import kernel_coherence_32_cy, kernel_coherence_32_v2 +#from analysis.kernels_spectral_cy import kernel_coherence_32_cy, kernel_coherence_32_v2 from analysis.kernels_spectral_cy import kernel_crossphase_64_cy, kernel_crossphase_64_v2 -from analysis.kernels_spectral_cy import kernel_crossphase_32_cy, kernel_crossphase_32_v2 +#from 
analysis.kernels_spectral_cy import kernel_crossphase_32_cy, kernel_crossphase_32_v2 from analysis.kernels_spectral_cy import kernel_crosspower_64_cy, kernel_crosspower_64_v2 -from analysis.kernels_spectral_cy import kernel_crosspower_32_cy, kernel_crosspower_32_v2 +#from analysis.kernels_spectral_cy import kernel_crosspower_32_cy, kernel_crosspower_32_v2 +from analysis.kernels_spectral import kernel_coherence, kernel_crossphase, kernel_crosspower from analysis.task_fft import task_fft_scipy @@ -46,13 +47,17 @@ config_fft["fft_params"]["win_factor"] = win_factor -# my_fft = task_fft_scipy(10_000, config_fft["fft_params"], normalize=True, detrend=True) -# fft_data = my_fft.do_fft_local(io_array_tr) +#my_fft = task_fft_scipy(10_000, config_fft["fft_params"], normalize=True, detrend=True) +#fft_data = my_fft.do_fft_local(io_array_tr) -with np.load("/global/homes/r/rkube/repos/delta/test_data/fft_array_s0004.npz") as df: +with np.load("/global/homes/r/rkube/repos/delta/test_data/fft_array_s0001.npz") as df: fft_data = df["fft_data"] +fft_data_64 = np.ascontiguousarray(fft_data) +fft_data_32 = np.require(fft_data_64, dtype=np.complex64, requirements=['A', 'C']) + + ################################################### # Generate channels to iterate over @@ -75,112 +80,132 @@ ch2_idx_arr = np.array([int(ch_pair.ch2.idx()) for ch_pair in unique_channels], dtype=np.uint64) - print("++++++++++++++++++++++++++++++ Testing coherence +++++++++++++++++++++++++++++++++++++") -#res_coherence_no = kernel_coherence(fft_data, unique_channels, None) -#res_coherence_cy = kernel_coherence_cy(fft_data, channel_pairs, config_fft) -#print(f"Distance: {np.linalg.norm(res_coherence_no.flatten() - res_coherence_cy.flatten()) / np.linalg.norm(res_coherence_no.flatten())}") +res_coherence_no = kernel_coherence(fft_data, channel_pairs, None) +res_coherence_cy = kernel_coherence_64_cy(fft_data, channel_pairs, config_fft) +print(f"Distance: {np.linalg.norm(res_coherence_no.flatten() - res_coherence_cy.flatten()) / np.linalg.norm(res_coherence_no.flatten())}") -n_loop = 10 +n_loop = 1 -# tic_no = timeit.default_timer() +tic_no = timeit.default_timer() +for _ in range(n_loop): + _ = kernel_coherence(fft_data, unique_channels, None) +toc_no = timeit.default_timer() +print(f"Python implementation: {((toc_no - tic_no) / n_loop):6.4f}s") + + +# tic_cy = timeit.default_timer() # for _ in range(n_loop): -# res_coherence_no = kernel_coherence(fft_data, unique_channels, None) -# toc_no = timeit.default_timer() -# print(f"Default implementation: {((toc_no - tic_no) / n_loop):6.4f}s") +# _ = kernel_coherence(fft_data_64, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"Python implementation 64bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_coherence_64_cy(fft_data_64, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"Cython implementation 64bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") -fft_data_64 = np.ascontiguousarray(fft_data) -fft_data_32 = np.require(fft_data_64, dtype=np.complex64, requirements=['A', 'C']) +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_coherence_64_v2(fft_data_64, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"C implementation 64bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_coherence_64_cy(fft_data_64, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"Cython 
implementation 64byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_coherence_32_cy(fft_data_32, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"Cython implementation 32bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_coherence_64_v2(fft_data_64, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"C implementation 64byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_coherence_32_v2(fft_data_32, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"C implementation 32bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") +print("++++++++++++++++++++++++++++++ Testing cross-power +++++++++++++++++++++++++++++++++++++") +res_crosspower_no = kernel_crosspower(fft_data, channel_pairs, None) +res_crosspower_cy = kernel_crosspower_64_cy(fft_data, channel_pairs, config_fft) +print(f"Distance: {np.linalg.norm(res_crosspower_no.flatten() - res_crosspower_cy.flatten()) / np.linalg.norm(res_crosspower_no.flatten())}") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_coherence_32_cy(fft_data_32, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"Cython implementation 32byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") tic_cy = timeit.default_timer() for _ in range(n_loop): - _ = kernel_coherence_32_v2(fft_data_32, channel_pairs, config_fft) + _ = kernel_crosspower(fft_data_64, channel_pairs, config_fft) toc_cy = timeit.default_timer() -print(f"C implementation 32byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +print(f"Python implementation 64bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") -print("++++++++++++++++++++++++++++++ Testing cross-power +++++++++++++++++++++++++++++++++++++") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_crosspower_64_cy(fft_data_64, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"Cython implementation 64bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_crosspower_64_v2(fft_data_64, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"C implementation 64bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_crosspower_64_cy(fft_data_64, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"Cython implementation 64byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_crosspower_32_cy(fft_data_32, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"Cython implementation 32byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_crosspower_64_v2(fft_data_64, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"C implementation 64byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_crosspower_32_v2(fft_data_32, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"C implementation 32byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_crosspower_32_cy(fft_data_32, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"Cython implementation 32byte:{((toc_cy - tic_cy) / 
n_loop):6.4f}s") +print("++++++++++++++++++++++++++++++ Testing cross-phase +++++++++++++++++++++++++++++++++++++") +res_crossphase_no = kernel_crossphase(fft_data, channel_pairs, None) +res_crossphase_cy = kernel_crossphase_64_cy(fft_data, channel_pairs, config_fft) +print(f"Distance: {np.linalg.norm(res_crossphase_no.flatten() - res_crossphase_cy.flatten()) / np.linalg.norm(res_crossphase_no.flatten())}") tic_cy = timeit.default_timer() for _ in range(n_loop): - _ = kernel_crosspower_32_v2(fft_data_32, channel_pairs, config_fft) + _ = kernel_crossphase(fft_data_64, channel_pairs, config_fft) toc_cy = timeit.default_timer() -print(f"C implementation 32byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +print(f"Python implementation 64bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") -print("++++++++++++++++++++++++++++++ Testing cross-phase +++++++++++++++++++++++++++++++++++++") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_crossphase_64_cy(fft_data_64, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"Cython implementation 64byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_crossphase_64_cy(fft_data_64, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"Cython implementation 64bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_crossphase_64_v2(fft_data_64, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"C implementation 64byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_crossphase_64_v2(fft_data_64, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"C implementation 64bit:{((toc_cy - tic_cy) / n_loop):6.4f}s") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_crossphase_32_cy(fft_data_32, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"Cython implementation 32byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_crossphase_32_cy(fft_data_32, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"Cython implementation 32byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") -tic_cy = timeit.default_timer() -for _ in range(n_loop): - _ = kernel_crossphase_32_v2(fft_data_32, channel_pairs, config_fft) -toc_cy = timeit.default_timer() -print(f"C implementation 32byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") +# tic_cy = timeit.default_timer() +# for _ in range(n_loop): +# _ = kernel_crossphase_32_v2(fft_data_32, channel_pairs, config_fft) +# toc_cy = timeit.default_timer() +# print(f"C implementation 32byte:{((toc_cy - tic_cy) / n_loop):6.4f}s") diff --git a/tests_performance/run_kernel_benchmark.sh b/tests_performance/run_kernel_benchmark.sh index 8a7762a..b3d678c 100644 --- a/tests_performance/run_kernel_benchmark.sh +++ b/tests_performance/run_kernel_benchmark.sh @@ -10,4 +10,11 @@ export OMP_NUM_THREADS=4 python performance_coherence.py export OMP_NUM_THREADS=8 -python performance_coherence.py \ No newline at end of file +python performance_coherence.py + +export OMP_NUM_THREADS=16 +python performance_coherence.py + +export OMP_NUM_THREADS=32 +python performance_coherence.py +