
Commit

Added per-process based data analysis configuration and started a mongodb backend
rkube committed Oct 1, 2019
1 parent a6f9172 commit 39fce55
Showing 11 changed files with 161 additions and 64 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
.vscode
__pycache__
*.bp
25 changes: 12 additions & 13 deletions README.md
@@ -1,24 +1,23 @@
DELTA-FUSION
aDaptive rEaL Time Analysis of big fusion data

This project implements a client-server model for analysis of streamed data from
This project implements a client-server model for analysis of streaming data from
fusion experiments or large-scale simulations.

Implemented as part of "Adaptive near-real time networked analysis of big
fusion data" (FY18).


Start/Stop zookeeper and kafka: https://gist.github.com/piatra/0d6f7ad1435fa7aa790a
#!/bin/bash
The current implementation features a data generator and a data processor.
Both are MPI applications. Each generator task reads data from a single group of channels
and writes it via the ADIOS2 DataMan interface.
Each processor task reads this data and performs a single analysis routine.
The output of the analysis is stored in a database for later analysis and visualization.

if [ "$#" -ne 1 ]; then
echo "Please supply topic name"
exit 1
fi

nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties > /dev/null 2>&1 &
sleep 2
nohup bin/kafka-server-start.sh -daemon config/server.properties > /dev/null 2>&1 &
sleep 2
To run the analysis framework, run a generator and a processor simultaneously on a node:
srun -n 2 -c 2 --mem=1G --gres=craynetwork:0 python generator_adios2.py
srun -n 2 -c 2 --mem=1G --gres=craynetwork:0 python processor_adios2.py

For example, within an interactive session using a split terminal (see the screen utility).

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic $1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic parsed
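To make the per-process configuration introduced by this commit concrete, the following sketch (not part of the diff) shows how each MPI rank picks its channel group and analysis routine from config.json, following the logic added to generator_adios2.py and processor_adios2.py; the config path and MPI setup are assumptions for illustration.

import json
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

with open("config.json") as df:
    cfg = json.load(df)

# Enforce a 1:1 mapping of channel groups and analysis tasks onto MPI ranks
assert len(cfg["channel_lists"]) == size
assert len(cfg["analysis"]) == size

my_channel_list = cfg["channel_lists"][rank]   # e.g. [2203, 2204] on rank 0
my_analysis = cfg["analysis"][rank]            # e.g. {"name": "power_spectrum", ...}
gen_id = 100000 * rank + my_channel_list[0]    # stream id shared by writer and reader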
35 changes: 35 additions & 0 deletions analysis/spectral.py
@@ -0,0 +1,35 @@
# -*- coding: UTF-8 -*-



def power_spectrum(data, **kwargs):
    """Implements an overlapped segmented averaging of modified periodograms.
    Currently wraps scipy.signal.welch.
    See
    * Original implementation in kstar fluctana
    * Discussion in
      'Spectrum and spectral density estimation by the Discrete Fourier transform (DFT),
      including a comprehensive list of window functions and some new flat-top windows',
      G. Heinzel, A. Rudiger and R. Schilling (2002)

    Input:
    ======
    data     : channel data to be analyzed
    **kwargs : keyword arguments passed to the wrapped function. See the documentation of scipy.signal.welch.

    Returns:
    ========
    f   : ndarray, vector of frequency bins
    Pxx : ndarray, vector of power spectral densities
    """

    from scipy.signal import welch
    f, Pxx = welch(data, **kwargs)

    return(f, Pxx)


# End of file spectral.py
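As a usage illustration for power_spectrum (not part of the commit), the keyword arguments carried in config.json can be passed straight through to the wrapped scipy.signal.welch call; the white-noise test signal below is an assumption.

import numpy as np
from analysis.spectral import power_spectrum

# Two channels of 1000 samples each; the parameters match the config.json entry
data = np.random.randn(2, 1000)
f, Pxx = power_spectrum(data, nperseg=32, fs=1.0)
print(f.shape, Pxx.shape)   # frequency bins, and one spectral density per channel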
37 changes: 37 additions & 0 deletions backends/mongodb.py
@@ -0,0 +1,37 @@
# -*- coding: UTF-8 -*-

from mpi4py import MPI
from pymongo import MongoClient
from bson.binary import Binary

class mongodb_backend():
    def __init__(self, rank, channel_list):
        self.rank = rank
        self.channel_list = channel_list

        # Connect to mongodb
        self.client = MongoClient("mongodb07.nersc.gov")


    def store(self, analysis, result):
        """Stores analysis data.

        Input:
        ======
        analysis: dictionary, name and parameters of the analysis routine that was called
        result  : result of the analysis routine

        Returns:
        ========
        None
        """

        print("Storing data")

        return None


# End of file mongodb.py
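The store() method above is still a stub. A hypothetical sketch of what it could do (not part of the commit) is to serialize the result and insert one document per analysis step; the database name, collection name, and use of pickle below are assumptions, only the pymongo/bson calls are standard.

import pickle
from pymongo import MongoClient
from bson.binary import Binary

# Hypothetical store step; "delta_fusion" and "analysis_results" are made-up names
client = MongoClient("mongodb07.nersc.gov")
coll = client["delta_fusion"]["analysis_results"]

analysis = {"name": "power_spectrum", "config": {"nperseg": 32, "fs": 1.0}}
result = (None, None)   # placeholder for (f, Pxx) returned by the analysis routine

coll.insert_one({"rank": 0,
                 "channels": [2203, 2204],
                 "analysis": analysis["name"],
                 "config": analysis["config"],
                 "result": Binary(pickle.dumps(result))})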
8 changes: 6 additions & 2 deletions config.json
@@ -1,4 +1,8 @@
{"datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431",
"shotnr": 18431,
"channels": [2202, 2203],
"analysis": ["foo", "bar"]}
"channel_lists": [[2203, 2204], [2101, 2102]],
"analysis": [{"name" : "power_spectrum",
"config" : {"nperseg": 32, "fs": 1.0}},
{"name" : "an_honest_mistake",
"config" : {"a": 0, "b": 22}}]
}
16 changes: 9 additions & 7 deletions generators/generator_adios2.py → generator_adios2.py
@@ -10,8 +10,8 @@
import json
import argparse

from writers import writer_dataman, writer_bpfile
from data_loader import data_loader
from generators.writers import writer_dataman, writer_bpfile
from generators.data_loader import data_loader

"""
Generates batches of ECEI data.
@@ -31,12 +31,14 @@

datapath = cfg["datapath"]
shotnr = cfg["shotnr"]
channel_list = cfg["channels"]

# Enforce 1:1 mapping of channels and tasks
assert(len(cfg["channels"]) == size)
assert(len(cfg["channel_lists"]) == size)
# Channels this process is reading
my_channel_list = cfg["channel_lists"][rank]
gen_id = 100000 * rank + my_channel_list[0]

print("Rank: {0:d}".format(rank), ", channels: ", channel_list[rank])
print("Rank: {0:d}".format(rank), ", channel_list: ", my_channel_list, ", id = ", gen_id)

# Hard-code the total number of data points
data_pts = int(5e6)
@@ -47,7 +49,7 @@

# Get a data_loader
dl = data_loader(path.join(datapath, "ECEI.018431.LFS.h5"),
channel_list=[channel_list[rank]],
channel_list=my_channel_list,
batch_size=1000)

# data_arr is a list
@@ -56,7 +58,7 @@
data_arr = data_arr.astype(np.float64)
_data_arr = 0.0

writer = writer_bpfile(shotnr, channel_list[rank])
writer = writer_bpfile(shotnr, gen_id)
writer.DefineVariable(data_arr)
writer.Open()

5 changes: 2 additions & 3 deletions generators/data_loader.py
@@ -37,16 +37,15 @@ def get(self):
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

#print("rank: {0:d} channel {1:d} batch{2:d}".format(rank, self.channel_list[0], self.current_batch))
print("rank: {0:d} batch: {1:d}".format(rank, self.current_batch), ", channel list: ", self.channel_list)


with h5py.File(self.filename, "r", driver='mpio', comm=MPI.COMM_WORLD) as df:

for ch in self.channel_list_hdf5:
print("Accessing channel ", ch)
data_list.append(df[ch][self.current_batch * self.batch_size:
(self.current_batch + 1) * self.batch_size])
self.current_batch += 1

df.close()

return(data_list)
22 changes: 8 additions & 14 deletions generators/writers.py
@@ -6,14 +6,14 @@


class writer_base():
def __init__(self, shotnr, channel):
def __init__(self, shotnr, id):
comm = MPI.COMM_WORLD
self.rank = comm.Get_rank()
self.size = comm.Get_size()
#print("writer_base.__init__(): rank = {0:02d}".format(self.rank))

self.shotnr = shotnr
self.channel = channel
self.id = id
self.adios = adios2.ADIOS(MPI.COMM_SELF)
self.IO = self.adios.DeclareIO("stream_{0:03d}".format(self.rank))
self.writer = None
@@ -35,10 +35,10 @@ def DefineVariable(self, data_array):


def Open(self):
"""Opens a new channel
"""Opens a new channel.
"""

self.channel_name = "{0:05d}_ch{1:d}.bp".format(self.shotnr, self.channel)
self.channel_name = "{0:05d}_ch{1:06d}.bp".format(self.shotnr, self.id)

if self.writer is None:
self.writer = self.IO.Open(self.channel_name, adios2.Mode.Write)
@@ -65,8 +65,8 @@ def __del__(self):


class writer_dataman(writer_base):
def __init__(self, shotnr, channel):
super().__init__(shotnr, channel)
def __init__(self, shotnr, id):
super().__init__(shotnr, id)
self.IO.SetEngine("DataMan")
dataman_port = 12300 + self.rank
transport_params = {"IPAddress": "127.0.0.1",
@@ -77,15 +77,9 @@ def __init__(self, shotnr, channel):


class writer_bpfile(writer_base):
def __init__(self, shotnr, channel):
super().__init__(shotnr, channel)
def __init__(self, shotnr, id):
super().__init__(shotnr, id)
self.IO.SetEngine("BP4")

# # Define variable

# Open stream
#writer = adios_IO.Open("stream", adios2.Mode.Write)



# End of file writers.py
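For reference, the stream name constructed in writer_base.Open from the shot number and the generator id looks as follows; the example numbers are illustrative, follow the gen_id formula used in generator_adios2.py, and the reader side has to be constructed with the same (shotnr, gen_id) pair.

# Illustration of the .bp stream name produced by writer_base.Open
shotnr, rank, my_channel_list = 18431, 1, [2101, 2102]    # example values
gen_id = 100000 * rank + my_channel_list[0]               # -> 102101
channel_name = "{0:05d}_ch{1:06d}.bp".format(shotnr, gen_id)
print(channel_name)                                       # 18431_ch102101.bp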
40 changes: 25 additions & 15 deletions processors/processor_adios2.py → processor_adios2.py
@@ -4,9 +4,14 @@
import numpy as np
import adios2
import json
import argparse
import argparse


from processors.readers import reader_dataman, reader_bpfile
from analysis.spectral import power_spectrum

from backends.mongodb import mongodb_backend

from readers import reader_dataman, reader_bpfile

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
@@ -21,44 +26,49 @@
df.close()

# "Enforce" 1:1 mapping of reader processes on analysis tasks
assert(len(cfg["channels"]) == size)
assert(len(cfg["channel_lists"]) == size)
assert(len(cfg["analysis"]) == size)

datapath = cfg["datapath"]
shotnr = cfg["shotnr"]
channel_list = cfg["channels"]
analysis = cfg["analysis"][rank]


my_analysis = cfg["analysis"][rank]
my_channel_list = cfg["channel_lists"][rank]
gen_id = 100000 * rank + my_channel_list[0]
num_channels = len(my_channel_list)

reader = reader_bpfile(shotnr, gen_id)
reader.Open()


reader = reader_bpfile(shotnr, channel_list[rank])
reader.Open()
backend = mongodb_backend(rank, my_channel_list)

print("Starting main loop")
#print("Starting main loop")

while(True):
stepStatus = reader.BeginStep()
print(stepStatus)
#print(stepStatus)
if stepStatus == adios2.StepStatus.OK:
#var = dataman_IO.InquireVariable("floats")
#shape = var.Shape()
#io_array = np.zeros(np.prod(shape), dtype=np.float)
#reader.Get(var, io_array, adios2.Mode.Sync)
io_array = reader.get_data("floats")
channel_data = reader.get_data("floats")
#currentStep = reader.CurrentStep()
reader.EndStep()
print("Step", reader.CurrentStep(), ", io_array = ", io_array)
#print("rank {0:d}: Step".format(rank), reader.CurrentStep(), ", io_array = ", io_array)
else:
print("End of stream")
print("rank {0:d}: End of stream".format(rank))
break

# Recover channel data
channel_data = channel_data.reshape((num_channels, channel_data.size // num_channels))

# Perform the analysis

if(my_analysis["name"] == "power_spectrum"):
analysis_result = power_spectrum(channel_data, **my_analysis["config"])

# Store result in database
backend.store(my_analysis, analysis_result)

#datamanReader.Close()

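The reshape in the main loop above recovers the per-channel layout from the flat array returned by the reader before the analysis routine is applied along each channel row; a small standalone illustration (the flat input array is an assumption):

import numpy as np
from analysis.spectral import power_spectrum

num_channels = 2
flat = np.arange(2 * 10000, dtype=np.float64)              # stand-in for reader.get_data("floats")
channel_data = flat.reshape((num_channels, flat.size // num_channels))
f, Pxx = power_spectrum(channel_data, nperseg=32, fs=1.0)  # one spectrum per channel row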
19 changes: 19 additions & 0 deletions processors/processor_kafka.py
@@ -1,5 +1,24 @@
#-*- coding: UTF-8 -*-

"""
Below is old information for running the kafka/faust implementation:
Start/Stop zookeeper and kafka: https://gist.github.com/piatra/0d6f7ad1435fa7aa790a
#!/bin/bash
if [ "$#" -ne 1 ]; then
echo "Please supply topic name"
exit 1
fi
nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties > /dev/null 2>&1 &
sleep 2
nohup bin/kafka-server-start.sh -daemon config/server.properties > /dev/null 2>&1 &
sleep 2
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic $1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic parsed
"""

import faust
import numpy as np
import pickle
