Skip to content

Commit

Permalink
Working version of processor_mpi_tasklist
Browse files Browse the repository at this point in the history
  • Loading branch information
rkube committed May 22, 2020
1 parent 22864ff commit dd977fd
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 44 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ worker-*
*.o
*.a
*.so
*.txt
core
test_data/
**/.ipynb_checkpoints/
**/build/
mongo_secret
postgre_secret

4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,17 @@ Run this implementation on cori:
module unload PrgEnv-cray PrgEnv-gnu PrgEnv-intel
module load PrgEnv-gnu
module unload craype-hugepages2M
module unload python
module load python3
module use -a /global/cscratch1/sd/jyc/sw/modulefiles
module load adios2/devel
module load python_delta_comm
export OMP_NUM_THREAD=N
srun -n 6 -c N python processor_mpi.py --config configs/test_all.json
Alternatively, run the 2-node scenario using the task_list processor which uses an extra fft executor:
srun -n 6 -c N python -m mpi4py.futures processor_mpi_tasklist.py --config configs/test_all.json
```

## Flexible workflow (3-node scenario)
Expand Down
60 changes: 43 additions & 17 deletions analysis/tasks_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@

import backends

from misc.mpifilehandle import MPIFileHandler

from scipy.signal import stft

from mpi4py import MPI

"""
Author: Ralph Kube
This file contains the task_spectral class and its derived classes. Each one implements
Expand Down Expand Up @@ -132,7 +136,7 @@ def get_dispatch_sequence(self, niter=None):
return(all_chunks)


def calc_and_store(self, stream_data, ch_it, info_dict):
def calc_and_store(self, fft_data, fft_params, ch_it, info_dict):
"""Dispatches a kernel and stores results
Parameters:
Expand All @@ -143,20 +147,40 @@ def calc_and_store(self, stream_data, ch_it, info_dict):
info_dict - metadata for the fft_data object
"""

try:
# Calculate the cross phase
result = self.kernel(fft_data, ch_it, self.fft_params)
import datetime
from socket import gethostname

#logger = logging.getLogger("simple")
comm = MPI.COMM_WORLD

#Store result in the DB
#self.store_backend.store_data(result, info_dict)

#logger.info(f" {info_dict['analysis_name']}: _submit tidx {info_dict['tidx']}, chunk {info_dict['channel_batch']}: Finished")

tidx = info_dict['tidx']
an_name = info_dict["analysis_name"]
hostname = gethostname()#MPI.Get_processor_name()

t1 = datetime.datetime.now()
result = self.kernel(fft_data, ch_it, fft_params)
t2 = datetime.datetime.now()
dt = t2 - t1

with open(f"/global/homes/r/rkube/repos/delta/outfile_{(comm.rank):03d}.txt", "a") as df:
df.write(f" rank {comm.rank:03d}/{comm.size:03d}: tidx={tidx} {an_name} start {t1:%H:%M:%S} end {t2:%H:%M:%S} on {hostname} \n")
df.flush()

#Store result in the DB
self.store_backend.store_data(result, info_dict)
logging.info(f"{self.analysis}: _submit tidx {info_dict['tidx']}, chunk {info_dict['channel_batch']}: Finished")



# Zero out the result once it has been written
result = None

except:
self.logger.info("Unexpected error in calc_and_store:", sys.exc_info()[0])
raise
# Zero out the result once it has been written
result = None

# except:
# logger.info("Unexpected error in calc_and_store:", sys.exc_info()[0])
# raise

return None

Expand All @@ -172,7 +196,8 @@ def submit(self, executor, fft_data, tidx):
"tidx": tidx,
"channel_batch": chunk_idx} for chunk_idx in range(self.num_chunks)]

_ = [executor.submit(self.calc_and_store, fft_data, ch_it, info_dict) for ch_it, info_dict in zip(self.get_dispatch_sequence(), info_dict_list)]
_ = [executor.submit(self.calc_and_store, fft_data, self.fft_params, ch_it, info_dict) for ch_it, info_dict in zip(self.get_dispatch_sequence(), info_dict_list)]
self.logger.info(f"tidx={tidx} submitted {self.analysis}")

return None

Expand Down Expand Up @@ -218,12 +243,13 @@ def __init__(self, executor_anl, executor_fft, task_config_list, fft_config, ece
def submit(self, data, tidx):
"""Performs magic"""

self.logger.info(f"task_list.submit is called. tidx={tidx}. data.shape= {data.shape}. fft_params={self.fft_params}")
tic_fft = timeit.default_timer()
res = self.executor_fft.submit(stft, data, axis=1, fs=self.fft_params["fs"], nperseg=self.fft_params["nfft"],
window=self.fft_params["window"], detrend=self.fft_params["detrend"],
noverlap=self.fft_params["noverlap"], padded=False, return_onesided=False, boundary=None)
res = self.executor_fft.submit(stft, data, axis=1, fs=self.fft_params["fs"], nperseg=self.fft_params["nfft"], window=self.fft_params["window"], detrend=self.fft_params["detrend"], noverlap=self.fft_params["noverlap"], padded=False, return_onesided=False, boundary=None)
fft_data = res.result()
# fft_data = stft(data, axis=1, fs=self.fft_params["fs"], nperseg=self.fft_params["nfft"],
# window=self.fft_params["window"], detrend=self.fft_params["detrend"],
# noverlap=self.fft_params["noverlap"], padded=False, return_onesided=False, boundary=None)

fft_data = np.fft.fftshift(fft_data[2], axes=1)
toc_fft = timeit.default_timer()
self.logger.info(f"tidx {tidx}: FFT took {(toc_fft - tic_fft):6.4f}s")
Expand Down
15 changes: 11 additions & 4 deletions configs/logger.yaml
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
version: 1
formatters:
simple:
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
format: '%(levelname)s - %(asctime)s - %(message)s'
extended:
format: "%(levelname)s %(asctime)s,%(msecs)d [Process %(process)d %(processName)s %(threadName)s] [%(module)s %(funcName)s]: %(message)s "

handlers:
console:
class: logging.StreamHandler
level: INFO
formatter: extended
formatter: simple
stream: ext://sys.stdout

file_handler:
class: logging.FileHandler
level: INFO
formatter: extended
filename: delta.log


loggers:
simple:
level: DEBUG
handlers: [console]
level: INFO
handlers: [console, file_handler]
propagate: no

DB:
Expand Down
19 changes: 12 additions & 7 deletions configs/test_all.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,27 @@
"chunk_size": 10000,
"channel_range": ["L0101-2408"]
},
"params_bad":
{
"IPAddress": "128.55.205.18",
"DataTransport": "WAN",
"Timeout": "300",
"Port": "12400",
"OneToOneMode": "TRUE",
"AlwaysProvideLatestTimestep": "FALSE",
"OpenTimeoutSecs": "300"
},
"transport":
{
"chunk_size": 10000,
"nstep": 500,
"nstep": 100,
"datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/",
"engine": "SST",
"channel_range": ["L0101-2408"],
"params":
{
"IPAddress": "128.55.205.18",
"DataTransport": "WAN",
"Timeout": "300",
"Port": "12400",
"OneToOneMode": "TRUE",
"AlwaysProvideLatestTimestep": "FALSE",
"OpenTimeoutSecs": "300"
"OpenTimeoutSecs": "60"
}
},
"storage_mongo":
Expand Down
2 changes: 0 additions & 2 deletions generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@

writer.Open()



logger.info("Start sending on channel:")
tic = time.time()
nstep = 0
Expand Down
26 changes: 12 additions & 14 deletions processor_mpi_tasklist.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
from analysis.tasks_mpi import task_list_spectral
from analysis.channels import channel_range

from misc.mpifilehandle import MPIFileHandler


@attr.s
class AdiosMessage:
Expand Down Expand Up @@ -126,7 +128,7 @@ def consume(Q, task_list):

while True:
try:
msg = Q.get(timeout=20.0)
msg = Q.get(timeout=10.0)
except queue.Empty:
logger.info("Empty queue after waiting until time-out. Exiting")
break
Expand All @@ -135,7 +137,7 @@ def consume(Q, task_list):
Q.task_done()
break

logger.info(f"rank {rank}: tidx={msg.tstep_idx}")
logger.info(f"Rank {rank}: Consumed tidx={msg.tstep_idx}")
task_list.submit(msg.data, msg.tstep_idx)

Q.task_done()
Expand Down Expand Up @@ -167,8 +169,9 @@ def main():

# Create a global executor
#executor = concurrent.futures.ThreadPoolExecutor(max_workers=60)
executor_fft = MPIPoolExecutor(max_workers=4, mpi_info={"host": "nid00104"})
executor_anl = MPIPoolExecutor(max_workers=10, mpi_info={"hostfile": "mpi_hosts.txt"})
executor_fft = MPIPoolExecutor(max_workers=4, mpi_info={"host": "nid00152"})
executor_anl = MPIPoolExecutor(max_workers=120, mpi_info={"hostfile": "nid00153"})

adios2_varname = channel_range.from_str(cfg["transport"]["channel_range"][0])

#with MPICommExecutor(MPI.COMM_WORLD) as executor:
Expand Down Expand Up @@ -200,7 +203,6 @@ def main():
tic_main = timeit.default_timer()
workers = []
for _ in range(4):
#thr = ConsumeThread(dq, executor, task_list, cfg)
worker = threading.Thread(target=consume, args=(dq, task_list))
worker.start()
workers.append(worker)
Expand All @@ -209,13 +211,11 @@ def main():
# data stream. Put this right before entering the main loop
logger.info(f"{rank} Waiting for generator")
reader.Open()
last_step = 0
logger.info(f"Starting main loop")

rx_list = []
while True:
stepStatus = reader.BeginStep()
logger.info(f"currentStep = {reader.CurrentStep()}")
if stepStatus:
# Read data
stream_data = reader.Get(adios2_varname, save=False)
Expand All @@ -224,19 +224,17 @@ def main():
# Generate message id and publish
msg = AdiosMessage(tstep_idx=reader.CurrentStep(), data=stream_data)
dq.put_nowait(msg)
logger.info(f"Published message {msg}")
logger.info(f"Published tidx {msg.tstep_idx}")
reader.EndStep()
else:
logger.info(f"Exiting: StepStatus={stepStatus}")
break

# #Early stopping for debug
if reader.CurrentStep() > 5:
logger.info(f"Exiting: CurrentStep={reader.CurrentStep()}, StepStatus={stepStatus}")
dq.put(AdiosMessage(tstep_idx=None, data=None))
break

last_step = reader.CurrentStep()
# if reader.CurrentStep() > 5:
# logger.info(f"Exiting: CurrentStep={reader.CurrentStep()}, StepStatus={stepStatus}")
# dq.put(AdiosMessage(tstep_idx=None, data=None))
# break

dq.join()
logger.info("Queue joined")
Expand Down
1 change: 1 addition & 0 deletions streaming/reader_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def Get(self, ch_rg: channel_range, save: bool=False):
self.logger.info(f"Inquired variables: {var}")
time_chunk = np.zeros(var.Shape(), dtype=np.float64)
self.reader.Get(var, time_chunk, adios2.Mode.Sync)
self.logger.info(f"Got data")

if save:
np.savez(f"test_data/time_chunk_tr_s{self.CurrentStep():04d}.npz", time_chunk=time_chunk)
Expand Down

0 comments on commit dd977fd

Please sign in to comment.