diff --git a/.gitignore b/.gitignore index 9f94e5d..52bb604 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .vscode __pycache__ +*.bp diff --git a/README.md b/README.md index bdb4a91..c7d2d8e 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,23 @@ +DELTA-FUSION aDaptive rEaL Time Analysis of big fusion data -This project implements a client-server model for analysis of streamed data from +This project implements a client-server model for analysis of streaming data from fusion experiments or large-scale simulations. Implemented as part of "Adaptive near-real time net-worked analysis of big fusion data", (FY18). -Start/Stop zookeper and kafka: https://gist.github.com/piatra/0d6f7ad1435fa7aa790a -#!/bin/bash +The current implementation features a data generator and a data processor. +Both are MPI applications. Each generator task reads data from a single group of channels +and writes them via the ADIOS2 dataman interface. +Each processor task reads this data and performs a single analysis routine. +The output of the analysis is stored in a database for later analysis and visualization. 
-if [ "$#" -ne 1 ]; then - echo "Please supply topic name" - exit 1 -fi -nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties > /dev/null 2>&1 & -sleep 2 -nohup bin/kafka-server-start.sh -daemon config/server.properties > /dev/null 2>&1 & -sleep 2 +To run the analysis framework run a generator and a processor simultaneously on a node: +srun -n 2 -c 2 --mem=1G --gres=craynetwork:0 python generator_adios2.py +srun -n 2 -c 2 --mem=1G --gres=craynetwork:0 python processor_adios2.py + +For example within an interactive session, using a split terminal (see the screen utility) -bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic $1 -bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic parsed diff --git a/analysis/spectral.py b/analysis/spectral.py new file mode 100644 index 0000000..60425fe --- /dev/null +++ b/analysis/spectral.py @@ -0,0 +1,35 @@ +# Coding: UTF-8 -*- + + + +def power_spectrum(data, **kwargs): + """Implements an overlapped segmented averaging of modified periodograms. + Currently scipy.signal.welch + + See + * Original implementation in kstar fluctana + + * Discussion in + 'Spectrum and spectral density estimation by the Discrete Fourier transform (DFT), + including a comprehensive list of window functions and some new flat-top windows' + G. Heinzel, A. Rudiger and R. Schilling (2002) + + Input: + ====== + data : channel data to be analyzed + **kwargs : keyword arguments to be passed into wrapped function. See documentation of wrapped function. 
+ + + Returns: + ======== + f : ndarray, vector of frequency bins + Pxx : ndarray, vector of power spectral densities + """ + + from scipy.signal import welch + f, Pxx = welch(data, **kwargs) + + return(f, Pxx) + + +# End of file spectral.py \ No newline at end of file diff --git a/backends/mongodb.py b/backends/mongodb.py new file mode 100644 index 0000000..f23a848 --- /dev/null +++ b/backends/mongodb.py @@ -0,0 +1,37 @@ +#Coding: UTF-8 -*- + +from mpi4py import MPI +from pymongo import MongoClient +from bson.binary import Binary + +class mongodb_backend(): + def __init__(self, rank, channel_list): + self.rank = rank + self.channel_list = channel_list + + # Connect to mongodb + client = MongoClient("mongodb07.nersc.gov") + + + def store(self, analysis, result): + """Stores analysis data + + Input: + ====== + channel_list: List of channels + analysis: dictionary, name and parameters for called analysis routine + result: Result of the analysis routine + + + Returns: + ======== + None + """ + + print("Storing data") + + + return None + + +# End of file mongodb.py \ No newline at end of file diff --git a/config.json b/config.json index 388ff90..bab5a49 100644 --- a/config.json +++ b/config.json @@ -1,4 +1,8 @@ {"datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431", "shotnr": 18431, - "channels": [2202, 2203], - "analysis": ["foo", "bar"]} \ No newline at end of file + "channel_lists": [[2203, 2204], [2101, 2102]], + "analysis": [{"name" : "power_spectrum", + "config" : {"nperseg": 32, "fs": 1.0}}, + {"name" : "an_honest_mistake", + "config" : {"a": 0, "b": 22}}] +} \ No newline at end of file diff --git a/generators/generator_adios2.py b/generator_adios2.py similarity index 74% rename from generators/generator_adios2.py rename to generator_adios2.py index 975b434..e7a0ffc 100644 --- a/generators/generator_adios2.py +++ b/generator_adios2.py @@ -10,8 +10,8 @@ import json import argparse -from writers import writer_dataman, writer_bpfile -from data_loader import 
data_loader +from generators.writers import writer_dataman, writer_bpfile +from generators.data_loader import data_loader """ Generates batches of ECEI data. @@ -31,12 +31,14 @@ datapath = cfg["datapath"] shotnr = cfg["shotnr"] -channel_list = cfg["channels"] # Enforce 1:1 mapping of channels and tasks -assert(len(cfg["channels"]) == size) +assert(len(cfg["channel_lists"]) == size) +# Channels this process is reading +my_channel_list = cfg["channel_lists"][rank] +gen_id = 100000 * rank + my_channel_list[0] -print("Rank: {0:d}".format(rank), ", channels: ", channel_list[rank]) +print("Rank: {0:d}".format(rank), ", channel_list: ", my_channel_list, ", id = ", gen_id) # Hard-code the total number of data points data_pts = int(5e6) @@ -47,7 +49,7 @@ # Get a data_loader dl = data_loader(path.join(datapath, "ECEI.018431.LFS.h5"), - channel_list=[channel_list[rank]], + channel_list=my_channel_list, batch_size=1000) # data_arr is a list @@ -56,7 +58,7 @@ data_arr = data_arr.astype(np.float64) _data_arr = 0.0 -writer = writer_bpfile(shotnr, channel_list[rank]) +writer = writer_bpfile(shotnr, gen_id) writer.DefineVariable(data_arr) writer.Open() diff --git a/generators/data_loader.py b/generators/data_loader.py index 1e10324..c686bf4 100644 --- a/generators/data_loader.py +++ b/generators/data_loader.py @@ -37,16 +37,16 @@ def get(self): comm = MPI.COMM_WORLD rank, size = comm.Get_rank(), comm.Get_size() - #print("rank: {0:d} channel {1:d} batch{2:d}".format(rank, self.channel_list[0], self.current_batch)) + print("rank: {0:d} batch: {1:d}".format(rank, self.current_batch), ", channel list: ", self.channel_list) with h5py.File(self.filename, "r", driver='mpio', comm=MPI.COMM_WORLD) as df: for ch in self.channel_list_hdf5: + print("Accessing channel ", ch) data_list.append(df[ch][self.current_batch * self.batch_size: (self.current_batch + 1) * self.batch_size]) self.current_batch += 1 - df.close() return(data_list) diff --git a/generators/writers.py b/generators/writers.py 
index 1e199f2..b106688 100644 --- a/generators/writers.py +++ b/generators/writers.py @@ -6,14 +6,14 @@ class writer_base(): - def __init__(self, shotnr, channel): + def __init__(self, shotnr, id): comm = MPI.COMM_WORLD self.rank = comm.Get_rank() self.size = comm.Get_size() #print("writer_base.__init__(): rank = {0:02d}".format(self.rank)) self.shotnr = shotnr - self.channel = channel + self.id = id self.adios = adios2.ADIOS(MPI.COMM_SELF) self.IO = self.adios.DeclareIO("stream_{0:03d}".format(self.rank)) self.writer = None @@ -35,10 +35,10 @@ def DefineVariable(self, data_array): def Open(self): - """Opens a new channel + """Opens a new channel. """ - self.channel_name = "{0:05d}_ch{1:d}.bp".format(self.shotnr, self.channel) + self.channel_name = "{0:05d}_ch{1:06d}.bp".format(self.shotnr, self.id) if self.writer is None: self.writer = self.IO.Open(self.channel_name, adios2.Mode.Write) @@ -65,8 +65,8 @@ def __del__(self): class writer_dataman(writer_base): - def __init__(self, shotnr, channel): - super().__init__(shotnr, channel) + def __init__(self, shotnr, id): + super().__init__(shotnr, id) self.IO.SetEngine("DataMan") dataman_port = 12300 + self.rank transport_params = {"IPAddress": "127.0.0.1", @@ -77,15 +77,9 @@ def __init__(self, shotnr, channel): class writer_bpfile(writer_base): - def __init__(self, shotnr, channel): - super().__init__(shotnr, channel) + def __init__(self, shotnr, id): + super().__init__(shotnr, id) self.IO.SetEngine("BP4") -# # Define variable - -# Open stream -#writer = adios_IO.Open("stream", adios2.Mode.Write) - - # End of file a2_sender.py \ No newline at end of file diff --git a/processors/processor_adios2.py b/processor_adios2.py similarity index 52% rename from processors/processor_adios2.py rename to processor_adios2.py index adfaf14..ea61fd8 100644 --- a/processors/processor_adios2.py +++ b/processor_adios2.py @@ -4,9 +4,14 @@ import numpy as np import adios2 import json -import argparse +import argparse + + +from 
processors.readers import reader_dataman, reader_bpfile +from analysis.spectral import power_spectrum + +from backends.mongodb import mongodb_backend -from readers import reader_dataman, reader_bpfile comm = MPI.COMM_WORLD rank = comm.Get_rank() @@ -21,44 +26,49 @@ df.close() # "Enforce" 1:1 mapping of reader processes on analysis tasks -assert(len(cfg["channels"]) == size) +assert(len(cfg["channel_lists"]) == size) assert(len(cfg["analysis"]) == size) datapath = cfg["datapath"] shotnr = cfg["shotnr"] -channel_list = cfg["channels"] -analysis = cfg["analysis"][rank] - - +my_analysis = cfg["analysis"][rank] +my_channel_list = cfg["channel_lists"][rank] +gen_id = 100000 * rank + my_channel_list[0] +num_channels = len(my_channel_list) +reader = reader_bpfile(shotnr, gen_id) +reader.Open() -reader = reader_bpfile(shotnr, channel_list[rank]) -reader.Open() +backend = mongodb_backend(rank, my_channel_list) -print("Starting main loop") +#print("Starting main loop") while(True): stepStatus = reader.BeginStep() - print(stepStatus) + #print(stepStatus) if stepStatus == adios2.StepStatus.OK: #var = dataman_IO.InquireVariable("floats") #shape = var.Shape() #io_array = np.zeros(np.prod(shape), dtype=np.float) #reader.Get(var, io_array, adios2.Mode.Sync) - io_array = reader.get_data("floats") + channel_data = reader.get_data("floats") #currentStep = reader.CurrentStep() reader.EndStep() - print("Step", reader.CurrentStep(), ", io_array = ", io_array) + #print("rank {0:d}: Step".format(rank), reader.CurrentStep(), ", io_array = ", io_array) else: - print("End of stream") + print("rank {0:d}: End of stream".format(rank)) break + # Recover channel data + channel_data = channel_data.reshape((num_channels, channel_data.size // num_channels)) # Perform the analysis - + if(my_analysis["name"] == "power_spectrum"): + analysis_result = power_spectrum(channel_data, **my_analysis["config"]) # Store result in database + backend.store(my_analysis, analysis_result) #datamanReader.Close() diff --git 
a/processors/processor_kafka.py b/processors/processor_kafka.py index abd7fff..a324995 100644 --- a/processors/processor_kafka.py +++ b/processors/processor_kafka.py @@ -1,5 +1,24 @@ #-*- coding: UTF-8 -*- +""" +Below is old information for running the kafka/faust implementation: +Start/Stop zookeper and kafka: https://gist.github.com/piatra/0d6f7ad1435fa7aa790a +#!/bin/bash + +if [ "$#" -ne 1 ]; then + echo "Please supply topic name" + exit 1 +fi + +nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties > /dev/null 2>&1 & +sleep 2 +nohup bin/kafka-server-start.sh -daemon config/server.properties > /dev/null 2>&1 & +sleep 2 + +bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic $1 +bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic parsed +""" + import faust import numpy as np import pickle diff --git a/processors/readers.py b/processors/readers.py index 3215bc0..05d65fe 100644 --- a/processors/readers.py +++ b/processors/readers.py @@ -6,13 +6,13 @@ class reader_base(): - def __init__(self, shotnr, channel): + def __init__(self, shotnr, id): comm = MPI.COMM_WORLD self.rank = comm.Get_rank() self.size = comm.Get_size() self.shotnr = shotnr - self.channel = channel + self.id = id self.adios = adios2.ADIOS(MPI.COMM_SELF) self.IO = self.adios.DeclareIO("stream_{0:03d}".format(self.rank)) print("reader_base.__init__(): rank = {0:02d}".format(self.rank)) @@ -20,7 +20,7 @@ def __init__(self, shotnr, channel): def Open(self): """Opens a new channel""" - self.channel_name = "{0:05d}_ch{1:d}.bp".format(self.shotnr, self.channel) + self.channel_name = "{0:05d}_ch{1:06d}.bp".format(self.shotnr, self.id) if self.reader is None: self.reader = self.IO.Open(self.channel_name, adios2.Mode.Read) @@ -57,8 +57,8 @@ def EndStep(self): class reader_dataman(reader_base): - def __init__(self, shotnr, channel): - super().__init__(shotnr, channel) + def __init__(self, 
shotnr, id): + super().__init__(shotnr, id) self.IO.SetEngine("DataMan") self.reader = None @@ -71,12 +71,9 @@ def __init__(self, shotnr, channel): class reader_bpfile(reader_base): - def __init__(self, shotnr, channel): - super().__init__(shotnr, channel) + def __init__(self, shotnr, id): + super().__init__(shotnr, id) self.IO.SetEngine("BPFile") self.reader = None - - - # end of file readers.py \ No newline at end of file