update on python queue and thread/process pool

jychoi-hpc committed Dec 10, 2019
1 parent 2723466 commit 7667ef0
Showing 4 changed files with 288 additions and 6 deletions.
36 changes: 34 additions & 2 deletions README.md
@@ -1,5 +1,4 @@
# DELTA-FUSION (aDaptive rEaL Time Analysis of big fusion data)

This project implements a client-server model for analysis of streaming data from
fusion experiments or large-scale simulations.
@@ -16,8 +15,41 @@ The output of the analysis is stored in a database for later analysis and visualization.


To run the analysis framework, run a generator and a processor simultaneously on a node:
```
srun -n 2 -c 2 --mem=1G --gres=craynetwork:0 python generator_adios2.py
srun -n 2 -c 2 --mem=1G --gres=craynetwork:0 python processor_adios2.py
```

For example, run them within an interactive session in a split terminal (see the `screen` utility), as sketched below:
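
With `screen`'s default key bindings (a sketch; bindings may be customized):
```
screen          # start a session
# Ctrl-a S      split the window horizontally
# Ctrl-a Tab    move focus to the new region
# Ctrl-a c      create a shell there; run the generator in one region and the processor in the other
```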

# Workflow Scenario #1
This scenario consists of three components:
```
generator.py  ============>  receiver.py  ============>  analysis_adios2.py (not yet implemented)
(running on KSTAR DTN)    |    (running on NERSC DTN)    |    (running on NERSC compute nodes)
                          v                              v
         stream name: shotnum-channelid.bp     shotnum-channelid.s1.bp
```

Example commands are as follows:
```
python generator.py --config config-jychoi.json
python receiver.py --config config-jychoi.json
```
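
Both scripts assert a 1:1 mapping between MPI ranks and channel lists (and, on the receiver side, analysis tasks), so a config defining two channel lists would need a two-rank launch along these lines (a sketch; the launcher and its flags vary by system):
```
mpirun -n 2 python generator.py --config config-jychoi.json
mpirun -n 2 python receiver.py --config config-jychoi.json
```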

Parameters can be provided via a JSON file. Here is an example:
```
{
    "datapath": "/home/choij/kstar_streaming/018431",
    "shotnr": 18431,
    "channel_lists": [[2203, 2204]],
    "analysis": [{"name": "power_spectrum",
                  "config": {"nperseg": 32, "fs": 1.0}}],
    "engine": "DataMan",
    "params": {"IPAddress": "203.230.120.125",
               "OpenTimeoutSecs": "600"},
    "nstep": 100,
    "analysis_engine": "BP4"
}
```
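
A minimal sketch of how the scripts consume this file (the same pattern appears in generator.py and receiver.py below):
```
import json

with open("config-jychoi.json", "r") as df:
    cfg = json.load(df)

print(cfg["engine"])         # streaming engine, e.g. "DataMan"
print(cfg["channel_lists"])  # one channel list per MPI rank
```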
76 changes: 76 additions & 0 deletions generator.py
@@ -0,0 +1,76 @@
# -*- coding: UTF-8 -*-

from mpi4py import MPI
import numpy as np
import time
import adios2

from os import path

import json
import argparse

from generators.writers import writer_dataman, writer_bpfile, writer_sst, writer_gen
from generators.data_loader import data_loader

"""
Generates batches of ECEI data.
"""

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()


parser = argparse.ArgumentParser(description="Send KSTAR data using ADIOS2")
parser.add_argument('--config', type=str, help='Path to the configuration file', default='config.json')
args = parser.parse_args()

with open(args.config, "r") as df:
    cfg = json.load(df)

datapath = cfg["datapath"]
shotnr = cfg["shotnr"]
nstep = cfg["nstep"]

# Enforce 1:1 mapping of channels and tasks
assert(len(cfg["channel_lists"]) == size)
# Channels this process is reading
my_channel_list = cfg["channel_lists"][rank]
gen_id = 100000 * rank + my_channel_list[0]

print("Rank: {0:d}".format(rank), ", channel_list: ", my_channel_list, ", id = ", gen_id)

# Hard-code the total number of data points
data_pts = int(5e6)
# Hard-code number of data points per data packet
data_per_batch = int(1e1)
# Calculate the number of required data batches we send over the channel
num_batches = data_pts // data_per_batch

# Get a data_loader
dl = data_loader(path.join(datapath, "ECEI.018431.LFS.h5"),
                 channel_list=my_channel_list,
                 batch_size=1000)

# data_arr is a list
_data_arr = dl.get()
data_arr = np.array(_data_arr)
data_arr = data_arr.astype(np.float64)
_data_arr = 0.0  # drop the reference to the raw list

#writer = writer_dataman(shotnr, gen_id)
writer = writer_gen(shotnr, gen_id, cfg["engine"], cfg["params"])

writer.DefineVariable(data_arr)
writer.Open()

for i in range(nstep):
    if rank == 0:
        print("Sending: {0:d} / {1:d}".format(i, nstep))
    writer.put_data(data_arr)
    dl.get()  # fetch the next batch (result unused here; the same array is re-sent each step)
    time.sleep(0.1)

#print("Finished")

23 changes: 19 additions & 4 deletions generators/writers.py
@@ -4,7 +4,6 @@
import adios2
import numpy as np


class writer_base():
    def __init__(self, shotnr, id):
        comm = MPI.COMM_WORLD
@@ -69,17 +68,33 @@ def __init__(self, shotnr, id):
        super().__init__(shotnr, id)
        self.IO.SetEngine("DataMan")
        dataman_port = 12300 + self.rank
        transport_params = {"IPAddress": "203.230.120.125",
                            "Port": "{0:5d}".format(dataman_port),
                            "OpenTimeoutSecs": "600",
                            "Verbose": "20"}
        self.IO.SetParameters(transport_params)


class writer_bpfile(writer_base):
    def __init__(self, shotnr, id):
        super().__init__(shotnr, id)
        self.IO.SetEngine("BP4")

class writer_sst(writer_base):
    def __init__(self, shotnr, id):
        super().__init__(shotnr, id)
        self.IO.SetEngine("SST")
        self.IO.SetParameter("OpenTimeoutSecs", "600")

class writer_gen(writer_base):
    """ General writer to be initialized by name and parameters
    """
    def __init__(self, shotnr, id, engine, params):
        super().__init__(shotnr, id)
        self.IO.SetEngine(engine)
        _params = params
        if engine.lower() == "dataman":
            dataman_port = 12300 + self.rank
            _params.update(Port="{0:5d}".format(dataman_port))
        self.IO.SetParameters(_params)
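
# A usage sketch (hypothetical shot number and generator id; this mirrors how
# generator.py builds its writer from the config file):
#
#   writer = writer_gen(18431, 102203, "DataMan",
#                       {"IPAddress": "203.230.120.125", "OpenTimeoutSecs": "600"})
#   writer.DefineVariable(data_arr)
#   writer.Open()
#   writer.put_data(data_arr)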

# End of file writers.py
159 changes: 159 additions & 0 deletions receiver.py
@@ -0,0 +1,159 @@
# -*- coding: UTF-8 -*-

from mpi4py import MPI
import numpy as np
import adios2
import json
import argparse


from processors.readers import reader_dataman, reader_bpfile, reader_sst, reader_gen
from analysis.spectral import power_spectrum

## jyc: temporarily disabled. Will use later
#from backends.mongodb import mongo_backend

import concurrent.futures
import time
import os
import queue
import threading

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

parser = argparse.ArgumentParser(description="Receive KSTAR data using ADIOS2")
parser.add_argument('--config', type=str, help='Path to the configuration file', default='config.json')
args = parser.parse_args()

with open(args.config, "r") as df:
    cfg = json.load(df)

# Enforce a 1:1 mapping of reader processes onto analysis tasks
assert(len(cfg["channel_lists"]) == size)
assert(len(cfg["analysis"]) == size)

datapath = cfg["datapath"]
shotnr = cfg["shotnr"]
my_analysis = cfg["analysis"][rank]
my_channel_list = cfg["channel_lists"][rank]
gen_id = 100000 * rank + my_channel_list[0]
num_channels = len(my_channel_list)


## jyc: temporarily disabled. Will use later
#backend = mongo_backend(rank, my_channel_list)

#print("Starting main loop")

## jyc:
## This is for testing, to demonstrate performing analysis as we receive data.
## We run multiple threads (or processes) to perform analysis for the data received from the generator (KSTAR).

## Testing between a thread pool and a process pool.
## A thread pool is good for a small number of workers and I/O-bound jobs.
## A process pool is better for utilizing multiple cores on CPU-bound work.

#executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
executor = concurrent.futures.ProcessPoolExecutor(max_workers=8)

def perform_analysis(channel_data, step):
    """
    Perform analysis
    """
    print(">>> analysis ... %d" % step)
    t0 = time.time()
    if my_analysis["name"] == "power_spectrum":
        analysis_result = power_spectrum(channel_data, **my_analysis["config"])
    t1 = time.time()

    # Store result in database
    ##backend.store(my_analysis, analysis_result)
    #time.sleep(5)
    print(">>> analysis ... %d: done (%f secs)" % (step, t1 - t0))

## Warming up for loading modules
print("Warming up ... ")
for i in range(8):
    channel_data = np.zeros((num_channels, 100), dtype=np.float64)
    executor.submit(perform_analysis, channel_data, -1)
time.sleep(10)
print("Warming up ... done")

## jyc:
## We run a worker thread to save the channel data received from the generator (KSTAR).
## Queue is used between the main process and this worker thread.
dq = queue.Queue()
def save_data():
    """
    Save channel data with ADIOS2
    """
    fname = "{0:05d}_ch{1:06d}.s1.bp".format(shotnr, gen_id)
    with adios2.open(fname, "w", engine_type=cfg["analysis_engine"]) as fh:
        while True:
            channel_data, step = dq.get()
            if channel_data is None:
                break
            shape = channel_data.shape
            offset = [0,] * channel_data.ndim
            count = channel_data.shape
            fh.write("floats", channel_data, shape, offset, count, end_step=True)
            dq.task_done()
            print(">>> saving ... %d" % step)

worker = threading.Thread(target=save_data)
worker.start()

#reader = reader_dataman(shotnr, gen_id)
## General reader; engine type and params can be changed via the config file
reader = reader_gen(shotnr, gen_id, cfg["engine"], cfg["params"])
reader.Open()

## jyc:
## main loop:
## Fetch data as soon as possible. Saving with ADIOS2 is done by the worker thread.

step = 0
while True:
    #for i in range(10):
    stepStatus = reader.BeginStep()
    #print(stepStatus)
    if stepStatus == adios2.StepStatus.OK:
        #var = dataman_IO.InquireVariable("floats")
        #shape = var.Shape()
        #io_array = np.zeros(np.prod(shape), dtype=np.float)
        #reader.Get(var, io_array, adios2.Mode.Sync)
        channel_data = reader.get_data("floats")
        #currentStep = reader.CurrentStep()
        reader.EndStep()
        #print("rank {0:d}: Step".format(rank), reader.CurrentStep(), ", io_array = ", io_array)
    else:
        print("rank {0:d}: End of stream".format(rank))
        break

    # Recover channel data
    channel_data = channel_data.reshape((num_channels, channel_data.size // num_channels))

    print("Step begins ... %d" % step)
    ## jyc: this is just for testing. This is a place to run analysis if we want.
    executor.submit(perform_analysis, channel_data, step)

    ## Put data on the queue so that the worker thread can fetch and save it concurrently.
    dq.put((channel_data, step))
    step += 1

#datamanReader.Close()
## jyc: this is just for testing. We need to close the thread/process pool.
executor.shutdown(wait=True)

## jyc:
## We are done. Wait for the worker thread to finish.
dq.join()
dq.put((None, step))
worker.join()

print(">>> processing ... done.")

# End of file receiver.py
