Delta facilitates near real-time streaming analysis of big fusion data on remote HPC resources. It consists of multiple executables that send, receive, and process data on different machines. The picture below gives an overview of Delta.
In this scenario, data is streamed from a DTN to Cori. The DTN executes generator.py, and processor_mpi.py runs on Cori:
generator.py =======> processor_mpi.py
(DTN) | (Cori)
v
stream_name: SSSSS_NN.bp
The processor accesses distributed computing resources through an executor interface such as a thread pool (see PEP 3148). On Cori,
mpi4py's MPICommExecutor is recommended, but other compatible executors should work as well.
processor_mpi.py implements this flexible workflow. It
- Reads ECEI time chunks from an adios2 source
- Puts the time chunks in a queue
- Has a worker thread read time chunks from the queue and pass them to the executor for analysis through task_ecei objects
- Calls the multi-threaded C/cython kernels for data processing
This workflow allows for flexible channel sizes, so we call it the flexible workflow.
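The steps above can be sketched with the Python standard library alone. This is a minimal sketch, not Delta's actual code: `read_chunks`, `analyze`, `consume`, and `run_pipeline` are hypothetical names, the chunk contents are made up, and `ThreadPoolExecutor` stands in for any PEP 3148 executor such as mpi4py's MPICommExecutor.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


def read_chunks(n_chunks, chunk_len):
    """Hypothetical stand-in for the adios2 reader: yields (index, data) pairs."""
    for idx in range(n_chunks):
        yield idx, [float(idx)] * chunk_len


def analyze(idx, data):
    """Hypothetical stand-in for an analysis task: returns the chunk mean."""
    return idx, sum(data) / len(data)


def consume(q, executor, futures):
    """Worker thread: take chunks off the queue and submit them to the executor."""
    while True:
        item = q.get()
        if item is None:  # sentinel: the reader has finished
            break
        futures.append(executor.submit(analyze, *item))


def run_pipeline(n_chunks=4, chunk_len=8):
    q = queue.Queue()
    futures = []
    with ThreadPoolExecutor(max_workers=2) as executor:
        worker = threading.Thread(target=consume, args=(q, executor, futures))
        worker.start()
        # The main thread plays the role of the reader and fills the queue.
        for chunk in read_chunks(n_chunks, chunk_len):
            q.put(chunk)
        q.put(None)
        worker.join()
        # Collect results keyed by chunk index.
        return dict(f.result() for f in futures)


if __name__ == "__main__":
    print(run_pipeline())
```

Decoupling the reader from the executor through a queue means that reading the next time chunk does not have to wait for the analysis of the previous one to be submitted.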
Some spectral analyses are implemented as C kernels and interfaced via cython. To compile the C kernels:
cd analysis/lib
make
To build the cython interface:
cd analysis
CC=cc LDSHARED="cc -shared" python setup.py build_ext --inplace
Run this implementation on Cori:
module unload PrgEnv-cray PrgEnv-gnu PrgEnv-intel
module load PrgEnv-gnu
module unload craype-hugepages2M
module unload python
module load python3
module use -a /global/cscratch1/sd/jyc/sw/modulefiles
module load adios2/devel
module load python_delta_comm
export OMP_NUM_THREADS=N
srun -n 6 -c N python processor_mpi.py --config configs/test_all.json
Alternatively, run the 2-node scenario using the task_list processor which uses an extra fft executor:
srun -n 6 -c N python -m mpi4py.futures processor_mpi_tasklist.py --config configs/test_all.json
In this scenario, a middle-man forwards data from the DTN to Cori. This is necessary when the data stream needs to take another hop:
generator.py =======> middle-man.py =====> processor_mpi.py
(DTN) | (Hop) |
v v
stream_name: SSSSS_NN.bp stream_name: SSSSS_NN.bp
In this scenario, data is streamed from a KSTAR Data Transfer Node (DTN) to a NERSC DTN:
generator.py =====> processor_xyz.py
(running on KSTAR DTN) | (running on NERSC DTN)
v
stream name: shotnum-channelid.bp
Processors implement distributed computing in different ways: xyz=[mpi, mpi_brute, dask, ...]. Here, mpi refers to the new MPI implementation with cython kernels, mpi_brute refers to the brute-force adaptation of the fluctana routines by wrapping them in MPI, and dask refers to an implementation using dask-distributed. As of 2020-02, the dask-distributed implementation is outdated.
Here is an example configuration file for the generator running on the KSTAR DTN:
{
"datapath": "/home/choij/kstar_streaming/018431",
"shotnr": 18431,
"channel_lists": [[2203, 2204]],
"analysis": [{"name" : "power_spectrum",
"config" : {"nperseg": 32, "fs": 1.0}}],
"engine": "DataMan",
"params": {"IPAddress": "203.230.120.125",
"OpenTimeoutSecs": "600"},
"nstep": 100,
"analysis_engine": "BP4"
}
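Before starting a run it can help to check that a configuration file parses and contains the expected entries. The following is a minimal sketch under stated assumptions: the function name `load_generator_config` and the list `REQUIRED_KEYS` are hypothetical, and which keys Delta actually requires is not stated here, so the list simply mirrors some of the keys in the example above.

```python
import json

# Assumption: these keys are treated as mandatory for illustration only.
REQUIRED_KEYS = ("datapath", "shotnr", "channel_lists", "engine", "params", "nstep")


def load_generator_config(text):
    """Parse a generator config and check that the assumed keys are present."""
    cfg = json.loads(text)
    missing = [key for key in REQUIRED_KEYS if key not in cfg]
    if missing:
        raise KeyError(f"missing config keys: {missing}")
    return cfg


EXAMPLE = """
{
  "datapath": "/home/choij/kstar_streaming/018431",
  "shotnr": 18431,
  "channel_lists": [[2203, 2204]],
  "engine": "DataMan",
  "params": {"IPAddress": "203.230.120.125", "OpenTimeoutSecs": "600"},
  "nstep": 100
}
"""

if __name__ == "__main__":
    cfg = load_generator_config(EXAMPLE)
    print(cfg["engine"], cfg["nstep"])
```

Note that the values under "params" are strings in the example, so "OpenTimeoutSecs" would need an explicit `int(...)` conversion if it were used as a number in Python.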
This reference implementation shows the feasibility of streaming the data from KSTAR to NERSC with high velocity. To run this scenario, log in to the respective DTNs and execute:
python generator.py --config config-jychoi.json
python receiver.py --config config-jychoi.json
Note that in this scenario the processor is called receiver.py.
Run the generator on the NERSC DTN as follows:
module use -a /global/cscratch1/sd/jyc/dtn/sw/spack/share/spack/modules/linux-centos7-ivybridge
module load openmpi
module load zeromq
module load python py-numpy py-mpi4py py-h5py py-scipy py-matplotlib py-pyyaml
module use -a /global/cscratch1/sd/jyc/dtn/sw/modulefiles
module load adios2
module load python_delta_comm
On the KNL nodes, the best performance is obtained with N=8 or N=16 and 24 or 48 MPI ranks.
Data storage is implemented for numpy and mongodb backends; see the configuration file configs/test_all.json. The mongodb backend can store data either internally using GridFS or on the filesystem using numpy. Using mongodb with the numpy backend still stores all metadata in mongodb. Using the pure numpy backend also stores the metadata in numpy files.
RMC's implementation of fluctana in the framework
generator_brute.py =====> receiver_brute.py
(running on KSTAR DTN) | (running on NERSC DTN)
v
stream name: shotnum-ch00000.bp
Run this scenario as follows.
First, on a Cori DTN node:
module use -a /global/cscratch1/sd/jyc/dtn/sw/spack/share/spack/modules/linux-centos7-ivybridge
module use -a /global/cscratch1/sd/jyc/dtn/sw/modulefiles
module load openmpi
module load zeromq adios2
module load python py-numpy py-mpi4py py-h5py py-pyyaml py-scipy py-matplotlib
mpirun -n 5 python -u -m mpi4py.futures receiver_brute.py --config config-dtn.json
Then, on KSTAR, run as follows:
module use -a /home/choij/sw/spack/share/spack/modules/linux-centos7-haswell
module use -a /home/choij/sw/spack/share/spack/modules/linux-centos7-broadwell
module use -a /home/choij/sw/modulefiles
module load openmpi
module load zeromq adios2
module load python py-numpy py-mpi4py py-h5py py-scipy py-matplotlib
python -u generator_brute.py --config config-kstar.json
Here are the config files used above:
config-dtn.json:
{
"datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/",
"shotnr": 18431,
"channel_range": ["ECEI_L0101-2408"],
"analysis": [{"name" : "all"}],
"fft_params" : {"nfft": 1000, "window": "hann", "overlap": 0.5, "detrend" :1},
"engine": "DataMan",
"params": { "IPAddress": "203.230.120.125",
"Timeout": "60",
"OneToOneMode": "TRUE",
"OpenTimeoutSecs": "600"},
"nstep": 200,
"batch_size": 10000,
"resultspath": "./"
}
config-kstar.json:
{
"datapath": "/home/choij/kstar_streaming/",
"shotnr": 18431,
"channel_range": ["ECEI_L0101-2408"],
"analysis": [{"name" : "all"}],
"fft_params" : {"nfft": 1000, "window": "hann", "overlap": 0.5, "detrend" :1},
"engine": "DataMan",
"params": { "IPAddress": "203.230.120.125",
"Timeout": "60",
"OneToOneMode": "TRUE",
"OpenTimeoutSecs": "600"},
"nstep": 200,
"batch_size": 10000,
"resultspath": "./"
}
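The "channel_range" entry "ECEI_L0101-2408" in both files appears to denote a rectangular block of ECEI channels. As a sketch, assuming the digits encode a two-digit vertical index followed by a two-digit horizontal index (so that L0101-2408 spans vertical 01 to 24 and horizontal 01 to 08), such a range could be expanded as shown below. The function name `expand_channel_range` and the parsing rule are assumptions for illustration, not taken from Delta's code.

```python
import re


def expand_channel_range(spec):
    """Expand e.g. 'ECEI_L0101-2408' into a list of single-channel names.

    Assumption: the pattern is ECEI_<device><vv><hh>-<vv><hh>, where vv is a
    vertical index and hh a horizontal index, both inclusive at either end.
    """
    m = re.fullmatch(r"ECEI_([A-Z]+)(\d{2})(\d{2})-(\d{2})(\d{2})", spec)
    if m is None:
        raise ValueError(f"unrecognized channel range: {spec!r}")
    dev = m.group(1)
    v0, h0, v1, h1 = (int(g) for g in m.groups()[1:])
    return [
        f"ECEI_{dev}{v:02d}{h:02d}"
        for v in range(v0, v1 + 1)
        for h in range(h0, h1 + 1)
    ]


if __name__ == "__main__":
    channels = expand_channel_range("ECEI_L0101-2408")
    print(len(channels), channels[0], channels[-1])
```

Under this assumption the range covers 24 x 8 = 192 channels.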
This scenario adds an additional hop between the NERSC DTNs and the Cori compute nodes. Data is streamed to the DTN and then forwarded to the processor running on the compute nodes. This mitigates the low bandwidth available between the compute nodes and the outside.
generator.py =====> receiver.py =====> analysis.py
(running on KSTAR DTN) | (running on NERSC DTN) | (running on NERSC compute nodes)
v v
stream name: shotnum-channelid.bp shotnum-channelid.s1.bp
This scenario is currently not fully implemented.
Visualizers interface to the streaming data analysis through the database. Using the mongodb backend allows for siphoning the analysis as it comes in through change streams: https://docs.mongodb.com/manual/changeStreams/
Let's say processor_mpi.py is running with runID 1ABCDE, writing updates into mongodb. By default, Delta writes the analysis results into the collection test_analysis_RUNID, where RUNID is the 6-character runID. In a Python session you can follow them with:
# db is assumed to be a pymongo Database handle for the database Delta writes to
cursor = db.test_analysis_1ABCDE.watch()
for change in cursor:
    print(change)
    print("")
Visualizers are implemented separately; see, for example, the web-based dashboard: https://github.com/rkube/dashboard_v2