Added reference non-blocking processor implementation
rkube committed Jan 22, 2020
2 parents 9b2e757 + c1d44dc commit 02c75ae
Showing 4 changed files with 92 additions and 64 deletions.
67 changes: 35 additions & 32 deletions README.md
@@ -25,23 +25,24 @@ to [split the terminal using screen]
(https://unix.stackexchange.com/questions/7453/how-to-split-the-terminal-into-more-than-one-view)


# Workflow Scenario #1
This is a three-node scenario, where data is streamed from a KSTAR DTN to a NERSC DTN and
subsequently passed to a compute node:

# Workflow Scenario #1 (2-node scenario)
```
generator.py =====> receiver.py =====> analysis_adios2.py (not yet implemented)
(running on KSTAR DTN) | (running on NERSC DTN) | (running on NERSC compute nodes)
v v
stream name: shotnum-channelid.bp shotnum-channelid.s1.bp
generator.py =====> processor_xyz.py
(running on KSTAR DTN) | (running on NERSC DTN)
v
stream name: shotnum-channelid.bp
```
This scenario is currently not implemented fully.
xyz=[mpi, mpi_brute, dask, ...]
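
As a rough illustration of the streaming pattern in the diagram above, a minimal ADIOS2 writer/reader pair might look like the sketch below. The engine choice (BP4 instead of a WAN engine such as DataMan), the variable name and shape, and running both ends in one script are assumptions made for illustration; this is not the actual generator.py or processor code.
```
# Hypothetical sketch of the generator -> processor stream shown above.
# Engine, variable name, and data shape are illustrative assumptions.
import numpy as np
import adios2

adios = adios2.ADIOS()

# Generator side (e.g. on the KSTAR DTN): write a few time steps
io_w = adios.DeclareIO("writer")
io_w.SetEngine("BP4")  # a WAN engine such as DataMan would be used for real streaming
data = np.random.rand(192, 512)
var = io_w.DefineVariable("ECEI", data, list(data.shape), [0, 0],
                          list(data.shape), adios2.ConstantDims)

writer = io_w.Open("shotnum-channelid.bp", adios2.Mode.Write)
for step in range(5):
    writer.BeginStep()
    writer.Put(var, data)
    writer.EndStep()
writer.Close()

# Processor side (e.g. on the NERSC DTN): consume the stream step by step
io_r = adios.DeclareIO("reader")
io_r.SetEngine("BP4")
reader = io_r.Open("shotnum-channelid.bp", adios2.Mode.Read)
while reader.BeginStep() == adios2.StepStatus.OK:
    var_in = io_r.InquireVariable("ECEI")
    buffer = np.zeros((192, 512))
    reader.Get(var_in, buffer)
    reader.EndStep()  # Get is deferred and completes at EndStep
    # ... hand `buffer` to the analysis pipeline ...
reader.Close()
```
In the repository itself the reading side is wrapped in a reader class (see the reader.Get() and reader.CurrentStep() calls in processor_mpi.py below).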


In this scenario, the processor reads a configuration file, performs the analysis routines
defined in that file, and stores the data for subsequent analysis.


As of now, there is no common format for the configuration files. Each processor and receiver has its own
format.


Example commands are as follows:
```
python generator.py --config config-jychoi.json
python receiver.py --config config-jychoi.json
```

Parameters can be provided with a json file. Here is an example:
```
@@ -61,27 +62,20 @@ Parameters can be provided with a json file. Here is an example:
```
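
For orientation, a hypothetical configuration of this kind is sketched below. The keys mirror those read by the code further down in this commit ("task_list", "analysis", "ref_channels", "cmp_channels", "ECEI_cfg" in analysis/tasks_mpi.py and processor_mpi.py), but the concrete values and the channel-range notation are made up for illustration and are not taken from the example above.
```
{
    "ECEI_cfg": {
        "comment": "diagnostic/FFT settings go here (illustrative placeholder)"
    },
    "task_list": [
        {
            "analysis": "cross_phase",
            "ref_channels": "L0101-2408",
            "cmp_channels": "L0101-2408"
        },
        {
            "analysis": "cross_power",
            "ref_channels": "L0101-2408",
            "cmp_channels": "L0101-2408"
        }
    ]
}
```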


# 2-node scenario
Data is streamed from a KSTAR DTN directly into a cori compute node.
```
KSTAR DTN cori compute

generator.py =========> processor_???.py
|
v
ADIOS2 dataman
stream name: KSTAR_shotnum
```
In this scenario, the processor reads a configuration file, performs the analysis routines
defined in that file, and stores the data for subsequent analysis.

## Currently implemented processors

As of now, there is no common format for the configuration files. Each processor and receiver has its own
format.

### Jong's reference implementation
Example commands are as follows:
```
python generator.py --config config-jychoi.json
python receiver.py --config config-jychoi.json
```

### Currently implemented processors
Note that in this implementation the processor is called receiver.py.

### MPI processor
processor_mpi_mockup.py: a bare-bones reference implementation that
* Reads dummy data, mimicking a received data package from KSTAR, in the main loop
* Puts the dummy data in a queue
@@ -99,8 +93,17 @@ some numpy packages are inherently multithreaded. This causes the processor to run
with a larger number of tasks than available on the machine.
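
The processing model described above (a main loop that receives data and fills a queue, plus a consumer thread that dispatches the queued packages to a pool of workers) is sketched below with a stand-in analysis function. This is a simplified, self-contained illustration using only the standard library and numpy; the names, data shapes, and the dummy FFT-based "analysis" are assumptions, not the actual processor_mpi_mockup.py.
```
# Simplified sketch of the queue/consumer pattern described above.
# The analysis kernel, data shapes, and worker count are illustrative only.
import concurrent.futures
import queue
import threading

import numpy as np


def analyze(tidx, data):
    """Stand-in for an analysis kernel (e.g. a spectral quantity)."""
    return tidx, np.abs(np.fft.rfft(data, axis=-1)).mean()


def consume(q, executor, futures_list):
    """Consumer thread: pull data packages off the queue and submit them to the pool."""
    while True:
        item = q.get()
        if item is None:  # sentinel: no more data will arrive
            q.task_done()
            break
        tidx, data = item
        futures_list.append(executor.submit(analyze, tidx, data))
        q.task_done()


q = queue.Queue()
futures_list = []
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
worker = threading.Thread(target=consume, args=(q, executor, futures_list))
worker.start()

# Main loop: generate dummy data, mimicking received data packages from KSTAR
for tidx in range(5):
    q.put((tidx, np.random.rand(192, 512)))
q.put(None)  # tell the consumer to stop

worker.join()
q.join()

# Collect the results as the futures complete
for fut in concurrent.futures.as_completed(futures_list):
    print("completed:", fut.result())
executor.shutdown()
```
As noted above, the pool size has to be chosen with care in the real processor, since some numpy routines are themselves multithreaded and can oversubscribe the node.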


### MPI processor brute
RMC's implementation of fluctana in the framework



# (obsolete) Workflow Scenario #2 (3-node scenario)
It consists of three components:
```
generator.py =====> receiver.py =====> analysis.py
(running on KSTAR DTN) | (running on NERSC DTN) | (running on NERSC compute nodes)
v v
stream name: shotnum-channelid.bp shotnum-channelid.s1.bp
```
This scenario is not yet fully implemented.


8 changes: 6 additions & 2 deletions analysis/tasks_mpi.py
@@ -330,14 +330,15 @@ def __init__(self, task_config, fft_config, ecei_config):
# These channels serve as the cross-data for the spectral diagnostics
self.cmp_channels = channel_range.from_str(task_config["cmp_channels"])


self.task_config = task_config
self.fft_config = fft_config
self.ecei_config = ecei_config

self.storage_scheme = {"ref_channels": self.ref_channels.to_str(),
"cmp_channels": self.cmp_channels.to_str()}

self.futures_list = []

# Construct a list of unique channels
# F.ex. we have ref_channels [(1,1), (1,2), (1,3)] and cmp_channels = [(1,1), (1,2)]
# The unique list of channels is then
@@ -448,7 +449,8 @@ def __init__(self, task_config, fft_config, ecei_config):
self.storage_scheme["analysis_name"] = "cross_phase"

def calculate(self, executor, fft_data):
self.futures_list = [executor.submit(cross_phase, fft_data, ch_it) for ch_it in self.get_dispatch_sequence()]
# Append the new futures
self.futures_list += [executor.submit(cross_phase, fft_data, ch_it) for ch_it in self.get_dispatch_sequence()]
return None


@@ -459,6 +461,7 @@ def __init__(self, task_config, fft_config, ecei_config):
self.storage_scheme["analysis_name"] = "cross_power"

def calculate(self, executor, fft_data):
# Append the new futures
self.futures_list = [executor.submit(cross_power, fft_data, ch_it, self.fft_config) for ch_it in self.get_dispatch_sequence()]
return None

@@ -470,6 +473,7 @@ def __init__(self, task_config, fft_config, ecei_config):
self.storage_scheme["analysis_name"] = "coherence"

def calculate(self, executor, fft_data):
# Append the new futures
self.futures_list = [executor.submit(coherence, fft_data, ch_it) for ch_it in self.get_dispatch_sequence()]
return None

55 changes: 33 additions & 22 deletions processor_mpi.py
@@ -14,8 +14,8 @@
"""

from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor
#from mpi4py import MPI
#from mpi4py.futures import MPIPoolExecutor
import sys
sys.path.append("/home/rkube/software/adios2-release_25/lib64/python3.7/site-packages")

@@ -24,6 +24,7 @@
import random
import string
import queue
import concurrent.futures
import threading

import numpy as np
@@ -59,23 +60,25 @@
)


@attr.s
@attr.s ``
class AdiosMessage:
"""Storage class used to transfer data from Kstar(Dataman) to
local PoolExecutor"""
tstep_idx = attr.ib(repr=True)
data = attr.ib(repr=False)


def consume(Q, store_backend, my_fft, task_list, cfg):
"""Executed by a local thread. Used to dispatch work items from the
DataMAN Queue to the PoolExecutor"""
#def consume(Q, store_backend, my_fft, task_list, cfg):
def consume(Q, executor, my_fft, task_list, futures_list):
"""Executed by a local thread. Dispatch work items from the
Queue to the PoolExecutor"""

while True:
msg = Q.get()
logging.info(f"Consuming {msg}")

# If we get our special break message, we exit
if msg.tstep_idx == -1:
if msg.tstep_idx == None:
Q.task_done()
break

@@ -85,16 +88,21 @@ def consume(Q, store_backend, my_fft, task_list, cfg):
toc_fft = timeit.default_timer()
logging.info(f"FFT took {(toc_fft - tic_fft):6.4f}s")

# Step 2) Distribute work among MPI workers
with MPIPoolExecutor(max_workers=256) as executor:
tic_tasks = timeit.default_timer()
for task in task_list:
logging.info("Executing task")
task.calculate(executor, fft_data)
task.store_data(store_backend, {"tstep": msg.tstep_idx})

toc_tasks = timeit.default_timer()
logging.info(f"Performing analysis and storing took {(toc_tasks - tic_tasks):6.4f}s")
# Step 2) Distribute the work via PoolExecutor
for task in task_list:
task.calculate(executor, fft_data)


#with MPIPoolExecutor(max_workers=256) as executor:
# tic_tasks = timeit.default_timer()
# for task in task_list:
# logging.info("Executing task")
# task.calculate(executor, fft_data)
# task.store_data(store_backend, {"tstep": msg.tstep_idx})
#
# toc_tasks = timeit.default_timer()
# logging.info(f"Performing analysis and storing took {(toc_tasks - tic_tasks):6.4f}s")
Q.task_done()


@@ -119,7 +127,11 @@ def main():
# Create storage backend
store_backend = backend_numpy("/home/rkube/repos/delta/test_data")

# # Create the task list
# Create a global executor
executor = concurrent.futures.ThreadPoolExecutor(max_workers=30)


# Create the task list
task_list = []
for task_config in cfg["task_list"]:
task_list.append(task_object_dict[task_config["analysis"]](task_config, fft_params, cfg["ECEI_cfg"]))
@@ -128,8 +140,7 @@
dq = queue.Queue()
msg = None


worker = threading.Thread(target=consume, args=(dq, store_backend, my_fft, task_list, cfg))
worker = threading.Thread(target=consume, args=(dq, executor, my_fft, task_list))
worker.start()

logging.info(f"Starting main loop")
@@ -138,7 +149,7 @@

if stepStatus:
# Read data
stream_data = reader.Get(save=True)
stream_data = reader.Get(save=False)
tb = reader.gen_timebase()

# Generate message id and publish is
@@ -148,12 +159,12 @@

if reader.CurrentStep() > 5:
logging.info(f"Exiting: StepStatus={stepStatus}")
dq.put(AdiosMessage(tstep_idx=-1, data=None))
dq.put(AdiosMessage(tstep_idx=None, data=None))
break

worker.join()
dq.join()

executor.shutdown()

if __name__ == "__main__":
main()
26 changes: 18 additions & 8 deletions processor_mpi_nonblocking_mockup.py
@@ -1,7 +1,7 @@
# -*- Encoding: UTF-8 -*-

from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor
#from mpi4py.futures import MPIPoolExecutor
import numpy as np
import logging
import threading
@@ -11,16 +11,24 @@

import attr
import time
import datetime


"""
Author: Ralph Kube
Mockup of Jong's MPI processing model
Mockup of Jong's Pool processing model with the addition of a global futures_list.
The queue consumer does not evaluate futures but appends them to a global futures_list.
The futures in that list are handled one-by-one as the results become available
To run on an interactive node
srun -n 4 python -m mpi4py.futures processor_mpi.py --config configs/test_crossphase.json
using MPIPoolExecutor
srun -n 4 python -m mpi4py.futures processor_mpi_nonblocking_mockup.py --config configs/test_crossphase.json
using concurrent.futures.ThreadPoolExecutor:
python processor_mpi_nonblocking_mockup.py
"""

logging.basicConfig(
@@ -38,9 +46,11 @@ class AdiosMessage:


def calc(param, tidx, data):
time.sleep(np.random.uniform(0.0, 1.0))
wait = np.random.uniform(0.0, 1.0)
tstart = datetime.datetime.now()
time.sleep(wait)
#logging.info(f" calc: tidx {tidx}, param = {param}")#", data = {data}")
return (tidx, param, np.random.rand())
return (tidx, param, tstart, wait)


def consume(Q, executor, futures_list):
@@ -69,8 +79,8 @@ def main():
data = np.zeros([192, 512, 38], dtype=np.complex128)

# Define an executor that handles the analysis tasks
executor = MPIPoolExecutor(max_workers=2)
#executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
#executor = MPIPoolExecutor(max_workers=2)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=30)

# This stores the list of all futures generated by the poolexecutor
futures_list = []
@@ -100,7 +110,7 @@ def main():
# Finally, start processing the futures list
for future in concurrent.futures.as_completed(futures_list):
res = future.result()
logging.info(f"Future completed: tidx={res[0]}, param={res[1]}")
logging.info(f"Future completed: tidx={res[0]}, param={res[1]}, started at {res[2]}, wait={res[3]:6.4f}s")

# Shutdown the MPIPoolExecutor
# This has to be done after the queue has joined!
