Skip to content

Commit

Permalink
Cleaned up
Browse files Browse the repository at this point in the history
  • Loading branch information
rkube committed May 8, 2020
1 parent 6009f65 commit 9e47434
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 310 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ module use -a /global/cscratch1/sd/jyc/dtn/sw/modulefiles
module load openmpi
module load zeromq adios2
module load python py-numpy py-mpi4py py-h5py py-pyyaml py-scipy py-matplotlibs
module load python py-numpy py-mpi4py py-h5py py-pyyaml py-scipy py-matplotlib
mpirun -n 5 python -u -m mpi4py.futures receiver_brute.py --config config-dtn.json
```
Expand Down
34 changes: 33 additions & 1 deletion backends/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
"""
Author: Ralph Kube
This defines a basic interface to the backend-storage classes
Defines a basic interface to the backend-storage classes and helper routines
"""

from analysis.channels import channel, channel_pair

class backend():
def __init__(self):
pass
Expand Down Expand Up @@ -78,4 +80,34 @@ def serialize_dispatch_seq(dispatch_seq):
#
#j = json.loads(j_str)


def deserialize_dispatch_seq(channel_ser):
    """Reconstruct the list of lists of channel pairs produced by serialize_dispatch_seq.

    Parameters:
    ===========
    channel_ser: JSON serialization of the channel pairs

    Returns:
    ========
    List of list of channel pairs
    """

    def _mk_channel(ser):
        # Build a channel object from its serialized dict form
        # (keys "dev", "ch_v", "ch_h", as written by serialize_dispatch_seq).
        return channel(ser["dev"], ser["ch_v"], ser["ch_h"])

    return [[channel_pair(_mk_channel(pair["ch1"]), _mk_channel(pair["ch2"]))
             for pair in pair_list]
            for pair_list in channel_ser]


# End of file backend.py
11 changes: 3 additions & 8 deletions generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,8 @@
# Get a data_loader
logger.info("Loading h5 data into memory")
dl = loader_h5(path.join(datapath, "ECEI.018431.LFS.h5"), ch_rg, cfg["transport"]["chunk_size"])
data_all = dl.get()
data_all = dl.get_batch()

#data_all = []
#for i in range(nstep):
# data = np.ones((192, 10_000)) * i
# data_all.append(data)

logger.info(f"Creating writer_gen: shotnr={shotnr}, engine={cfg['transport']['engine']}")

Expand All @@ -77,9 +73,8 @@
if(rank == 0):
logger.info(f"Sending: {i:d} / {nstep:d}. data: dtype={data_all[i].dtype}, min = {data_all[i].min()}, max = {data_all[i].max()}")
writer.BeginStep()
#data_tmp = np.zeros_like(data_all[i])
#data_tmp[:] = data_all[i][:]
#writer.put_data(data_tmp)
# data_tmp = np.zeros_like(data_all[i])
# data_tmp[:] = data_all[i][:]
writer.put_data(data_all[i])
writer.EndStep()
t1 = time.time()
Expand Down
11 changes: 5 additions & 6 deletions processor_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import os
from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor, MPICommExecutor
import sys
#sys.path.append("/global/homes/r/rkube/software/adios2-current/lib64/python3.7/site-packages")

import logging
import logging.config
Expand Down Expand Up @@ -49,12 +47,10 @@

@attr.s
class AdiosMessage:
"""Storage class used to transfer data from Kstar(Dataman) to
local PoolExecutor"""
"""Storage class used to transfer data from Kstar(Dataman) to local PoolExecutor"""
tstep_idx = attr.ib(repr=True)
data = attr.ib(repr=False)


cfg = {}


Expand Down Expand Up @@ -82,6 +78,9 @@ def consume(Q, executor, my_fft, task_list):
toc_fft = timeit.default_timer()
logger.info(f"rank {rank}: tidx={msg.tstep_idx}: FFT took {(toc_fft - tic_fft):6.4f}s")

np.savez(f"test_data/fft_array_s{msg.tstep_idx:04d}.npz", fft_data=fft_data)
logger.info("STORING FFT DATA")

# Step 2) Distribute the work via the executor
for task in task_list:
#task.calc_and_store(executor, fft_data, msg.tstep_idx, cfg)
Expand Down Expand Up @@ -126,7 +125,7 @@ def main():
logger = logging.getLogger('simple')
logger.info(f"Starting up. Using adios2 from {adios2.__file__}")
cfg["run_id"] = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
cfg["run_id"] = "ABC123"
cfg["run_id"] = "ABC124"
logger.info(f"Starting run {cfg['run_id']}")

# Instantiate a storage backend and store the run configuration and task configuration
Expand Down
4 changes: 1 addition & 3 deletions sources/loader_h5.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get(self):
with h5py.File(self.filename, "r") as df:
for ch in [f"/ECEI/ECEI_{c}/Voltage" for c in self.ch_range]:
data_list.append(df[ch][self.current_chunk * self.chunk_size:
(self.current_chunk + 1) * self.chunk_size])
(self.current_chunk + 1) * self.chunk_size].astype(np.float64))
self.current_chunk += 1
df.close()

Expand All @@ -72,8 +72,6 @@ def get_batch(self):
#data_arr[ch.idx(), :] = data_tmp[:] #df[f"/ECEI/ECEI_{ch.__str__()}/Voltage"]

data_sp = np.split(data_arr, self.num_chunks, axis=1)
print(data_sp[0].shape, data_sp[0][0, 0:10], data_sp[0][1, 0:10])

return(data_sp)


Expand Down
8 changes: 8 additions & 0 deletions startup_cori.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash
# Environment setup script (presumably for NERSC Cori, given the cscratch
# module path below -- confirm): switch to the GNU programming environment,
# then load Python 3 and the development build of ADIOS2 via modules.

module unload PrgEnv-cray PrgEnv-gnu PrgEnv-intel
module load PrgEnv-gnu
module unload python
module load python3
# Add the shared module tree so the adios2/devel module can be found.
module use -a /global/cscratch1/sd/jyc/sw/modulefiles
module load adios2/devel
7 changes: 7 additions & 0 deletions startup_dtn.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/zsh
# Environment setup script (presumably for data transfer nodes, given the
# "dtn" paths -- confirm): register the shared spack module trees, then load
# MPI, ZeroMQ/ADIOS2, and the Python scientific stack.
module use -a /global/cscratch1/sd/jyc/dtn/sw/spack/share/spack/modules/linux-centos7-ivybridge
module use -a /global/cscratch1/sd/jyc/dtn/sw/modulefiles

module load openmpi
module load zeromq adios2
# NOTE(review): the README's equivalent line loads "py-pyyaml"; here it is
# "py-yaml" -- verify the correct module name against the module tree.
module load python py-numpy py-mpi4py py-h5py py-scipy py-matplotlib py-yaml
26 changes: 13 additions & 13 deletions streaming/reader_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,28 +169,28 @@ def Get(self, ch_rg: channel_range, save: bool=False):
tb = self.gen_timebase()
# Calculate indices where we calculate the normalization offset from
tnorm_idx = (tb > self.tnorm[0]) & (tb < self.tnorm[1])
self.logger.info(f"I found {tnorm_idx} indices where to normalize, tnorm = {self.tnorm}")
self.logger.info(f"I found {tnorm_idx.sum()} indices where to normalize, tnorm = {self.tnorm}")
# Calculate normalization offset if we have enough indices
if(tnorm_idx.sum() > 100):
self.offset_lvl = np.median(time_chunk[:, tnorm_idx], axis=1, keepdims=True)
self.offset_std = time_chunk[:, tnorm_idx].std(axis=1)
#self.got_normalization = True
self.logger.info(f"offset_lvl = {self.offset_lvl}, offset_std = {self.offset_std}")
self.got_normalization = True
# self.logger.info(f"offset_lvl = {self.offset_lvl}, offset_std = {self.offset_std}")

if save:
np.savez("test_data/offset_lvl.npz", offset_lvl = self.offset_lvl)
np.savez("test_data/tnorm_idx.npz", tnorm_idx=tnorm_idx)

#if self.got_normalization:
# self.logger.info(f"time_chunk.shape = {time_chunk.shape}")
# if save:
# np.savez("test_data/time_chunk_s{0:04d}.npz".format(self.CurrentStep()), time_chunk=time_chunk)#
#
# time_chunk = (time_chunk - self.offset_lvl) / time_chunk.mean(axis=1, keepdims=True) - 1.0
# #time_chunk = time_chunk / time_chunk.mean(axis=1, keepdims=True) - 1.0
#
# if save:
# np.savez(f"test_data/time_chunk_tr_s{self.CurrentStep():04d}.npz", time_chunk=time_chunk)
if self.got_normalization:
self.logger.info(f"time_chunk.shape = {time_chunk.shape}")
if save:
np.savez("test_data/time_chunk_s{0:04d}.npz".format(self.CurrentStep()), time_chunk=time_chunk)#

time_chunk = (time_chunk - self.offset_lvl) / time_chunk.mean(axis=1, keepdims=True) - 1.0
#time_chunk = time_chunk / time_chunk.mean(axis=1, keepdims=True) - 1.0

if save:
np.savez(f"test_data/time_chunk_tr_s{self.CurrentStep():04d}.npz", time_chunk=time_chunk)

return time_chunk

Expand Down
2 changes: 2 additions & 0 deletions streaming/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ def put_data(self, data:np.ndarray):

if self.writer is not None:
self.logger.info(f"Putting data: name = {self.variable.Name()}, shape = {data.shape}")
if not data.flags.contiguous:
data = np.array(data, copy=True)
self.writer.Put(self.variable, data, adios2.Mode.Sync)

return None
Expand Down
Loading

0 comments on commit 9e47434

Please sign in to comment.