Merge pull request #3 from jychoi-hpc/master
Update with MPI and NOMPI version
rkube authored Jan 14, 2020
2 parents 8d42ff1 + 904b86b commit 73e453d
Showing 4 changed files with 265 additions and 12 deletions.
19 changes: 13 additions & 6 deletions analysis.py
@@ -1,12 +1,10 @@
#-*- coding: UTF-8 -*-

from mpi4py import MPI
import numpy as np
import adios2
import json
import argparse

from processors.readers import reader_dataman, reader_bpfile, reader_sst, reader_gen
from analysis.spectral import power_spectrum

## jyc: temporarily disabled. Will use later
@@ -18,14 +16,23 @@
import queue
import threading

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

parser = argparse.ArgumentParser(description="Perform analysis")
parser.add_argument('--config', type=str, help='Lists the configuration file', default='config.json')
parser.add_argument('--nompi', help='Use with nompi', action='store_true')
args = parser.parse_args()

if not args.nompi:
    from processors.readers import reader_dataman, reader_bpfile, reader_sst, reader_gen
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
else:
    from processors.readers_nompi import reader_dataman, reader_bpfile, reader_sst, reader_gen
    comm = None
    rank = 0
    size = 1

with open(args.config, "r") as df:
    cfg = json.load(df)
    df.close()
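A note on the new import switch: the --nompi flag selects at runtime between the MPI-backed readers and the serial readers_nompi fallback. An equivalent flag-free variant, shown here only as a sketch and not part of this commit, would fall back automatically whenever mpi4py is not importable:

# Sketch only (not in this commit): auto-detect MPI instead of requiring a --nompi flag.
try:
    from mpi4py import MPI
    from processors.readers import reader_dataman, reader_bpfile, reader_sst, reader_gen
    comm = MPI.COMM_WORLD
    rank, size = comm.Get_rank(), comm.Get_size()
except ImportError:
    from processors.readers_nompi import reader_dataman, reader_bpfile, reader_sst, reader_gen
    comm, rank, size = None, 0, 1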
107 changes: 107 additions & 0 deletions processors/readers_nompi.py
@@ -0,0 +1,107 @@
#-*- coding: UTF-8 -*-

#from mpi4py import MPI
import adios2
import numpy as np


class reader_base():
    def __init__(self, shotnr, id):
        #comm = MPI.COMM_WORLD
        self.rank = 0
        self.size = 1

        self.shotnr = shotnr
        self.id = id
        self.adios = adios2.ADIOS()
        self.IO = self.adios.DeclareIO("stream_{0:03d}".format(self.rank))
        print("reader_base.__init__(): rank = {0:02d}".format(self.rank))


    def Open(self, worker_id=None):
        """Opens a new channel"""
        if worker_id is None:
            self.channel_name = "{0:05d}_ch{1:06d}.bp".format(self.shotnr, self.id)
        else:
            self.channel_name = "{0:05d}_ch{1:06d}.s{2:02d}.bp".format(self.shotnr, self.id, worker_id)
        print(">>> Opening ... %s" % (self.channel_name))

        if self.reader is None:
            self.reader = self.IO.Open(self.channel_name, adios2.Mode.Read)


    def BeginStep(self):
        """Wrapper for reader.BeginStep()"""
        return self.reader.BeginStep()


    def InquireVariable(self, varname):
        """Wrapper for IO.InquireVariable()"""
        return self.IO.InquireVariable(varname)


    def get_data(self, varname):
        """Attempt to load `varname` from the opened stream"""
        var = self.IO.InquireVariable(varname)
        io_array = np.zeros(np.prod(var.Shape()), dtype=np.float64)
        self.reader.Get(var, io_array, adios2.Mode.Sync)

        return io_array


    def CurrentStep(self):
        """Wrapper for reader.CurrentStep()"""
        return self.reader.CurrentStep()


    def EndStep(self):
        """Wrapper for reader.EndStep()"""
        self.reader.EndStep()


class reader_dataman(reader_base):
    def __init__(self, shotnr, id):
        super().__init__(shotnr, id)
        self.IO.SetEngine("DataMan")
        self.reader = None

        dataman_port = 12300 + self.rank
        transport_params = {"IPAddress": "203.230.120.125",
                            "Port": "{0:5d}".format(dataman_port),
                            "OpenTimeoutSecs": "600",
                            "AlwaysProvideLatestTimestep": "true"}
        self.IO.SetParameters(transport_params)
        print(">>> reader_dataman ... ")


class reader_bpfile(reader_base):
    def __init__(self, shotnr, id):
        super().__init__(shotnr, id)
        self.IO.SetEngine("BP4")
        self.IO.SetParameter("OpenTimeoutSecs", "600")
        self.reader = None
        print(">>> reader_bpfile ... ")


class reader_sst(reader_base):
    def __init__(self, shotnr, id):
        super().__init__(shotnr, id)
        self.IO.SetEngine("SST")
        self.IO.SetParameters({"OpenTimeoutSecs": "600.0"})
        self.reader = None
        print(">>> reader_sst ... ")


class reader_gen(reader_base):
    """General reader to be initialized by name and parameters"""
    def __init__(self, shotnr, id, engine, params):
        super().__init__(shotnr, id)
        self.IO.SetEngine(engine)
        _params = params
        if engine.lower() == "dataman":
            dataman_port = 12300 + self.rank
            _params.update(Port="{0:5d}".format(dataman_port))
        self.IO.SetParameters(_params)
        self.reader = None

# end of file readers_nompi.py
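For orientation, here is a minimal sketch of how these no-MPI readers are driven, mirroring the loop in receiver_mpi.py below. The shot number, channel id, engine parameters, and the variable name "floats" are placeholders modeled on the receivers and the config file, so treat them as assumptions:

# Minimal sketch, assuming a BP4 stream and a variable named "floats" (both placeholders).
import adios2
from processors.readers_nompi import reader_gen

reader = reader_gen(18431, 2603, "BP4", {"OpenTimeoutSecs": "600"})
reader.Open()                                  # opens "18431_ch002603.bp"
while reader.BeginStep() == adios2.StepStatus.OK:
    data = reader.get_data("floats")           # flattened 1-D numpy array
    print("step", reader.CurrentStep(), "shape", data.shape)
    reader.EndStep()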
19 changes: 13 additions & 6 deletions receiver.py
@@ -1,12 +1,10 @@
#-*- coding: UTF-8 -*-

from mpi4py import MPI
import numpy as np
import adios2
import json
import argparse

from processors.readers import reader_dataman, reader_bpfile, reader_sst, reader_gen
from analysis.spectral import power_spectrum

import concurrent.futures
@@ -15,14 +13,23 @@
import queue
import threading

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

parser = argparse.ArgumentParser(description="Receive KSTAR data using ADIOS2")
parser.add_argument('--config', type=str, help='Lists the configuration file', default='config.json')
parser.add_argument('--nompi', help='Use with nompi', action='store_true')
args = parser.parse_args()

if not args.nompi:
    from processors.readers import reader_dataman, reader_bpfile, reader_sst, reader_gen
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
else:
    from processors.readers_nompi import reader_dataman, reader_bpfile, reader_sst, reader_gen
    comm = None
    rank = 0
    size = 1

with open(args.config, "r") as df:
    cfg = json.load(df)
    df.close()
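Both receivers load their settings from --config (default config.json). The schema itself is not part of this diff; the keys below are inferred from the cfg[...] accesses visible in receiver_mpi.py further down, and every value is a placeholder:

# Hypothetical config.json writer; keys inferred from cfg[...] usage, values are placeholders.
import json

cfg = {
    "datapath": "/path/to/data",
    "shotnr": 18431,                          # placeholder shot number
    "channel_range": [[2603, 2604, 2605]],    # one list of channel ids per receiver
    "analysis": [{"name": "power_spectrum", "config": {}}],
    "engine": "BP4",                          # any ADIOS2 engine name: BP4, SST, DataMan, ...
    "params": {"OpenTimeoutSecs": "600"},
}

with open("config.json", "w") as f:
    json.dump(cfg, f, indent=2)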
132 changes: 132 additions & 0 deletions receiver_mpi.py
@@ -0,0 +1,132 @@
#-*- coding: UTF-8 -*-
# Example command: mpirun -n 8 python -u -m mpi4py.futures receiver_mpi.py --config config.json
import numpy as np
import adios2
import json
import argparse

from analysis.spectral import power_spectrum

import concurrent.futures
import time
import os
import queue
import threading

from mpi4py import MPI
from mpi4py.futures import MPICommExecutor
import sys

parser = argparse.ArgumentParser(description="Receive KSTAR data using ADIOS2")
parser.add_argument('--config', type=str, help='Lists the configuration file', default='config.json')
parser.add_argument('--nompi', help='Use with nompi', action='store_true')
## A trick to handle: python -u -m mpi4py.futures ...
idx = len(sys.argv) - sys.argv[::-1].index(__file__)
args = parser.parse_args(sys.argv[idx:])

if not args.nompi:
    from processors.readers import reader_dataman, reader_bpfile, reader_sst, reader_gen
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()
else:
    from processors.readers_nompi import reader_dataman, reader_bpfile, reader_sst, reader_gen
    comm = None
    rank = 0
    size = 1

with open(args.config, "r") as df:
    cfg = json.load(df)
    df.close()

datapath = cfg["datapath"]
shotnr = cfg["shotnr"]
my_analysis = cfg["analysis"][0]
my_channel_list = cfg["channel_range"][0]
gen_id = 100000 * 0 + my_channel_list[0]
num_channels = len(my_channel_list)

# Function that the workers perform: the analysis itself.
# Workers (non-master MPI ranks) process each chunk of data;
# mpi4py is responsible for the data distribution.
def perform_analysis(channel_data, step):
    """
    Perform analysis
    """
    print(">>> ({0:d}) Worker: do analysis step={1:d}".format(rank, step))
    t0 = time.time()
    if my_analysis["name"] == "power_spectrum":
        analysis_result = power_spectrum(channel_data, **my_analysis["config"])
    t1 = time.time()

    # Store the result in a database
    # backend.store(my_analysis, analysis_result)
    time.sleep(10)
    print(">>> ({0:d}) Worker: done with analysis step={1:d} ({2:f} secs)".format(rank, step, t1 - t0))

# Function for a helper thread (the dispatcher).
# The dispatcher pulls data from the queue (dq) and
# distributes it to other workers (non-master MPI workers) with mpi4py's MPICommExecutor.
def dispatch():
    while True:
        channel_data, step = dq.get()
        print(">>> ({0:d}) Dispatcher: read data step={1:d}".format(rank, step))
        if channel_data is None:
            break
        shape = channel_data.shape
        offset = [0, ] * channel_data.ndim
        count = channel_data.shape
        future = executor.submit(perform_analysis, channel_data, step)
        dq.task_done()

# Main
if __name__ == "__main__":
    with MPICommExecutor(MPI.COMM_WORLD, root=0) as executor:
        if executor is not None:
            # Only the master executes the following block.
            # The use of "__main__" is critical here.

            # The master thread keeps reading data, while a helper thread (the dispatcher)
            # dispatches the jobs in the queue (dq) asynchronously and distributes them
            # to the other workers. The main idea is to never slow down the master.
            dq = queue.Queue()
            dispatcher = threading.Thread(target=dispatch)
            dispatcher.start()

            # Only the master thread opens a data stream.
            # General reader: engine type and params can be changed with the config file.
            reader = reader_gen(shotnr, gen_id, cfg["engine"], cfg["params"])
            reader.Open()

            # Main loop:
            # read data (from KSTAR) and save it in the queue (dq) as soon as possible.
            # The dispatcher (a helper thread) asynchronously fetches data from the queue
            # and distributes it to the other workers.
            while True:
                stepStatus = reader.BeginStep()
                if stepStatus == adios2.StepStatus.OK:
                    channel_data = reader.get_data("floats")
                    currentStep = reader.CurrentStep()
                    reader.EndStep()
                    #print("rank {0:d}: Step".format(rank), reader.CurrentStep(), ", io_array = ", io_array)
                else:
                    print(">>> ({0:d}) Receiver: end of stream".format(rank))
                    break

                # Recover channel data
                channel_data = channel_data.reshape((num_channels, channel_data.size // num_channels))
                print(">>> ({0:d}) Receiver: received data step={1:d}".format(rank, currentStep))

                # Save the data in the queue, then go back to reading.
                # The dispatcher (a helper thread) will fetch it asynchronously.
                dq.put((channel_data, currentStep))
                time.sleep(1)

            ## Clean up
            dq.join()
            dq.put((None, -1))
            dispatcher.join()
            print(">>> ({0:d}) Receiver: done.".format(rank))

# End of file receiver_mpi.py
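One subtle point in receiver_mpi.py is the argument-parsing trick near the top: when the script is launched as "python -u -m mpi4py.futures receiver_mpi.py --config config.json", sys.argv may carry extra leading entries, so the script locates the last occurrence of its own path and hands argparse only what follows. A small standalone illustration, with a made-up argv list standing in for sys.argv:

# Illustration of the argv-slicing trick; the argv list below is made up.
argv = ["-m", "mpi4py.futures", "receiver_mpi.py", "--config", "config.json"]
script = "receiver_mpi.py"  # stands in for __file__

# Index just past the last occurrence of the script path, counted from the front.
idx = len(argv) - argv[::-1].index(script)
print(argv[idx:])  # ['--config', 'config.json'] -> passed to parser.parse_args()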
