Skip to content

Commit

Permalink
Added cached loader and updated middleman
Browse files Browse the repository at this point in the history
  • Loading branch information
rkube committed May 26, 2020
1 parent dd977fd commit aa0106c
Show file tree
Hide file tree
Showing 17 changed files with 410 additions and 125 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ needs to take another hop:




# Configuration


Expand Down
3 changes: 1 addition & 2 deletions analysis/kernels_spectral.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def kernel_crosspower(fft_data, ch_it, fft_config):

res = np.zeros([len(ch_it), fft_data.shape[1]], dtype=fft_data.dtype)
for idx, ch_pair in enumerate(ch_it):
res[idx, :] = (fft_data[ch_pair.ch1.idx(), :, :] * fft_data[ch_pair.ch2.idx(), :, :].conj()).mean(axis=1) / fft_config["win_factor"]
res[idx, :] = (fft_data[ch_pair.ch1.idx(), :, :] * fft_data[ch_pair.ch2.idx(), :, :].conj()).mean(axis=1) / fft_config["fft_params"]["win_factor"]

return(np.abs(res).real)

Expand Down Expand Up @@ -115,7 +115,6 @@ def kernel_crosscorr(fft_data, ch_it, fft_params):

#_tmp = np.fft.ifft(X * Y.conj(), n=fft_params['nfft'], axis=0) * fft_params['nfft'] / fft_params['win_factor']


_tmp = np.fft.ifft(X * Y.conj(), axis=0).mean(axis=1) / fft_params['win_factor']
res[idx, :] = np.fft.fftshift(_tmp.real)

Expand Down
9 changes: 4 additions & 5 deletions analysis/tasks_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,9 @@ def calc_and_store(self, fft_data, fft_params, ch_it, info_dict):
dt = t2 - t1

with open(f"/global/homes/r/rkube/repos/delta/outfile_{(comm.rank):03d}.txt", "a") as df:
df.write(f" rank {comm.rank:03d}/{comm.size:03d}: tidx={tidx} {an_name} start {t1:%H:%M:%S} end {t2:%H:%M:%S} on {hostname} \n")
df.write(f" rank {comm.rank:03d}/{comm.size:03d}: tidx={tidx} {an_name} start " + t1.isoformat(sep=" ") + " end " + t2.isoformat(sep=" ") + "\n")
df.flush()





# Zero out the result once it has been written
result = None

Expand Down Expand Up @@ -254,6 +250,9 @@ def submit(self, data, tidx):
toc_fft = timeit.default_timer()
self.logger.info(f"tidx {tidx}: FFT took {(toc_fft - tic_fft):6.4f}s")

if tidx == 1:
np.savez(f"test_data/fft_array_s{tidx:04d}.npz", fft_data = fft_data)

for task in self.task_list:
task.submit(self.executor_anl, fft_data, tidx)

Expand Down
25 changes: 25 additions & 0 deletions configs/config-middle.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"Comment": "This is a configuration for the middle-man.",
"transport_kstar":
{
"comment": "Receiver part of the middle-man. Read using BP4 for the time being.",
"chunk_size": 10000,
"nstep": 5,
"datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/",
"engine": "BP4",
"channel_range": ["L0101-2408"],
"params":
{
"This section": "IsEmpty"
}
},
"transport_nersc":
{
"engine": "SST",
"channel_range": ["L0101-2408"],
"params":
{
"RegistrationMethod": "File"
}
}
}
101 changes: 101 additions & 0 deletions configs/test_3node_all.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
{
"shotnr": 18431,
"source":
{
"source_file": "/global/homes/r/rkube/repos/delta/ECEI.018431.LFS.h5",
"chunk_size": 10000,
"num_chunks": 500,
"channel_range": ["L0101-2408"]
},
"params_bad":
{
"IPAddress": "128.55.205.18",
"DataTransport": "WAN",
"Timeout": "300",
"Port": "12400",
"OneToOneMode": "TRUE",
"AlwaysProvideLatestTimestep": "FALSE",
"OpenTimeoutSecs": "300"
},
"transport_kstar":
{
"chunk_size": 10000,
"nstep": 500,
"datapath": "/global/cscratch1/sd/rkube/delta/tmp_kstar",
"channel_range": ["L0101-2408"],
"engine": "BP4",
"params":
{
"DataTransport": "WAN",
"OpenTimeoutSecs": "60",
"IPAddress": "128.55.205.19",
"Timeout": "60",
"Port": "50555"
}
},
"transport_nersc":
{
"chunk_size": 10000,
"nstep": 500,
"datapath": "/global/cscratch1/sd/rkube/delta/tmp_nersc",
"channel_range": ["L0101-2408"],
"engine": "BP4",
"params":
{
"DataTransport": "WAN",
"OpenTimeoutSecs": "60",
"IPAddress": "128.55.205.19",
"Timeout": "60",
"Port": "50555"
}
},
"storage_mongo":
{
"backend": "mongo",
"datastore": "gridfs",
"datadir": "/global/cscratch1/sd/rkube/delta"
},
"storage":
{
"backend": "null"
},
"ECEI_cfg": {
"TriggerTime": [-0.12, 61.2, 60],
"t_norm": [-0.119, -0.109],
"SampleRate": 500,
"TFcurrent": 23000.0,
"Mode": "O",
"LoFreq": 81,
"LensFocus": 80,
"LensZoom": 340},
"fft_params" : {"nfft": 512, "window": "hann", "overlap": 0.5, "detrend": "constant", "full": true},
"task_list": [{
"task_description" : "cross_correlation",
"analysis": "cross_correlation",
"channel_chunk_size": 32768,
"ref_channels" : "L0101-2408",
"cmp_channels" : "L0101-2408"
},
{
"task_description" : "cross_power",
"analysis": "cross_power",
"channel_chunk_size": 32768,
"ref_channels" : "L0101-2408",
"cmp_channels" : "L0101-2408"
},
{
"task_description" : "cross_phase",
"analysis": "cross_phase",
"channel_chunk_size": 32768,
"ref_channels" : "L0101-2408",
"cmp_channels" : "L0101-2408"
},
{
"task_description" : "coherence",
"analysis": "coherence",
"channel_chunk_size": 32768,
"ref_channels" : "L0101-2408",
"cmp_channels" : "L0101-2408"
}]
}

10 changes: 7 additions & 3 deletions configs/test_all.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{
"source_file": "/global/homes/r/rkube/repos/delta/ECEI.018431.LFS.h5",
"chunk_size": 10000,
"num_chunks": 500,
"channel_range": ["L0101-2408"]
},
"params_bad":
Expand All @@ -19,14 +20,17 @@
"transport":
{
"chunk_size": 10000,
"nstep": 100,
"nstep": 500,
"datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/",
"engine": "SST",
"channel_range": ["L0101-2408"],
"engine": "BP4",
"params":
{
"DataTransport": "WAN",
"OpenTimeoutSecs": "60"
"OpenTimeoutSecs": "60",
"IPAddress": "128.55.205.19",
"Timeout": "60",
"Port": "50555"
}
},
"storage_mongo":
Expand Down
18 changes: 8 additions & 10 deletions generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from analysis.channels import channel_range
from streaming.writers import writer_gen
from streaming.adios_helpers import gen_channel_name_v2
from sources.loader_ecei import loader_ecei
from sources.loader_ecei_cached import loader_ecei

"""
Distributes time-chunked ECEI data via ADIOS2.
Expand All @@ -39,14 +39,14 @@

logger = logging.getLogger("simple")

datapath = cfg["transport"]["datapath"]
nstep = cfg["transport"]["nstep"]
datapath = cfg["transport_kstar"]["datapath"]
nstep = cfg["transport_kstar"]["nstep"]
shotnr = cfg["shotnr"]

# Enforce 1:1 mapping of channels and tasks
assert(len(cfg["transport"]["channel_range"]) == size)
assert(len(cfg["transport_kstar"]["channel_range"]) == size)
# Channels this process is reading
ch_rg = channel_range.from_str(cfg["transport"]["channel_range"][rank])
ch_rg = channel_range.from_str(cfg["transport_kstar"]["channel_range"][rank])

# Hard-code the total number of data points
data_pts = int(5e6)
Expand All @@ -58,11 +58,12 @@
# Get a data_loader
logger.info("Loading h5 data into memory")
dl = loader_ecei(cfg)
dl.cache()
batch_gen = dl.batch_generator()

logger.info(f"Creating writer_gen: shotnr={shotnr}, engine={cfg['transport']['engine']}")
logger.info(f"Creating writer_gen: shotnr={shotnr}, engine={cfg['transport_kstar']['engine']}")

writer = writer_gen(cfg["transport"])
writer = writer_gen(cfg["transport_kstar"])

# Pass data layout to writer and reset generator
for data in batch_gen:
Expand All @@ -83,9 +84,6 @@
writer.EndStep()
nstep += 1

if nstep >= 200:
break

toc = time.time()
writer.writer.Close()

Expand Down
25 changes: 12 additions & 13 deletions middleman.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,18 @@ class AdiosMessage:
def forward(Q, cfg):
"""To be executed by a local thread. Pops items from the queue and forwards them."""
logger = logging.getLogger("simple")
writer = writer_gen(cfg["transport_tx"])
dummy_data = np.zeros( (192, cfg["transport_rx"]["chunk_size"]), dtype=np.float64)
writer.DefineVariable(cfg["transport_tx"]["channel_range"][0], dummy_data)
writer = writer_gen(cfg["transport_nersc"])
dummy_data = np.zeros( (192, cfg["transport_nersc"]["chunk_size"]), dtype=np.float64)
writer.DefineVariable(cfg["transport_nersc"]["channel_range"][0], dummy_data)
writer.Open()
logger.info("Starting reader process")

while True:
msg = Q.get()
try:
msg = Q.get(timeout=30.0)
except queue.Empty:
logger.info("Empty queue after waiting until time-out. Exiting")

if msg.tstep_idx == None:
Q.task_done()
logger.info("Received hangup signal")
Expand Down Expand Up @@ -66,15 +70,15 @@ def main():
# of the config file. Therefore some keys are duplicated, such as channel_range. Make sure that these
# items are the same in both sections

assert(cfg["transport_rx"]["channel_range"] == cfg["transport_tx"]["channel_range"])
assert(cfg["transport_kstar"]["channel_range"] == cfg["transport_nersc"]["channel_range"])

with open("configs/logger.yaml", "r") as f:
log_cfg = yaml.safe_load(f.read())
logging.config.dictConfig(log_cfg)
logger = logging.getLogger('simple')

# Create ADIOS reader object
reader = reader_gen(cfg["transport_rx"])
reader = reader_gen(cfg["transport_kstar"])
reader.Open()

dq = queue.Queue()
Expand All @@ -90,8 +94,8 @@ def main():
if stepStatus:
# Read data
logger.info(f"stepStatus == True")
stream_data = reader.Get(channel_range.from_str(cfg["transport_rx"]["channel_range"][0]),
save=True)
stream_data = reader.Get(channel_range.from_str(cfg["transport_kstar"]["channel_range"][0]),
save=False)

# Generate message id and publish is
msg = AdiosMessage(tstep_idx=reader.CurrentStep(), data=stream_data)
Expand All @@ -105,11 +109,6 @@ def main():

last_step = reader.CurrentStep()

if last_step > 10:
logger.info(f"Exiting after 1s0 steps")
dq.put_nowait(AdiosMessage(tstep_idx=None, data=None))
break

logger.info("Exiting main loop")
worker.join()
logger.info("Workers have joined")
Expand Down
11 changes: 7 additions & 4 deletions processor_mpi_tasklist.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def consume(Q, task_list):
Q.task_done()
break

#if(msg.tidx == 1):
# np.savez(f"test_data/io_array_tr_s{msg.tidx:04d}.npz", msg.data)

logger.info(f"Rank {rank}: Consumed tidx={msg.tstep_idx}")
task_list.submit(msg.data, msg.tstep_idx)

Expand Down Expand Up @@ -169,10 +172,10 @@ def main():

# Create a global executor
#executor = concurrent.futures.ThreadPoolExecutor(max_workers=60)
executor_fft = MPIPoolExecutor(max_workers=4, mpi_info={"host": "nid00152"})
executor_anl = MPIPoolExecutor(max_workers=120, mpi_info={"hostfile": "nid00153"})
executor_fft = MPIPoolExecutor(max_workers=4)
executor_anl = MPIPoolExecutor(max_workers=120)

adios2_varname = channel_range.from_str(cfg["transport"]["channel_range"][0])
adios2_varname = channel_range.from_str(cfg["transport_nersc"]["channel_range"][0])

#with MPICommExecutor(MPI.COMM_WORLD) as executor:
# if executor is not None:
Expand All @@ -194,7 +197,7 @@ def main():
store_backend.store_one({"run_id": cfg['run_id'], "run_config": cfg})

# Create ADIOS reader object
reader = reader_gen(cfg["transport"])
reader = reader_gen(cfg["transport_nersc"])
task_list = task_list_spectral(executor_anl, executor_fft, cfg["task_list"], cfg["fft_params"], cfg["ECEI_cfg"], cfg["storage"])

dq = queue.Queue()
Expand Down
Loading

0 comments on commit aa0106c

Please sign in to comment.