Doggo #45

Open · wants to merge 2 commits into base: master
Changes from 1 commit
mock-up of doggo diagnostic + basic unit tests
rkube committed Oct 7, 2021
commit 587450139e50b07ed2f754dd352c8e7734cae866
64 changes: 64 additions & 0 deletions delta/configs/test_doggo.json
@@ -0,0 +1,64 @@
{
    "diagnostic": {
        "name": "doggo",
        "shotnr": 12,
        "datasource": {
            "data_dir": "/global/cscratch1/sd/rkube/ML_datasets/stanford_dogs/Images",
            "img_res_x": 224,
            "img_res_y": 224,
            "num_img": 10,
            "num_categories": 3
        }
    },
    "transport_rx": {
        "engine": "BP4",
        "params": {
            "IPAddress": "203.230.120.125",
            "Timeout": "120",
            "Port": "50001",
            "TransportMode": "fast"
        }
    },
    "transport_tx": {
        "engine": "BP4",
        "params": {
            "IPAddress": "128.55.205.18",
            "Timeout": "120",
            "Port": "50055",
            "TransportMode": "fast"
        }
    },
    "storage": {
        "backend": "null",
        "datastore": "gridfs"
    },
    "preprocess": {
        "bandpass_fir": {"N": 5, "Wn": [0.14, 0.2], "btype": "bandpass", "output": "sos"},
        "no_wavelet": {"wavelet": "db5", "method": "BayesShrink", "wavelet_levels": 5, "rescale_sigma": false},
        "no_plot": {"time_range": [2.7175, 2.7178], "plot_dir": "/global/homes/r/rkube/delta_runs/plots/"},
        "nostft": {"nfft": 512, "fs": 500000, "window": "hann", "overlap": 0.5, "noverlap": 256, "detrend": "constant", "full": true}
    },
    "analysis_gpu": {
        "spectral_GAP": {
            "channel_chunk_size": 32768,
            "ref_channels": [1, 1, 24, 8],
            "cmp_channels": [1, 1, 24, 8]
        }
    },
    "analysis": {
        "null": {
            "channel_chunk_size": 32768,
            "ref_channels": [1, 1, 24, 8],
            "cmp_channels": [1, 1, 24, 8]
        }
    }
}

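For orientation, a minimal sketch of how the datasource block of this config is consumed; the json handling is standard library, and the key accesses mirror loader_doggo.__init__ further down in this PR:

    import json

    with open("delta/configs/test_doggo.json") as fh:
        cfg_all = json.load(fh)

    ds = cfg_all["diagnostic"]["datasource"]
    print(ds["data_dir"])                       # image root directory
    print(ds["img_res_x"], ds["img_res_y"])     # target resolution, 224x224
    print(ds["num_img"], ds["num_categories"])  # subset sizes used by the mock-up
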
46 changes: 46 additions & 0 deletions delta/data_models/doggo_diag.py
@@ -0,0 +1,46 @@
# -*- Encoding: UTF-8 -*-

"""Defines a helper class for the doggo diagnostic.

This is a dummy diagnostic that serves as a prototype for other
imaging diagnostics such as KSTAR ECEi or KSTAR IRTV.
"""

# import logging


class doggo_chunk():
    """Class that represents a chunk of dog images.

    This class provides the following interface.

    Creating a time-chunk:

    .. code-block:: python

        chunk = doggo_chunk(images, metadata)

    """
    def __init__(self, data, metadata=None):
        """Creates a doggo_chunk from a given dataset.

        Args:
            data (ndarray, float):
                Image tensor of shape (nimages, width, height, channels)
            metadata:
                Some meta-data.
        """
        self.doggo_data = data
        self.metadata = metadata

    @property
    def data(self):
        """Common interface to data."""
        return self.doggo_data

    @property
    def shape(self):
        """Forwards to self.doggo_data.shape."""
        return self.doggo_data.shape

# End of file doggo_diag.py
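A quick smoke test of this interface (not part of the PR); the shapes follow the (nimages, width, height, channels) convention from the docstring, and the metadata value is illustrative:

    import numpy as np
    from data_models.doggo_diag import doggo_chunk

    images = np.random.uniform(size=(10, 224, 224, 3))   # 10 RGB images at 224x224
    chunk = doggo_chunk(images, metadata="EntleBucher")  # hypothetical metadata
    assert chunk.data is images
    assert chunk.shape == (10, 224, 224, 3)
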
15 changes: 7 additions & 8 deletions delta/data_models/kstar_ecei.py
@@ -16,15 +16,15 @@

class ecei_chunk():
    """Class that represents a time-chunk of ECEI data.

    This class provides the following interface.

    Creating a time-chunk from streaming data, where `tb_chunk` is of type
    :py:class:`data_models.timebase.timebase_streaming`:

    .. code-block:: python

        chunk = kstar_ecei(stream_data, tb_chunk, stream_attrs)
        chunk = ecei_chunk(stream_data, tb_chunk, stream_attrs)

    Example: Fourier transformation of a time-chunk. The time axis is given by the
    axis_t member, data is accessed through the data member, and the sampling
    frequency is calculated
@@ -33,7 +33,7 @@ class ecei_chunk():

    .. code-block:: python

        fft_data = fft(chunk.data, axis=chunk.axis_t, fsample=1. / chunk.tb.dt())

    """

    def __init__(self, data, tb, params=None, num_v=24, num_h=8):
@@ -47,7 +47,7 @@ def __init__(self, data, tb, params=None, num_v=24, num_h=8):
            Keys need to include:

            * dev: String identifier of the ECEI device: L, G, H, GT, or GR
            * TriggerTime:
            * t_norm: Vector of 2 floats, defining the time interval used for normalization. In seconds.
            * SampleRate: Rate at which each channel samples the plasma. In Hz.
            * TFCurrent: Toroidal Field Coil current, in Amps
@@ -92,7 +92,6 @@ def __init__(self, data, tb, params=None, num_v=24, num_h=8):
        assert(data.shape[self.axis_ch] == self.num_h * self.num_v)

        # Parameters for the ECEI chunk:
        #
        self.params = params
        # True if data is normalized, False if not.
        self.is_normalized = False
@@ -227,10 +226,10 @@ def channel_range_from_str(range_str):
    channels are referred to, f.ex.

    .. code-block::

        'L0101' or 'GT1507'

    The letters refer to a device (D), the first two digits to the vertical channel number (V)
    and the last two digits refer to the horizontal channel number (H). Delta uses the same DDVVHH
    format.

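The DDVVHH format described in that docstring can be pinned down with a small parser; this is an illustrative sketch (parse_channel is not a function in this PR):

    import re

    def parse_channel(ch_str):
        """Splits 'L0101' or 'GT1507' into (device, vertical, horizontal)."""
        m = re.fullmatch(r"([A-Z]{1,2})(\d{2})(\d{2})", ch_str)
        return m.group(1), int(m.group(2)), int(m.group(3))

    assert parse_channel("L0101") == ("L", 1, 1)
    assert parse_channel("GT1507") == ("GT", 15, 7)
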
10 changes: 5 additions & 5 deletions delta/processor.py
@@ -60,7 +60,7 @@ def consume(Q, my_task_list, my_preprocessor):

def main():
    """Processes a stream of data chunks on an executor."""
    comm = MPI.COMM_WORLD
    #comm = MPI.COMM_WORLD

    # Parse command line arguments and read configuration file
    parser = argparse.ArgumentParser(description="Receive data and dispatch analysis" +
@@ -112,17 +112,17 @@ def main():
    store_backend = store_type(cfg["storage"])
    store_backend.store_one({"run_id": cfg['run_id'], "run_config": cfg})

    # TODO: (RMC) Should this be moved to where cfg updated?
    # (would allow updating channels to process remotely)
    reader = reader_gen(cfg[args.transport], gen_channel_name(cfg["diagnostic"]))
    reader.Open()

    dq = queue.Queue()

    # In a streaming setting, (SST, dataman) attributes can only be accessed after
    # reading the first time step of a variable.
    # Initialize stream_attrs with None and load it in the main loop below.
    stream_attrs = None

    data_model_gen = data_model_generator(cfg["diagnostic"])
    my_preprocessor = preprocessor(executor_pre, cfg)
@@ -158,7 +158,7 @@ def main():
        else:
            logger.info(f"Exiting: StepStatus={stepStatus}")
            break

        if reader.CurrentStep() > 100:
            break

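For context, a rough sketch of the receive loop these hunks sit in. Only reader_gen(), Open(), CurrentStep(), dq, stream_attrs, and the StepStatus log line are visible in this diff; BeginStep(), get_attrs(), Get(), and ch_name are assumed, ADIOS2-style names, not confirmed by the PR:

    while True:
        stepStatus = reader.BeginStep()          # assumed method name
        if not stepStatus:
            logger.info(f"Exiting: StepStatus={stepStatus}")
            break
        if stream_attrs is None:
            # Stream attributes only become readable after the first step.
            stream_attrs = reader.get_attrs("stream_attrs")  # assumed name
        stream_data = reader.Get(ch_name, save=False)        # assumed signature
        dq.put((stream_data, reader.CurrentStep()))
        if reader.CurrentStep() > 100:           # the mock-up stops after 100 chunks
            break
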
92 changes: 92 additions & 0 deletions delta/sources/loader_doggo.py
@@ -0,0 +1,92 @@
# -*- Encoding: UTF-8 -*-

"""Loader for the doggo diagnostic."""

from os.path import basename, join, isfile
from os import listdir, scandir
import numpy as np
# import logging

from skimage import io
from skimage import transform

from data_models.doggo_diag import doggo_chunk


class loader_doggo():
    """Loads dog pictures."""

    def __init__(self, cfg_all):
        """Initializes the doggo dataloader.

        Args:
            cfg_all: (dict)
                Global Delta configuration

        Returns:
            None

        Used keys from cfg_all:
        * diagnostic.datasource.data_dir
        * diagnostic.datasource.img_res_x
        * diagnostic.datasource.img_res_y
        * diagnostic.datasource.num_img
        * diagnostic.datasource.num_categories
        """
        self.data_dir = cfg_all["diagnostic"]["datasource"]["data_dir"]
        self.img_res_x = cfg_all["diagnostic"]["datasource"]["img_res_x"]
        self.img_res_y = cfg_all["diagnostic"]["datasource"]["img_res_y"]
        self.num_img = cfg_all["diagnostic"]["datasource"]["num_img"]
        self.num_categories = cfg_all["diagnostic"]["datasource"]["num_categories"]
        # Get a list of all sub-directories
        self.subdir_list = [f.path for f in scandir(self.data_dir) if f.is_dir()]
        # Extract the category name from each sub-directory name,
        # f.ex. ".../Images/n02108000-EntleBucher" -> "EntleBucher"
        self.category_list = [basename(x).split("-", 1)[-1] for x in self.subdir_list]
        # Iterate over all categories and build a tensor of all images in there
        self.cache()
        self.is_cached = True

    def cache(self):
        """Loads all images into numpy arrays.

        Returns:
            None
        """
        self.image_tensors = []
        # Iterate over all categories
        for subdir in self.subdir_list[:self.num_categories]:
            print("Scanning category ", subdir)
            # Insert all images into this list
            img_list = []
            # Iterate over all images, transform to desired size and append to img_list
            for f in listdir(subdir)[:self.num_img]:
                if not isfile(join(subdir, f)):
                    continue

                img = io.imread(join(subdir, f))
                img = transform.resize(img, (self.img_res_x, self.img_res_y))
                img_list.append(img)

            img_list = np.array(img_list)
            print(img_list.shape)
            assert(img_list.shape[1:] == (self.img_res_x, self.img_res_y, 3))
            self.image_tensors.append(img_list)

    def batch_generator(self):
        """Yields successive chunks of images, one per category.

        >>> batch_gen = loader.batch_generator()
        >>> for batch in batch_gen:
        >>>     assert type(batch) == doggo_chunk

        Returns:
            chunk (doggo_chunk)
                Chunk of doggo images
        """
        for it, cat in zip(self.image_tensors, self.category_list):
            current_chunk = doggo_chunk(it, cat)
            yield current_chunk

# End of file loader_doggo.py
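Putting the pieces together, a minimal sketch of driving this loader from the test_doggo.json config above (assumes the Stanford Dogs images are present under data_dir; the loop mirrors the doctest in batch_generator):

    import json
    from sources.loader_doggo import loader_doggo

    with open("delta/configs/test_doggo.json") as fh:
        cfg_all = json.load(fh)

    loader = loader_doggo(cfg_all)          # caches images on construction
    for chunk in loader.batch_generator():  # one doggo_chunk per category
        print(chunk.metadata, chunk.shape)  # breed name, (num_img, 224, 224, 3)
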
5 changes: 2 additions & 3 deletions delta/sources/loader_kstarecei.py
@@ -56,7 +56,7 @@ def __init__(self, cfg_all):

        # Process attributes stored in HDF5 file
        # Extract the device name from the filename: the part between dots, matching .[A-Z]{1,2}.
        m = re.search('\.[A-Z]{1,2}\.', os.path.basename(self.filename))
        m = re.search("\.[A-Z]{1,2}\.", os.path.basename(self.filename))
        self.attrs = {"dev": m[0][1:-1]}
        self._read_attributes_from_hdf5()
        self.logger = logging.getLogger('simple')
@@ -82,7 +82,7 @@ def _read_attributes_from_hdf5(self):
        for attr_name in ["SampleRate", "TriggerTime", "TFcurrent", "Mode", "LoFreq",
                          "LensFocus", "LensZoom"]:
            try:
                # Test if attribute is serializable.
                # If it is f.ex. an ndarray, convert it to a list.
                new_attr = dset[attr_name]
                if isinstance(new_attr, np.ndarray):
@@ -191,4 +191,3 @@ def batch_generator(self):
            yield current_chunk

# End of file loader_kstarecei.py
