Added new mockup that demonstrates non-blocking evaluation
rkube committed Jan 22, 2020
1 parent 6b29080 commit 9b2e757
Showing 4 changed files with 215 additions and 20 deletions.
57 changes: 54 additions & 3 deletions README.md
@@ -20,24 +20,30 @@ srun -n 2 -c 2 --mem=1G --gres=craynetwork:0 python generator_adios2.py
srun -n 2 -c 2 --mem=1G --gres=craynetwork:0 python processor_adios2.py
```

To have generator and processor side-by-side within an interactive session, it is convenient to [split the terminal using screen](https://unix.stackexchange.com/questions/7453/how-to-split-the-terminal-into-more-than-one-view).
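For example, with GNU screen's default key bindings (a quick sketch, not specific to this repository):
```
screen          # start a session on the interactive node
# Ctrl-a S      split the current window horizontally
# Ctrl-a Tab    move the cursor to the new region
# Ctrl-a c      open a shell in the new region
```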


# Workflow Scenario #1
This is a three-node scenario, in which data is streamed from a KSTAR DTN to a NERSC DTN and
subsequently passed to a compute node:

```
 generator.py  ======>  receiver.py  ======>  analysis_adios2.py (not yet implemented)
(KSTAR DTN)        |    (NERSC DTN)      |    (NERSC compute nodes)
                   v                     v
stream name:  shotnum-channelid.bp   shotnum-channelid.s1.bp
```
This scenario is not yet fully implemented.

Example commands are as follows:
```
python generator.py --config config-jychoi.json
python receiver.py --config config-jychoi.json
```

Parameters can be provided with a JSON file. Here is an example:
```
{
"datapath": "/home/choij/kstar_streaming/018431",
@@ -53,3 +59,48 @@
}
```
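As a rough orientation, here is a minimal sketch of the generator side using the adios2 Python bindings. This is not the repository's generator.py; the DataMan engine choice, endpoint, variable name, and array shape are all assumptions:
```
# Minimal generator sketch -- NOT the repository's generator.py.
# Assumes the adios2 2.x Python bindings; the DataMan engine, endpoint,
# variable name, and array shape are placeholders.
import json
import numpy as np
import adios2

with open("config-jychoi.json") as f:
    cfg = json.load(f)
datapath = cfg["datapath"]  # the real generator would read ECEI data from here

adios = adios2.ADIOS()
io = adios.DeclareIO("writer")
io.SetEngine("DataMan")
io.SetParameters({"IPAddress": "127.0.0.1", "Port": "12306"})  # placeholder endpoint

# One dummy time chunk instead of real channel data
chunk = np.zeros((192, 512), dtype=np.float64)
var = io.DefineVariable("ch_data", chunk, [192, 512], [0, 0], [192, 512],
                        adios2.ConstantDims)

engine = io.Open("018431-ch00.bp", adios2.Mode.Write)  # stream name: shotnum-channelid.bp
for tstep in range(10):
    engine.BeginStep()
    engine.Put(var, chunk)
    engine.EndStep()
engine.Close()
```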


# 2-node scenario
Data is streamed from a KSTAR DTN directly to a Cori compute node.
```
  KSTAR DTN                Cori compute node
 generator.py  =========>  processor_???.py
                   |
                   v
             ADIOS2 dataman
     stream name: KSTAR_shotnum
```
In this scenario, the processor reads a configuration file, performs the analysis routines
defined in that file, and stores the data for subsequent analysis.
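For orientation, a minimal sketch of the receiving side with the adios2 Python bindings might look as follows; the variable name, endpoint, and data type are assumptions, and this is not one of the repository's processors:
```
# Minimal reader sketch -- NOT one of the repository's processors.
# Assumes the adios2 2.x Python bindings; variable name and endpoint
# are placeholders.
import numpy as np
import adios2

adios = adios2.ADIOS()
io = adios.DeclareIO("reader")
io.SetEngine("DataMan")
io.SetParameters({"IPAddress": "127.0.0.1", "Port": "12306"})  # placeholder endpoint

engine = io.Open("KSTAR_018431", adios2.Mode.Read)  # stream name: KSTAR_shotnum
while engine.BeginStep() == adios2.StepStatus.OK:
    var = io.InquireVariable("ch_data")             # assumed variable name
    chunk = np.zeros(var.Shape(), dtype=np.float64)
    engine.Get(var, chunk)
    engine.EndStep()   # Get() is guaranteed to have completed here
    # ... hand the chunk to the analysis routines ...
engine.Close()
```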


As of now, there is no common format for the configuration files; each processor and receiver
defines its own.
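For illustration only, a processor configuration might look like this; all keys are hypothetical, since no shared specification exists:
```
{
    "datapath": "/home/choij/kstar_streaming/018431",
    "shotnr": 18431,
    "analysis": ["cross_phase", "cross_power"],
    "channels": [2203, 2204]
}
```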


### Currently implemented processors

processor_mpi_mockup.py: Implements a bare-bones reference implementation that
* Reads dummy data, mimicking a received data package from KSTAR, in the main loop
* Puts the dummy data in a queue
* Runs a worker thread that reads data from the queue and dispatches it to an MPIPoolExecutor
for analysis
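
processor_mpi_nonblocking_mockup.py: Implements the same queue/worker structure, but non-blocking:
the worker thread only submits the analysis tasks and appends the returned futures to a list; the
results are collected at the end of main() with concurrent.futures.as_completed.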

Run processor_mpi_mockup.py as
```
export OMP_NUM_THREADS=1
srun -n 16 python -m mpi4py.futures processor_mpi_mockup.py --config configs/test_crossphase.json
```

The number of OpenMP threads needs to be limited because the BLAS libraries linked by some
numpy packages are inherently multithreaded. Left at the default, each MPI task spawns multiple
threads and the processor runs with more threads than there are cores available on the machine.
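
Alternatively, the thread count can be capped from inside the processor. The sketch below is an illustration only: threadpoolctl is an assumed extra dependency, not something this repository uses:
```
import os
# Must happen before numpy is imported to affect OpenMP-based BLAS builds
os.environ.setdefault("OMP_NUM_THREADS", "1")

import numpy as np
from threadpoolctl import threadpool_limits  # assumed extra dependency

with threadpool_limits(limits=1):
    # BLAS-heavy work, e.g. the matrix products inside an analysis kernel,
    # now runs single-threaded regardless of the BLAS build.
    a = np.random.rand(512, 512)
    b = a @ a.T
```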



Empty file __init__.py removed.
63 changes: 46 additions & 17 deletions processor_mpi_mockup.py
@@ -6,12 +6,23 @@
import logging
import threading
import queue
#import concurrent.futures
#import timeit

import attr
import time


"""
Author: Ralph Kube
Mockup of Jong's MPI processing model
To run on an interactive node
srun -n 4 python -m mpi4py.futures processor_mpi.py --config configs/test_crossphase.json
"""

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
@@ -26,47 +37,65 @@ class AdiosMessage:
    data = attr.ib(repr=False)


def calc(param, tidx, data):
    """Dummy analysis task: sleep for a random time, return a random number."""
    logging.info(f"  calc: tidx {tidx}, param = {param}")
    time.sleep(np.random.uniform(0.0, 1.0))
    return np.random.rand()


def consume(Q, executor):
    """Worker thread: pop messages from the queue and dispatch them to the executor."""
    while True:
        msg = Q.get()
        logging.info(f"Consuming message {msg}")
        # A message with tstep_idx == None signals the end of the stream
        if msg.tstep_idx is None:
            Q.task_done()
            break

        # Note: res.result() blocks until the respective task has finished
        for res in [executor.submit(calc, param, msg.tstep_idx, msg.data) for param in range(5)]:
            logging.info(f"  tstep = {msg.tstep_idx} res = {res.result()}")

        Q.task_done()
        logging.info(f"Done consuming {msg}")


def main():
    # Define a data queue. This serves as the connection between the receiver
    # (datastream from KSTAR) and the executor that handles the analysis routines.
    dq = queue.Queue()
    msg = AdiosMessage(0, None)
    # Dummy data of the same shape as the DFT of a time-chunk.
    data = np.zeros([192, 512, 38], dtype=np.complex128)

    # Define an executor that handles the analysis tasks
    executor = MPIPoolExecutor(max_workers=2)
    #executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    # Start a worker thread that pops elements from the queue and dispatches
    # them to the executor
    worker = threading.Thread(target=consume, args=(dq, executor))
    worker.start()

    # Start the receiver loop. Here we receive data chunks from remote
    for i in range(5):
        # Receive time chunk data
        logging.info(f"Received time chunk {i}")
        # Compile a message with the current data
        msg = AdiosMessage(tstep_idx=i, data=data)
        # Put the data in the queue
        dq.put(msg)

    logging.info("Finished the receiver loop")
    # Put the hang-up message in the queue
    dq.put(AdiosMessage(None, None))
    # Wait until all queued messages have been processed
    dq.join()
    # Stop the worker thread
    worker.join()

    # Shut down the MPIPoolExecutor.
    # This has to be done after the queue has joined!
    executor.shutdown()


if __name__ == "__main__":
    main()
115 changes: 115 additions & 0 deletions processor_mpi_nonblocking_mockup.py
@@ -0,0 +1,115 @@
# -*- Encoding: UTF-8 -*-

from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor
import numpy as np
import logging
import threading
import queue
import concurrent.futures
#import timeit

import attr
import time


"""
Author: Ralph Kube
Mockup of Jong's MPI processing model
To run on an interactive node
srun -n 4 python -m mpi4py.futures processor_mpi.py --config configs/test_crossphase.json
"""

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
)


@attr.s
class AdiosMessage:
    """Defines data chunks as read from adios."""
    tstep_idx = attr.ib(repr=True)
    data = attr.ib(repr=False)


def calc(param, tidx, data):
    """Dummy analysis task: sleep for a random time, return (tidx, param, result)."""
    time.sleep(np.random.uniform(0.0, 1.0))
    return (tidx, param, np.random.rand())


def consume(Q, executor, futures_list):
    """Worker thread: pop messages from the queue and submit analysis tasks."""
    while True:
        msg = Q.get()
        logging.info(f"Consuming message {msg}. Number of futures: {len(futures_list)}")
        # A message with tstep_idx == None signals the end of the stream
        if msg.tstep_idx is None:
            Q.task_done()
            break

        # Submit the analysis tasks but do not wait for their results:
        # calling future.result() here would block the consumer loop.
        for future in [executor.submit(calc, param, msg.tstep_idx, msg.data) for param in range(5)]:
            futures_list.append(future)

        logging.info(f"Done consuming {msg}. Number of futures: {len(futures_list)}")
        Q.task_done()


def main():
    # Define a data queue. This serves as the connection between the receiver
    # (datastream from KSTAR) and the executor that handles the analysis routines.
    dq = queue.Queue()
    msg = AdiosMessage(0, None)
    # Dummy data of the same shape as the DFT of a time-chunk.
    data = np.zeros([192, 512, 38], dtype=np.complex128)

    # Define an executor that handles the analysis tasks
    executor = MPIPoolExecutor(max_workers=2)
    #executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    # This stores the list of all futures generated by the pool executor
    futures_list = []

    # Start a worker thread that pops elements from the queue and dispatches
    # them to the executor
    worker = threading.Thread(target=consume, args=(dq, executor, futures_list))
    worker.start()

    # Start the receiver loop. Here we receive data chunks from remote
    for i in range(5):
        # Receive time chunk data
        logging.info(f"Received time chunk {i}")
        # Compile a message with the current data
        msg = AdiosMessage(tstep_idx=i, data=data)
        # Put the data in the queue
        dq.put(msg)

    logging.info("Finished the receiver loop")
    # Put the hang-up message in the queue
    dq.put(AdiosMessage(None, None))
    # Wait until all queued messages have been processed
    dq.join()
    # Stop the worker thread
    worker.join()

    # Finally, collect the results from the futures as they complete
    for future in concurrent.futures.as_completed(futures_list):
        res = future.result()
        logging.info(f"Future completed: tidx={res[0]}, param={res[1]}")

    # Shut down the MPIPoolExecutor.
    # This has to be done after the queue has joined!
    executor.shutdown()


if __name__ == "__main__":
main()



# End of file processor_mpi_nonblocking_mockup.py
