From f623d008eab4ed5ce07efb898faac4f2181e5dda Mon Sep 17 00:00:00 2001
From: Ralph Kube
Date: Wed, 4 Sep 2019 13:13:06 -0700
Subject: [PATCH] Initial commit

---
 .gitignore                      |   2 +
 README.md                       |  24 ++++++++
 generators/data_generator.py    |  23 +++++++
 generators/generator_hdf5_v1.py | 109 +++++++++++++++++++++++++++++++++
 processors/processor_v1.py      |  26 ++++++++
 5 files changed, 184 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 README.md
 create mode 100644 generators/data_generator.py
 create mode 100644 generators/generator_hdf5_v1.py
 create mode 100644 processors/processor_v1.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..9f94e5d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.vscode
+__pycache__
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..bdb4a91
--- /dev/null
+++ b/README.md
@@ -0,0 +1,24 @@
+aDaptive rEaL Time Analysis of big fusion data
+
+This project implements a client-server model for the analysis of data streamed
+from fusion experiments or large-scale simulations.
+
+Implemented as part of "Adaptive near-real time networked analysis of big
+fusion data" (FY18).
+
+
+Start/stop ZooKeeper and Kafka (adapted from https://gist.github.com/piatra/0d6f7ad1435fa7aa790a):
+#!/bin/bash
+
+if [ "$#" -ne 1 ]; then
+    echo "Please supply topic name"
+    exit 1
+fi
+
+nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties > /dev/null 2>&1 &
+sleep 2
+nohup bin/kafka-server-start.sh -daemon config/server.properties > /dev/null 2>&1 &
+sleep 2
+
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic "$1"
+bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic parsed
diff --git a/generators/data_generator.py b/generators/data_generator.py
new file mode 100644
index 0000000..928665d
--- /dev/null
+++ b/generators/data_generator.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python3
+# -*- coding: UTF-8 -*-
+
+"""
+A dummy data generator that sends data through Kafka to the topic 'ECEI_data'.
+"""
+
+from kafka import KafkaProducer
+import numpy as np
+import pickle
+
+
+# The default broker address is localhost:9092
+producer = KafkaProducer()
+for _ in range(100):
+    # data = np.random.uniform(0.0, 1.0, 23)
+    data = np.arange(10, 17, 0.0001)
+    # Pickle the array so it can be sent as a raw Kafka message payload
+    producer.send('ECEI_data', pickle.dumps(data))
+# Block until all buffered messages have been delivered
+producer.flush()
+
+# End of file data_generator.py
\ No newline at end of file
diff --git a/generators/generator_hdf5_v1.py b/generators/generator_hdf5_v1.py
new file mode 100644
index 0000000..e66c289
--- /dev/null
+++ b/generators/generator_hdf5_v1.py
@@ -0,0 +1,109 @@
+# -*- coding: UTF-8 -*-
+
+"""
+Generates batches of ECEI data and streams them through Kafka.
+"""
+
+from kafka import KafkaProducer
+import h5py
+import numpy as np
+import pickle
+import time
+
+
+shotnr = 18431
+data_path = "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/"
+df_fname = "{0:s}/{1:06d}/ECEI.{1:06d}.HFS.h5".format(data_path, shotnr)
+
+
+# Specify the channel we are streaming
+channel_name = "H2202"
+
+# Hard-code the total number of data points
+data_pts = int(5e6)
+# Hard-code the total number of data batches we send over the channel
+batches = int(1e2)
+# Calculate the number of data points per data packet
+data_per_batch = data_pts // batches
+
+producer = KafkaProducer()
+
+with h5py.File(df_fname, "r") as df:
+    for bb in range(batches):
+        dset_name = "/ECEI/ECEI_{0:s}/Voltage".format(channel_name)
+        data = df[dset_name][bb * data_per_batch:(bb + 1) * data_per_batch]
+        print(bb, data.shape, type(data))
+        producer.send(channel_name, pickle.dumps(data))
+        time.sleep(1e-1)
+# Make sure all buffered messages have been sent before the script exits
+producer.flush()
+
+
+def read_attrs(df_fname):
+    """Read attributes from a KSTAR ECEI hdf5 file. Return attributes as a dictionary.
+
+    Parameters
+    ----------
+    df_fname : str
+        input file name
+
+    Returns
+    -------
+    attrs : dict
+        attributes of the HDF5 file
+    """
+
+    with h5py.File(df_fname, "r") as df:
+        dset = df["/ECEI"]
+        attrs = dict(dset.attrs)
+        attrs["toff"] = attrs["TriggerTime"][0] + 1e-3
+
+        try:
+            attrs["Mode"] = attrs["Mode"].decode()
+        except (KeyError, AttributeError):
+            # Default to X-mode if the attribute is missing or not a bytestring
+            attrs["Mode"] = "X"
+
+        # hn: harmonic number of the ECE resonance
+        if attrs["Mode"] == "O":
+            hn = 1
+        else:
+            hn = 2
+        attrs["hn"] = hn
+
+        attrs["SampleRate"][0] = attrs["SampleRate"][0] * 1e3
+        attrs["TFcurrent"] = attrs["TFcurrent"] * 0.995556
+
+    return attrs
+
+
+def gen_timebase(attrs, tstart, tend):
+    """Generates a time base for ECEI data.
+
+    Parameters
+    ----------
+    attrs : dict
+        attributes of the HDF5 file, as returned by read_attrs
+    tstart : float
+        start time
+    tend : float
+        end time
+
+    Returns
+    -------
+    tb : ndarray, float
+        sample times in the interval [tstart, tend]
+    """
+
+    # Define some shortcuts
+    tt = attrs["TriggerTime"]
+    fs = attrs["SampleRate"][0]
+
+    # Minimal sketch, assuming one continuous acquisition window: data_pts
+    # samples start at the trigger time tt[0], spaced 1/fs apart.
+    tb = tt[0] + np.arange(data_pts) / fs
+    # Keep only the samples that fall into the requested time interval
+    return tb[(tb >= tstart) & (tb <= tend)]
+
+
+# End of file generator_hdf5_v1.py
\ No newline at end of file
diff --git a/processors/processor_v1.py b/processors/processor_v1.py
new file mode 100644
index 0000000..73901ff
--- /dev/null
+++ b/processors/processor_v1.py
@@ -0,0 +1,26 @@
+# -*- coding: UTF-8 -*-
+
+import faust
+import numpy as np
+import pickle
+
+import sys
+sys.path.append("/global/homes/r/rkube/repos/fluctana")
+# fluctana routine, intended for later spectral analysis of the received data
+from specs import fftbins
+
+app = faust.App('processor_v1', broker='kafka://localhost:9092',
+                value_serializer='raw', store="memory://")
+
+delta_topic = app.topic('H2202')
+
+
+@app.agent(delta_topic)
+async def consume(data):
+    async for obj in data:
+        # Messages arrive as pickled numpy arrays, see the generators
+        d = pickle.loads(obj)
+        print("received data: ", type(d))
+
+
+# End of file processor_v1.py
\ No newline at end of file