Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Doggo #45

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions delta/configs/logger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ formatters:
extended:
format: "%(levelname)s %(asctime)s,%(msecs)d [Process %(process)d %(processName)s %(threadName)s] [%(module)s %(funcName)s]: %(message)s "
ext_middle:
format: "middleman: %(levelname)s %(asctime)s,%(msecs)d [Process %(process)d %(processName)s %(threadName)s] [%(module)s %(funcName)s]: %(message)s "
format: "%(levelname)s %(asctime)s,%(msecs)d [Process %(process)d %(processName)s %(threadName)s] [%(module)s %(funcName)s]: %(message)s "
ext_generator:
format: "generator: %(levelname)s %(asctime)s,%(msecs)d [Process %(process)d %(processName)s %(threadName)s] [%(module)s %(funcName)s]: %(message)s "
format: "%(levelname)s %(asctime)s,%(msecs)d [Process %(process)d %(processName)s %(threadName)s] [%(module)s %(funcName)s]: %(message)s "


handlers:
Expand All @@ -27,13 +27,13 @@ handlers:
class: logging.FileHandler
level: INFO
formatter: ext_middle
filename: delta.log
filename: delta_middleman.log

file_generator:
class: logging.FileHandler
level: INFO
formatter: ext_generator
filename: delta.log
filename: delta_generator.log

file_performance:
class: logging.FileHandler
Expand Down
73 changes: 73 additions & 0 deletions delta/configs/shot18431_GT_null.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
{
"diagnostic":
{
"name": "kstarecei",
"shotnr": 18341,
"dev": "G",
"datasource":
{
"source_file": "/expdata2/exp_2017/018431/ECEI.018431.GFS.h5",
"chunk_size": 10000,
"num_chunks": 500,
"channel_range": ["0101-2408"],
"datatype": "float",
"t_norm": [-0.099, -0.089]
}
},
"transport_rx":
{
"engine": "dataman",
"params":
{
"IPAddress": "203.230.120.125",
"Timeout": "120",
"Port": "50001",
"TransportMode": "reliable"
}
},
"transport_tx":
{
"engine": "dataman",
"params_sst":
{
"RegistrationMethod": "File"
},
"params":
{
"IPAddress": "128.55.205.18",
"Timeout": "120",
"Port": "50055",
"TransportMode": "reliable"
}
},
"storage":
{
"backend": "null",
"datastore": "gridfs"
},
"preprocess": {
"no_bandpass_fir": {"N": 5, "Wn": [0.14, 0.2], "btype": "bandpass", "output": "sos"},
"no_wavelet": {"wavelet": "db5", "method": "BayesShrink", "wavelet_levels": 5, "rescale_sigma": false},
"no_plot": {"time_range": [2.7175, 2.7178], "plot_dir": "/global/homes/r/rkube/delta_runs/plots/"},
"stft": {"nfft": 512, "fs": 500000, "window": "hann", "overlap": 0.5, "noverlap": 256, "detrend": "constant", "full": true}
},
"analysis": {
"spectral_GAP": {
"channel_chunk_size": 32768,
"ref_channels": [1, 1, 24, 8],
"cmp_channels": [1, 1, 24, 8]
},
"crosscorr_cu": {
"channel_chunk_size": 32768,
"ref_channels": [1, 1, 24, 8],
"cmp_channels": [1, 1, 24, 8]
}
},
"analysis_null": {
"null": {
"channel_chunk_size": 32768,
"ref_channels": [1, 1, 24, 8],
"cmp_channels": [1, 1, 24, 8]
}
}
}
64 changes: 64 additions & 0 deletions delta/configs/test_doggo.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"diagnostic":
{
"name": "doggo",
"shotnr": 12,
"datasource":
{
"data_dir":
"/global/cscratch1/sd/rkube/ML_datasets/stanford_dogs/Images",
"img_res_x": 224,
"img_res_y": 224,
"num_img": 10,
"num_categories": 3
}
},
"transport_rx":
{
"engine": "BP4",
"params":
{
"IPAddress": "203.230.120.125",
"Timeout": "120",
"Port": "50001",
"TransportMode": "fast"
}
},
"transport_tx":
{
"engine": "BP4",
"params":
{
"IPAddress": "128.55.205.18",
"Timeout": "120",
"Port": "50055",
"TransportMode": "fast"
}
},
"storage":
{
"backend": "null",
"datastore": "gridfs"
},
"preprocess": {
"bandpass_fir": {"N": 5, "Wn": [0.14, 0.2], "btype": "bandpass", "output": "sos"},
"no_wavelet": {"wavelet": "db5", "method": "BayesShrink", "wavelet_levels": 5, "rescale_sigma": false},
"no_plot": {"time_range": [2.7175, 2.7178], "plot_dir": "/global/homes/r/rkube/delta_runs/plots/"},
"nostft": {"nfft": 512, "fs": 500000, "window": "hann", "overlap": 0.5, "noverlap": 256, "detrend": "constant", "full": true}
},
"analysis_gpu": {
"spectral_GAP": {
"channel_chunk_size": 32768,
"ref_channels": [1, 1, 24, 8],
"cmp_channels": [1, 1, 24, 8]
}
},
"analysis": {
"null": {
"channel_chunk_size": 32768,
"ref_channels": [1, 1, 24, 8],
"cmp_channels": [1, 1, 24, 8]
}
}
}

46 changes: 46 additions & 0 deletions delta/data_models/doggo_diag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- Encoding: UTF-8 -*-

"""Defines helper function for doggo diagnostic.

This is a dummy diagnostic that servers as a proto-type for other
imaging diagnostics such as KSTAR ECEi or KSTAR IRTV.
"""

# import logging


class doggo_chunk():
    """Represents one chunk of dog images.

    Usage:

    .. code-block:: python

        chunk = doggo_chunk(images, metadata)

    """
    def __init__(self, data, metadata=None):
        """Stores the image data and optional metadata for this chunk.

        Args:
            data(ndarray, float):
                Image tensor of shape (nimages, width, height, channels)
            metadata:
                Some meta-data.
        """
        self.metadata = metadata
        self.doggo_data = data

    @property
    def data(self):
        """Uniform accessor for the underlying image data."""
        return self.doggo_data

    @property
    def shape(self):
        """Shape of the underlying image tensor (forwards to doggo_data.shape)."""
        return self.doggo_data.shape

# End of file doggo_diag.py
15 changes: 7 additions & 8 deletions delta/data_models/kstar_ecei.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

class ecei_chunk():
"""Class that represents a time-chunk of ECEI data.

This class provides the following interface.

Creating a time-chunk from streaming data, where `tb_chunk` is of type
:py:class:`data_models.timebase.timebase_streaming`:

.. code-block:: python

chunk = kstar_ecei(stream_data, tb_chunk, stream_attrs)
chunk = ecei_chunk(stream_data, tb_chunk, stream_attrs)

Example: Fourier-transformation of a time-chunk. The time-axis is given by the
axis_t member, data is accessed by the data member, sampling frequency is calculated
Expand All @@ -33,7 +33,7 @@ class ecei_chunk():
.. code-block:: python

fft_data = fft(chunk.data, axis=chunk.axis_t, fsample = 1. / chunk.tb.dt())

"""

def __init__(self, data, tb, params=None, num_v=24, num_h=8):
Expand All @@ -47,7 +47,7 @@ def __init__(self, data, tb, params=None, num_v=24, num_h=8):
Keys need to include:

* dev: String identifier of the ECEI device: L, G, H, GT, or GR
* TriggerTime:
* TriggerTime:
* t_norm: Vector of 2 floats, defining the time interval used for normalization. In seconds.
* SampleRate: Rate at which each channels samples the plasma. In Hz.
* TFCurrent: Toroidal Field Coil current, in Amps
Expand Down Expand Up @@ -92,7 +92,6 @@ def __init__(self, data, tb, params=None, num_v=24, num_h=8):
assert(data.shape[self.axis_ch] == self.num_h * self.num_v)

# Parameters for the ECEI chunk:
#
self.params = params
# True if data is normalized, False if not.
self.is_normalized = False
Expand Down Expand Up @@ -227,10 +226,10 @@ def channel_range_from_str(range_str):
channels are referred to f.ex.

.. code-block::

'L0101' or 'GT1507'
The letters refer to a device (D), the first two digits to the vertical channel number (V)

The letters refer to a device (D), the first two digits to the vertical channel number (V)
and the last two digits refer to the horizontal channel number (H). Delta uses the same DDVVHH
format.

Expand Down
18 changes: 10 additions & 8 deletions delta/middleman.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

"""Receives data from generator and forwards them to processor."""

from mpi4py import MPI
#from mpi4py import MPI

import logging
import logging.config
Expand All @@ -14,8 +14,8 @@
import yaml
import argparse

from streaming.writers import writer_gen
from streaming.reader_mpi import reader_gen
from streaming.writer_nompi import writer_gen
from streaming.reader_nompi import reader_gen
from data_models.helpers import gen_channel_name, gen_var_name


Expand All @@ -29,7 +29,8 @@ class AdiosMessage:

def forward(Q, cfg, args, timeout):
"""To be executed by a local thread. Pops items from the queue and forwards them."""
global comm, rank
#global comm, rank
rank = 0
logger = logging.getLogger("middleman")
logger.info(f"Worker: Creating writer_gen: engine={cfg[args.transport_tx]['engine']}")

Expand Down Expand Up @@ -74,9 +75,10 @@ def forward(Q, cfg, args, timeout):

def main():
"""Reads items from a ADIOS2 connection and forwards them."""
global comm, rank, args
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
#global comm, rank, args
#comm = MPI.COMM_WORLD
#rank = comm.Get_rank()
rank = 0

parser = argparse.ArgumentParser(description="Receive data and dispatch" +
"analysis tasks to a mpi queue")
Expand All @@ -90,7 +92,7 @@ def main():

with open(args.config, "r") as df:
cfg = json.load(df)
timeout = 5
timeout = 60

# The middleman uses both a reader and a writer. Each is configured with using
# their respective section of the config file. Therefore some keys are duplicated,
Expand Down
16 changes: 8 additions & 8 deletions delta/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def consume(Q, my_task_list, my_preprocessor):

while True:
try:
msg = Q.get(timeout=2.0)
msg = Q.get(timeout=120.0)
except queue.Empty:
logger.info("Empty queue after waiting until time-out. Exiting")
break
Expand All @@ -60,7 +60,7 @@ def consume(Q, my_task_list, my_preprocessor):

def main():
"""Procesess a stream of data chunks on an executor."""
comm = MPI.COMM_WORLD
#comm = MPI.COMM_WORLD

# Parse command line arguments and read configuration file
parser = argparse.ArgumentParser(description="Receive data and dispatch analysis" +
Expand Down Expand Up @@ -112,17 +112,17 @@ def main():
store_backend = store_type(cfg["storage"])
store_backend.store_one({"run_id": cfg['run_id'], "run_config": cfg})

# TODO: (RMC) Should this be moved to where cfg updated?
# TODO: (RMC) Should this be moved to where cfg updated?
# (would allow updating channels to process remotely)
reader = reader_gen(cfg[args.transport], gen_channel_name(cfg["diagnostic"]))
reader.Open()

dq = queue.Queue()

# In a streaming setting, (SST, dataman) attributes can only be accessed after
# reading the first time step of a variable.
# reading the first time step of a variable.
# Initialize stream_attrs with None and load it in the main loop below.
stream_attrs = None
stream_attrs = None

data_model_gen = data_model_generator(cfg["diagnostic"])
my_preprocessor = preprocessor(executor_pre, cfg)
Expand Down Expand Up @@ -158,9 +158,9 @@ def main():
else:
logger.info(f"Exiting: StepStatus={stepStatus}")
break
# if reader.CurrentStep() > 5:
# break

if reader.CurrentStep() > 100:
break

dq.join()
logger.info("Queue joined")
Expand Down
Loading