Skip to content

Commit

Permalink
Added unit test for coherence kernels
Browse files Browse the repository at this point in the history
  • Loading branch information
rkube committed Nov 13, 2020
1 parent a341635 commit 9b0bf25
Show file tree
Hide file tree
Showing 22 changed files with 391 additions and 100 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

-------------------------------------------------------------------------------
[![Documentation Status][docs-image]][docs-url]
[![Unit Tests][pytest-url]]
![Unit Tests][pytest-url]

**[Documentation](https://delta-fusion.readthedocs.io/en/latest/index.html)

Expand Down
4 changes: 4 additions & 0 deletions delta/.coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[run]
omit = tests/*
[pytest]
report = html
4 changes: 1 addition & 3 deletions delta/analysis/kernels_spectral.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,9 @@ def kernel_coherence(fft_data, ch_it, fft_config):
Y = fft_data[ch_pair.ch2.get_idx(), :, :]
Pxx = X * X.conj()
Pyy = Y * Y.conj()
Gxy[idx, :] = ((X * Y.conj()) / np.sqrt(Pxx * Pyy)).mean(axis=1)
Gxy[idx, :] = np.abs(((X * Y.conj()) / np.sqrt(Pxx * Pyy))).mean(axis=1)

Gxy = np.abs(Gxy)
Gxy = Gxy.real

return(Gxy)


Expand Down
3 changes: 2 additions & 1 deletion delta/analysis/tasks_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ def submit(self, data_chunk):
# if tidx == 1:
# np.savez(f"test_data/fft_array_s{tidx:04d}.npz", fft_data = fft_data)

self.logger.info(f"task_list: Received type {type(data_chunk)} for chunk_idx {data_chunk.tb.chunk_idx}")
self.logger.info(f"task_list: Received type {type(data_chunk)}\
for chunk_idx {data_chunk.tb.chunk_idx}")

for task in self.task_list:
task.submit(self.executor_anl, data_chunk)
Expand Down
21 changes: 12 additions & 9 deletions delta/data_models/channels_2d.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class channel_pair:
This is just a tuple with an overloaded equality operator. It's also hashable so one can
use it in conjunction with sets.
>>> ch1 = channel(13, 7, 24, 8, 'horizontal')
>>> ch2 = channel(12, 7, 24, 8, 'horizontal')
>>> ch1 = channel_2d(13, 7, 24, 8, 'horizontal')
>>> ch2 = channel_2d(12, 7, 24, 8, 'horizontal')
>>> ch_pair = channel_pair(ch1, c2)
>>> channel_pair(ch1, ch2) == channel_pair(ch2, c1)
Expand Down Expand Up @@ -161,8 +161,8 @@ def __init__(self, ch_start, ch_end):
ch_end (:py:class:`channel_2d`): Stop channel for iteration
"""
assert(ch_start.get_num() <= ch_end.get_num())
assert(ch_start.chnum_v == ch_end.chnum_v)
assert(ch_start.chnum_h == ch_end.chnum_h)
assert(ch_start.chnum_v <= ch_end.chnum_v)
assert(ch_start.chnum_h <= ch_end.chnum_h)
assert(ch_start.order == ch_end.order)

self.ch_start = ch_start
Expand Down Expand Up @@ -221,7 +221,7 @@ def length(self):
Returns:
int: Number of channels in the range
"""
return(self.ch_end.get_num() - self.ch_start.get_num() + 1)
return((self.ch_hf - self.ch_hi + 1) * (self.ch_vf - self.ch_vi + 1))


class num_to_vh():
Expand Down Expand Up @@ -309,12 +309,15 @@ def __call__(self, ch_v, ch_h):
ch_num (int):
Linear, one-based channel number
"""
# We usually want to check that we are within the bounds.
# But sometimes it is helpful to avoid this.
# if debug:
# We usually want to check that we are within the bounds.
# But sometimes it is helpful to avoid this.
# if debug:
assert((ch_v > 0) & (ch_v < self.chnum_v + 1))
assert((ch_h > 0) & (ch_h < self.chnum_h + 1))

return((ch_v - 1) * self.chnum_h + ch_h)
if (self.order == "horizontal"):
return((ch_v - 1) * self.chnum_h + ch_h)
elif (self.order == "vertical"):
return((ch_h - 1) * self.chnum_v + ch_v)

# End of file channels_2d.py
5 changes: 2 additions & 3 deletions delta/data_models/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def new_chunk(self, stream_data: np.array, chunk_idx: int):
# Generate a time-base and a data model
if self.cfg["name"] == "kstarecei":
# Adapt configuration file parameters for use in timebase_streaming constructor
tb_chunk = timebase_streaming(self.t_start, self.t_end, self.f_sample, self.chunk_size, chunk_idx)
tb_chunk = timebase_streaming(self.t_start, self.t_end, self.f_sample,
self.chunk_size, chunk_idx)
chunk = self.data_type(stream_data, tb_chunk, rarr=self.rarr, zarr=self.zarr)

# Determine whether we need to normalize the data
Expand All @@ -81,7 +82,6 @@ def new_chunk(self, stream_data: np.array, chunk_idx: int):
self.logger.info(f"Calculated normalization using\
{tidx_norm[1] - tidx_norm[0]} samples.")


if self.normalize is not None:
self.logger.info("Normalizing current_chunk")
self.normalize(chunk)
Expand Down Expand Up @@ -209,7 +209,6 @@ def __init__(self, data_norm, axis=-1):
Returns:
None
"""

self.offlev = np.median(data_norm, axis=-1, keepdims=True)
self.offstd = data_norm.std(axis=-1, keepdims=True)
self.siglev = None
Expand Down
17 changes: 8 additions & 9 deletions delta/data_models/kstar_ecei.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ def __init__(self, data, tb, rarr=None, zarr=None, num_v=24, num_h=8):
"""Creates an ecei_chunk from a give dataset.
The first dimension indices channels, the second dimension indices time.
Channels are ordered
Channels are ordered
Args:
data (ndarray, float):
Expand All @@ -96,7 +95,6 @@ def __init__(self, data, tb, rarr=None, zarr=None, num_v=24, num_h=8):
#
self.num_v, self.num_h = num_v, num_h


# We should ensure that the data is contiguous so that we can remove this from
# if not data.flags.contiguous:
self.ecei_data = np.require(data, dtype=np.float64, requirements=['C', 'O', 'W', 'A'])
Expand All @@ -111,12 +109,11 @@ def __init__(self, data, tb, rarr=None, zarr=None, num_v=24, num_h=8):
# Data can be 2 or 3 dimensional
assert(data.shape[self.axis_ch] == self.num_h * self.num_v)


# Coordinate arrays give the radial and vertical coordinate of each channel
if rarr is not None:
self.rarr = rarr
assert(rarr.size == self.num_h * self.num_v)

if zarr is not None:
assert(zarr.size == self.num_h * self.num_v)
self.zarr = zarr
Expand Down Expand Up @@ -432,7 +429,7 @@ def get_geometry(cfg_diagnostic):
print("ecei_cfg: key {0:s} not found. Defaulting to 2nd X-mode".format(k.__str__()))
cfg_diagnostic["Mode"] = 'X'
hn = 2

# To vectorize calculation of the channel positions we flatten out
# horizontal and vertical channel indices in horizontal order.
arr_ch_hv = np.zeros([24 * 8, 2], dtype=int)
Expand All @@ -444,10 +441,11 @@ def get_geometry(cfg_diagnostic):

rpos_arr = hn * e * mu0 * ttn * TFcurrent /\
(4. * np.pi * np.pi * me * ((arr_ch_hv[:, 0] - 1) * 0.9 + 2.6 + LoFreq) * 1e9)

# With radial positions at hand, continue the calculations from beam_path
# This is an (192, 2, 2) array, where the first dimension indices each individual channel
abcd_array = np.array([get_abcd(LensFocus, LensZoom, rpos, cfg_diagnostic["Device"]) for rpos in rpos_arr])
abcd_array = np.array([get_abcd(LensFocus, LensZoom, rpos,
cfg_diagnostic["Device"]) for rpos in rpos_arr])
# vertical position from the reference axis (vertical center of all lens, z=0 line)
zz = (np.arange(24, 0, -1) - 12.5) * 14 # [mm]
# angle against the reference axis at ECEI array box
Expand All @@ -456,7 +454,8 @@ def get_geometry(cfg_diagnostic):
# vertical posistion and angle at rpos
za_array = np.dot(abcd_array, [zz, aa])

zpos_arr = np.array([za_array[i, 0, v - 1] for i, v in zip(np.arange(192), arr_ch_hv[:, 1])]) * 1e-3
zpos_arr = np.array([za_array[i, 0, v - 1] for
i, v in zip(np.arange(192), arr_ch_hv[:, 1])]) * 1e-3
apos_arr = np.array([za_array[i, 1, v - 1] for i, v in zip(np.arange(192), arr_ch_hv[:, 1])])

return(rpos_arr, zpos_arr, apos_arr)
Expand Down
5 changes: 0 additions & 5 deletions delta/data_models/timebase.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
"""Defines time-base classes to be used in streaming settings."""


import numpy as np


class timebase_streaming():
"""Defines a timebase for a data chunk in the stream."""

Expand Down Expand Up @@ -93,6 +90,4 @@ def __str__(self):
print_str += f" local t0={t0:6.4f}s, t1={t1:6.4f}s"
return print_str



# End of file timebases.py
8 changes: 4 additions & 4 deletions delta/preprocess/plot_ecei.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def create_plot(self, chunk, tidx):
fig (mpl.Figure):
Matplotlib figure
"""
#self.set_contour_levels(chunk)
# self.set_contour_levels(chunk)

frame_vals = chunk.data[:, tidx]

Expand All @@ -158,12 +158,12 @@ def create_plot(self, chunk, tidx):
if self.rpos_arr is not None and self.zpos_arr is not None:
self.logger.info(f"Plotting data: {frame_vals.reshape(24, 8)}")
# TODO: Fix hard-coded dimensions
mappable = ax.contourf(self.rpos_arr.reshape(24, 8),
self.zpos_arr.reshape(24, 8),
mappable = ax.contourf(self.rpos_arr.reshape(24, 8),
self.zpos_arr.reshape(24, 8),
frame_vals.reshape(24, 8), levels=self.clevs)
ax.set_xlabel("R / m")
ax.set_ylabel("Z / m")
#else:
# else:
# mappable = ax.contourf(self.rpos_arr, self.zpos_arr, frame_vals)#, levels=self.clevs)
fig.colorbar(mappable, cax=ax_cb)

Expand Down
2 changes: 0 additions & 2 deletions delta/preprocess/pre_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def process(self, data_chunk, executor):
data_chunk (2d_image):
Wavelet-filtered images
"""

tidx_plot = [data_chunk.tb.time_to_idx(t) for t in self.time_range]

if tidx_plot[0] is not None:
Expand All @@ -56,7 +55,6 @@ def process(self, data_chunk, executor):
fig = self.plotter.create_plot(data_chunk, tidx)
fig.savefig(join(self.plot_dir, f"chunk_{data_chunk.tb.chunk_idx}_{tidx:04d}.png"))


return data_chunk


Expand Down
2 changes: 1 addition & 1 deletion delta/preprocess/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, executor, cfg):
Args:
executor (PEP-3148-style executor):
Executor on which all pre-processing will be performed
cfg_preprocess
cfg_preprocess
Delta configuration
Returns:
Expand Down
2 changes: 1 addition & 1 deletion delta/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
To run on an interactive node
srun -n 4 python -m mpi4py.futures processor_mpi_tasklist.py --config configs/test_all.json
Remember to have adios2 included in $PYTHONPATH
Remember to have adios2 included in $PYTHONPATH
"""

from mpi4py import MPI
Expand Down
49 changes: 28 additions & 21 deletions delta/sources/dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(self, cfg_all, cache=True):
self.t_start = cfg_all["diagnostic"]["parameters"]["TriggerTime"][0]
self.t_end = min(cfg_all["diagnostic"]["parameters"]["TriggerTime"][1],
self.t_start + 5_000_000 * self.dt)
self.dev = cfg_all["diagnostic"]["parameters"]["Device"]

self.logger = logging.getLogger('simple')

Expand All @@ -76,6 +77,29 @@ def __init__(self, cfg_all, cache=True):
self.cache()
self.is_cached = True

def _read_from_hdf5(self, array, idx_start, idx_end):
"""Reads data from HDF5 into array.
Values in array are changed in-place.
Args:
array (np.ndarray):
Array where we store HDF5 data
idx_start (int):
First index to read
idx_end (int):
Last index to read
Returns:
None
"""
# Cache the data in memory
with h5py.File(self.filename, "r",) as df:
for ch in self.ch_range:
chname_h5 = f"/ECEI/ECEI_{self.dev}{ch.ch_v:02d}{ch.ch_h:02d}/Voltage"
array[ch.get_idx(), :] = df[chname_h5][idx_start:idx_end].astype(self.dtype)
array[:] = array[:] * 1e-4

def cache(self):
"""Loads data from HDF5 and fills the cache.
Expand All @@ -84,16 +108,10 @@ def cache(self):
"""
self.cache = np.zeros([self.ch_range.length(), self.chunk_size * self.num_chunks],
dtype=self.dtype)

assert(self.cache.flags.contiguous)

# Cache the data in memory
with h5py.File(self.filename, "r",) as df:
for ch in self.ch_range:
chname_h5 = f"/ECEI/ECEI_L{ch.ch_v:02d}{ch.ch_h:02d}/Voltage"
self.cache[ch.get_idx(), :] =\
df[chname_h5][:self.chunk_size * self.num_chunks].astype(self.dtype)
self.cache = self.cache * 1e-4
# Load contents of entire HDF5 file into self.cache
self._read_from_hdf5(self.cache, 0, self.chunk_size * self.num_chunks)

def get_chunk_shape(self):
"""Returns the size of chunks.
Expand All @@ -107,10 +125,6 @@ def get_chunk_shape(self):
"""
return (self.ch_range.length(), self.chunk_size)

def get_chunk_size_bytes(self):
"""Returns the size of a chunk, in bytes."""
pass

def batch_generator(self):
"""Loads the next time-chunk from the data file.
Expand Down Expand Up @@ -147,17 +161,10 @@ def batch_generator(self):

# If we haven't cached, load from HDF5
else:
with h5py.File(self.filename, "r",) as df:
for ch in self.ch_range:
chname_h5 = f"/ECEI/ECEI_{ch}/Voltage"
_chunk_data[ch.get_idx(), :] = df[chname_h5][current_chunk *
self.chunk_size:
(current_chunk + 1) *
self.chunk_size]
_chunk_data = _chunk_data * 1e-4
self._read_from_hdf5(_chunk_data, current_chunk * self.chunk_size,
(current_chunk + 1) * self.chunk_size)

current_chunk = ecei_chunk(_chunk_data, tb_chunk)

yield current_chunk


Expand Down
2 changes: 1 addition & 1 deletion delta/storage/backend_numpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import json


from storage.helpers import serialize_dispatch_seq
from storage.helpers import serialize_dispatch_seq


class backend_numpy():
Expand Down
2 changes: 1 addition & 1 deletion delta/storage/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ def deserialize_dispatch_seq(channel_ser):

return dispatch_seq

# End of file storage/helpers.py
# End of file storage/helpers.py
Loading

0 comments on commit 9b0bf25

Please sign in to comment.