Commit f623d00
Initial commit
rkube committed Sep 4, 2019 (0 parents)
Showing 5 changed files with 174 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
.vscode
__pycache__
24 changes: 24 additions & 0 deletions README.md
@@ -0,0 +1,24 @@
aDaptive rEaL Time Analysis of big fusion data

This project implements a client-server model for analysis of streamed data from
fusion experiments or large-scale simulations.

Implemented as part of the project "Adaptive near-real time networked analysis of
big fusion data" (FY18).


Start/stop ZooKeeper and Kafka (adapted from https://gist.github.com/piatra/0d6f7ad1435fa7aa790a):
#!/bin/bash

if [ "$#" -ne 1 ]; then
echo "Please supply topic name"
exit 1
fi

# Start ZooKeeper first, then the Kafka broker
nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties > /dev/null 2>&1 &
sleep 2
nohup bin/kafka-server-start.sh -daemon config/server.properties > /dev/null 2>&1 &
sleep 2

# Create the topic passed on the command line, plus the 'parsed' topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic "$1"
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic parsed
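
With the script above saved in the Kafka installation directory (the file name
start_kafka.sh is just an example), the full pipeline can be exercised as
sketched below; the Faust worker invocation is the standard one and assumes the
repository root as working directory:

./start_kafka.sh H2202
python generators/generator_hdf5_v1.py
faust -A processors.processor_v1 worker -l info

The HDF5 generator publishes to the topic named after the ECEI channel (H2202),
which is exactly the topic processor_v1 subscribes to.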
21 changes: 21 additions & 0 deletions generators/data_generator.py
@@ -0,0 +1,21 @@
#!/usr/bin/env python3
#-*- coding: UTF-8 -*-

"""
This is a dummy data generator to send data through kafka into the channel 'ECEI_data'
"""

from kafka import KafkaProducer
import numpy as np
import pickle


# The producer connects to the default broker at localhost:9092
producer = KafkaProducer()
for _ in range(100):
    # Send a fixed ramp signal; random data would work just as well, e.g.
    # data = np.random.uniform(0.0, 1.0, 23)
    data = np.arange(10, 17, 0.0001)
    producer.send('ECEI_data', pickle.dumps(data))

# send() is asynchronous; block until all queued messages are delivered
producer.flush()


# End of file data_generator.py
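
For a quick end-to-end check, a minimal consumer sketch (assuming the broker
runs at the default localhost:9092 and the messages are pickled NumPy arrays,
as above):

from kafka import KafkaConsumer
import pickle

# Read the pickled arrays back from the 'ECEI_data' topic,
# starting at the oldest available offset
consumer = KafkaConsumer('ECEI_data', auto_offset_reset='earliest')
for msg in consumer:
    data = pickle.loads(msg.value)
    print(data.shape, data.min(), data.max())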
105 changes: 105 additions & 0 deletions generators/generator_hdf5_v1.py
@@ -0,0 +1,105 @@
# -*- coding: UTF-8 -*-

"""
Generates batches of ECEI data and streams them through Kafka.
"""

from kafka import KafkaProducer
import h5py
import numpy as np
import pickle
import time

shotnr = 18431
data_path = "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming"
df_fname = "{0:s}/{1:06d}/ECEI.{1:06d}.HFS.h5".format(data_path, shotnr)


# Specify the channel we are streaming
channel_name = "H2202"

# Hard-code the total number of data points
data_pts = int(5e6)
# Hard-code the total number of data batches we send over the channel
batches = int(1e2)
# Number of data points per batch: 5e6 // 1e2 = 50,000
data_per_batch = data_pts // batches

producer = KafkaProducer()

with h5py.File(df_fname, "r") as df:
    for bb in range(batches):
        dset_name = "/ECEI/ECEI_{0:s}/Voltage".format(channel_name)
        data = df[dset_name][bb * data_per_batch:(bb + 1) * data_per_batch]
        print(bb, data.shape, type(data))
        producer.send(channel_name, pickle.dumps(data))
        time.sleep(1e-1)




def read_attrs(df_fname):
    """Read attributes from a KSTAR ECEI HDF5 file and return them as a dictionary.

    Parameters
    ----------
    df_fname : str
        Input file name

    Returns
    -------
    attrs : dict
        Attributes of the HDF5 file
    """

    with h5py.File(df_fname, "r") as df:
        dset = df["/ECEI"]
        attrs = dict(dset.attrs)
        attrs["toff"] = attrs["TriggerTime"][0] + 1e-3

        # Harmonic number: 1 for O-mode, 2 for X-mode (default)
        hn = 2
        try:
            attrs["Mode"] = attrs["Mode"].decode()
            if attrs["Mode"] == "O":
                hn = 1
        except (KeyError, AttributeError):
            # Mode attribute missing or already decoded; assume X-mode
            attrs["Mode"] = "X"
        attrs["hn"] = hn

        # Convert the sample rate from kHz to Hz
        attrs["SampleRate"][0] = attrs["SampleRate"][0] * 1e3
        # Empirical calibration factor for the TF coil current
        attrs["TFcurrent"] = attrs["TFcurrent"] * 0.995556

    return attrs


def gen_timebase(attrs, tstart, tend):
    """Generates a time base for ECEI data.

    Parameters
    ----------
    attrs : dict
        File attributes, as returned by read_attrs
    tstart : float
        Start time
    tend : float
        End time

    Returns
    -------
    tb : ndarray
        Time base covering [tstart, tend]
    """

    # Define some shortcuts
    toff = attrs["toff"]
    fs = attrs["SampleRate"][0]

    # NOTE: minimal sketch assuming a uniformly sampled signal starting at
    # the trigger offset: sample n occurs at toff + n / fs. Clip the
    # resulting axis to the requested interval.
    n_samples = int((tend - toff) * fs)
    tb = toff + np.arange(n_samples) / fs
    return tb[(tb >= tstart) & (tb <= tend)]



# End of file generator_hdf5_v1.py
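
For reference, a short usage sketch for the two helpers above (assuming the
same df_fname as in the streaming loop; the attribute names follow the KSTAR
ECEI HDF5 layout used in this file):

attrs = read_attrs(df_fname)
print(attrs["Mode"], attrs["SampleRate"][0])

# Time base for the first 100 ms after the trigger offset
tb = gen_timebase(attrs, attrs["toff"], attrs["toff"] + 0.1)
print(tb.shape, tb[0], tb[-1])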
22 changes: 22 additions & 0 deletions processors/processor_v1.py
@@ -0,0 +1,22 @@
#-*- coding: UTF-8 -*-

import faust
import numpy as np
import pickle

import sys
# Make the fluctana repository importable. fftbins is imported for later use
# in spectral analysis; it is not called yet.
sys.path.append("/global/homes/r/rkube/repos/fluctana")
from specs import fftbins

app = faust.App('processor_v1', broker='kafka://localhost:9092', value_serializer='raw', store="memory://")

delta_topic = app.topic('H2202')

@app.agent(delta_topic)
async def consume(data):
    async for obj in data:
        d = pickle.loads(obj)
        print("received data: ", type(d))


# End of file processor_v1.py
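
Looking ahead, the consume agent could compute a spectrum per batch before
handing it off. A minimal sketch using plain NumPy as a stand-in for
fluctana's fftbins (whose signature this commit does not pin down):

@app.agent(delta_topic)
async def consume(data):
    async for obj in data:
        d = pickle.loads(obj)
        # Power spectrum of the real-valued batch; NumPy stand-in for fftbins
        spec = np.abs(np.fft.rfft(d)) ** 2
        print("received {0:d} samples -> {1:d} spectral bins".format(len(d), len(spec)))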
