From 9b0bf258e069cbc42241fa3895182adb92d57894 Mon Sep 17 00:00:00 2001 From: Ralph Kube Date: Fri, 13 Nov 2020 03:34:44 -0800 Subject: [PATCH] Added unit test for coherence kernels --- README.md | 2 +- delta/.coveragerc | 4 ++ delta/analysis/kernels_spectral.py | 4 +- delta/analysis/tasks_mpi.py | 3 +- delta/data_models/channels_2d.py | 21 ++++--- delta/data_models/helpers.py | 5 +- delta/data_models/kstar_ecei.py | 17 +++--- delta/data_models/timebase.py | 5 -- delta/preprocess/plot_ecei.py | 8 +-- delta/preprocess/pre_plot.py | 2 - delta/preprocess/preprocess.py | 2 +- delta/processor.py | 2 +- delta/sources/dataloader.py | 49 ++++++++------- delta/storage/backend_numpy.py | 2 +- delta/storage/helpers.py | 2 +- delta/streaming/stream_stats.py | 24 ++++---- requirements.txt | 12 ++-- tests/conftest.py | 96 ++++++++++++++++++++++++++++++ tests/test_channels_2d.py | 71 ++++++++++++++++++++++ tests/test_dataloader_kstar.py | 58 ++++++++++++++++++ tests/test_kernel_crossphase.py | 79 ++++++++++++++++++++++++ tests/test_kstar_geometry.py | 23 ++----- 22 files changed, 391 insertions(+), 100 deletions(-) create mode 100644 delta/.coveragerc create mode 100644 tests/conftest.py create mode 100644 tests/test_channels_2d.py create mode 100644 tests/test_dataloader_kstar.py create mode 100644 tests/test_kernel_crossphase.py diff --git a/README.md b/README.md index a84cde7..d292f85 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ ------------------------------------------------------------------------------- [![Documentation Status][docs-image]][docs-url] -[![Unit Tests][pytest-url]] +![Unit Tests][pytest-url] **[Documentation](https://delta-fusion.readthedocs.io/en/latest/index.html) diff --git a/delta/.coveragerc b/delta/.coveragerc new file mode 100644 index 0000000..919ac25 --- /dev/null +++ b/delta/.coveragerc @@ -0,0 +1,4 @@ +[run] +omit = tests/* +[pytest] +report = html \ No newline at end of file diff --git a/delta/analysis/kernels_spectral.py b/delta/analysis/kernels_spectral.py index 1f4663f..1a8688d 100644 --- a/delta/analysis/kernels_spectral.py +++ b/delta/analysis/kernels_spectral.py @@ -82,11 +82,9 @@ def kernel_coherence(fft_data, ch_it, fft_config): Y = fft_data[ch_pair.ch2.get_idx(), :, :] Pxx = X * X.conj() Pyy = Y * Y.conj() - Gxy[idx, :] = ((X * Y.conj()) / np.sqrt(Pxx * Pyy)).mean(axis=1) + Gxy[idx, :] = np.abs(((X * Y.conj()) / np.sqrt(Pxx * Pyy))).mean(axis=1) - Gxy = np.abs(Gxy) Gxy = Gxy.real - return(Gxy) diff --git a/delta/analysis/tasks_mpi.py b/delta/analysis/tasks_mpi.py index 86952fc..3f9dde3 100644 --- a/delta/analysis/tasks_mpi.py +++ b/delta/analysis/tasks_mpi.py @@ -300,7 +300,8 @@ def submit(self, data_chunk): # if tidx == 1: # np.savez(f"test_data/fft_array_s{tidx:04d}.npz", fft_data = fft_data) - self.logger.info(f"task_list: Received type {type(data_chunk)} for chunk_idx {data_chunk.tb.chunk_idx}") + self.logger.info(f"task_list: Received type {type(data_chunk)}\ + for chunk_idx {data_chunk.tb.chunk_idx}") for task in self.task_list: task.submit(self.executor_anl, data_chunk) diff --git a/delta/data_models/channels_2d.py b/delta/data_models/channels_2d.py index f73b58d..8b1dfa9 100644 --- a/delta/data_models/channels_2d.py +++ b/delta/data_models/channels_2d.py @@ -66,8 +66,8 @@ class channel_pair: This is just a tuple with an overloaded equality operator. It's also hashable so one can use it in conjunction with sets. - >>> ch1 = channel(13, 7, 24, 8, 'horizontal') - >>> ch2 = channel(12, 7, 24, 8, 'horizontal') + >>> ch1 = channel_2d(13, 7, 24, 8, 'horizontal') + >>> ch2 = channel_2d(12, 7, 24, 8, 'horizontal') >>> ch_pair = channel_pair(ch1, c2) >>> channel_pair(ch1, ch2) == channel_pair(ch2, c1) @@ -161,8 +161,8 @@ def __init__(self, ch_start, ch_end): ch_end (:py:class:`channel_2d`): Stop channel for iteration """ assert(ch_start.get_num() <= ch_end.get_num()) - assert(ch_start.chnum_v == ch_end.chnum_v) - assert(ch_start.chnum_h == ch_end.chnum_h) + assert(ch_start.chnum_v <= ch_end.chnum_v) + assert(ch_start.chnum_h <= ch_end.chnum_h) assert(ch_start.order == ch_end.order) self.ch_start = ch_start @@ -221,7 +221,7 @@ def length(self): Returns: int: Number of channels in the range """ - return(self.ch_end.get_num() - self.ch_start.get_num() + 1) + return((self.ch_hf - self.ch_hi + 1) * (self.ch_vf - self.ch_vi + 1)) class num_to_vh(): @@ -309,12 +309,15 @@ def __call__(self, ch_v, ch_h): ch_num (int): Linear, one-based channel number """ - # We usually want to check that we are within the bounds. - # But sometimes it is helpful to avoid this. - # if debug: + # We usually want to check that we are within the bounds. + # But sometimes it is helpful to avoid this. + # if debug: assert((ch_v > 0) & (ch_v < self.chnum_v + 1)) assert((ch_h > 0) & (ch_h < self.chnum_h + 1)) - return((ch_v - 1) * self.chnum_h + ch_h) + if (self.order == "horizontal"): + return((ch_v - 1) * self.chnum_h + ch_h) + elif (self.order == "vertical"): + return((ch_h - 1) * self.chnum_v + ch_v) # End of file channels_2d.py diff --git a/delta/data_models/helpers.py b/delta/data_models/helpers.py index f5f4512..d8c83cd 100644 --- a/delta/data_models/helpers.py +++ b/delta/data_models/helpers.py @@ -65,7 +65,8 @@ def new_chunk(self, stream_data: np.array, chunk_idx: int): # Generate a time-base and a data model if self.cfg["name"] == "kstarecei": # Adapt configuration file parameters for use in timebase_streaming constructor - tb_chunk = timebase_streaming(self.t_start, self.t_end, self.f_sample, self.chunk_size, chunk_idx) + tb_chunk = timebase_streaming(self.t_start, self.t_end, self.f_sample, + self.chunk_size, chunk_idx) chunk = self.data_type(stream_data, tb_chunk, rarr=self.rarr, zarr=self.zarr) # Determine whether we need to normalize the data @@ -81,7 +82,6 @@ def new_chunk(self, stream_data: np.array, chunk_idx: int): self.logger.info(f"Calculated normalization using\ {tidx_norm[1] - tidx_norm[0]} samples.") - if self.normalize is not None: self.logger.info("Normalizing current_chunk") self.normalize(chunk) @@ -209,7 +209,6 @@ def __init__(self, data_norm, axis=-1): Returns: None """ - self.offlev = np.median(data_norm, axis=-1, keepdims=True) self.offstd = data_norm.std(axis=-1, keepdims=True) self.siglev = None diff --git a/delta/data_models/kstar_ecei.py b/delta/data_models/kstar_ecei.py index 2bb1e46..ca9e26e 100644 --- a/delta/data_models/kstar_ecei.py +++ b/delta/data_models/kstar_ecei.py @@ -70,8 +70,7 @@ def __init__(self, data, tb, rarr=None, zarr=None, num_v=24, num_h=8): """Creates an ecei_chunk from a give dataset. The first dimension indices channels, the second dimension indices time. - Channels are ordered - + Channels are ordered Args: data (ndarray, float): @@ -96,7 +95,6 @@ def __init__(self, data, tb, rarr=None, zarr=None, num_v=24, num_h=8): # self.num_v, self.num_h = num_v, num_h - # We should ensure that the data is contiguous so that we can remove this from # if not data.flags.contiguous: self.ecei_data = np.require(data, dtype=np.float64, requirements=['C', 'O', 'W', 'A']) @@ -111,12 +109,11 @@ def __init__(self, data, tb, rarr=None, zarr=None, num_v=24, num_h=8): # Data can be 2 or 3 dimensional assert(data.shape[self.axis_ch] == self.num_h * self.num_v) - # Coordinate arrays give the radial and vertical coordinate of each channel if rarr is not None: self.rarr = rarr assert(rarr.size == self.num_h * self.num_v) - + if zarr is not None: assert(zarr.size == self.num_h * self.num_v) self.zarr = zarr @@ -432,7 +429,7 @@ def get_geometry(cfg_diagnostic): print("ecei_cfg: key {0:s} not found. Defaulting to 2nd X-mode".format(k.__str__())) cfg_diagnostic["Mode"] = 'X' hn = 2 - + # To vectorize calculation of the channel positions we flatten out # horizontal and vertical channel indices in horizontal order. arr_ch_hv = np.zeros([24 * 8, 2], dtype=int) @@ -444,10 +441,11 @@ def get_geometry(cfg_diagnostic): rpos_arr = hn * e * mu0 * ttn * TFcurrent /\ (4. * np.pi * np.pi * me * ((arr_ch_hv[:, 0] - 1) * 0.9 + 2.6 + LoFreq) * 1e9) - + # With radial positions at hand, continue the calculations from beam_path # This is an (192, 2, 2) array, where the first dimension indices each individual channel - abcd_array = np.array([get_abcd(LensFocus, LensZoom, rpos, cfg_diagnostic["Device"]) for rpos in rpos_arr]) + abcd_array = np.array([get_abcd(LensFocus, LensZoom, rpos, + cfg_diagnostic["Device"]) for rpos in rpos_arr]) # vertical position from the reference axis (vertical center of all lens, z=0 line) zz = (np.arange(24, 0, -1) - 12.5) * 14 # [mm] # angle against the reference axis at ECEI array box @@ -456,7 +454,8 @@ def get_geometry(cfg_diagnostic): # vertical posistion and angle at rpos za_array = np.dot(abcd_array, [zz, aa]) - zpos_arr = np.array([za_array[i, 0, v - 1] for i, v in zip(np.arange(192), arr_ch_hv[:, 1])]) * 1e-3 + zpos_arr = np.array([za_array[i, 0, v - 1] for + i, v in zip(np.arange(192), arr_ch_hv[:, 1])]) * 1e-3 apos_arr = np.array([za_array[i, 1, v - 1] for i, v in zip(np.arange(192), arr_ch_hv[:, 1])]) return(rpos_arr, zpos_arr, apos_arr) diff --git a/delta/data_models/timebase.py b/delta/data_models/timebase.py index 6c16d6a..aefa878 100644 --- a/delta/data_models/timebase.py +++ b/delta/data_models/timebase.py @@ -3,9 +3,6 @@ """Defines time-base classes to be used in streaming settings.""" -import numpy as np - - class timebase_streaming(): """Defines a timebase for a data chunk in the stream.""" @@ -93,6 +90,4 @@ def __str__(self): print_str += f" local t0={t0:6.4f}s, t1={t1:6.4f}s" return print_str - - # End of file timebases.py diff --git a/delta/preprocess/plot_ecei.py b/delta/preprocess/plot_ecei.py index 515d6bf..b3c50b6 100644 --- a/delta/preprocess/plot_ecei.py +++ b/delta/preprocess/plot_ecei.py @@ -144,7 +144,7 @@ def create_plot(self, chunk, tidx): fig (mpl.Figure): Matplotlib figure """ - #self.set_contour_levels(chunk) + # self.set_contour_levels(chunk) frame_vals = chunk.data[:, tidx] @@ -158,12 +158,12 @@ def create_plot(self, chunk, tidx): if self.rpos_arr is not None and self.zpos_arr is not None: self.logger.info(f"Plotting data: {frame_vals.reshape(24, 8)}") # TODO: Fix hard-coded dimensions - mappable = ax.contourf(self.rpos_arr.reshape(24, 8), - self.zpos_arr.reshape(24, 8), + mappable = ax.contourf(self.rpos_arr.reshape(24, 8), + self.zpos_arr.reshape(24, 8), frame_vals.reshape(24, 8), levels=self.clevs) ax.set_xlabel("R / m") ax.set_ylabel("Z / m") - #else: + # else: # mappable = ax.contourf(self.rpos_arr, self.zpos_arr, frame_vals)#, levels=self.clevs) fig.colorbar(mappable, cax=ax_cb) diff --git a/delta/preprocess/pre_plot.py b/delta/preprocess/pre_plot.py index 666a07a..3b73ce0 100644 --- a/delta/preprocess/pre_plot.py +++ b/delta/preprocess/pre_plot.py @@ -44,7 +44,6 @@ def process(self, data_chunk, executor): data_chunk (2d_image): Wavelet-filtered images """ - tidx_plot = [data_chunk.tb.time_to_idx(t) for t in self.time_range] if tidx_plot[0] is not None: @@ -56,7 +55,6 @@ def process(self, data_chunk, executor): fig = self.plotter.create_plot(data_chunk, tidx) fig.savefig(join(self.plot_dir, f"chunk_{data_chunk.tb.chunk_idx}_{tidx:04d}.png")) - return data_chunk diff --git a/delta/preprocess/preprocess.py b/delta/preprocess/preprocess.py index 0859177..76b642d 100644 --- a/delta/preprocess/preprocess.py +++ b/delta/preprocess/preprocess.py @@ -24,7 +24,7 @@ def __init__(self, executor, cfg): Args: executor (PEP-3148-style executor): Executor on which all pre-processing will be performed - cfg_preprocess + cfg_preprocess Delta configuration Returns: diff --git a/delta/processor.py b/delta/processor.py index 7eb5c34..d897ef8 100644 --- a/delta/processor.py +++ b/delta/processor.py @@ -5,7 +5,7 @@ To run on an interactive node srun -n 4 python -m mpi4py.futures processor_mpi_tasklist.py --config configs/test_all.json -Remember to have adios2 included in $PYTHONPATH +Remember to have adios2 included in $PYTHONPATH """ from mpi4py import MPI diff --git a/delta/sources/dataloader.py b/delta/sources/dataloader.py index df5ac6e..326f16c 100644 --- a/delta/sources/dataloader.py +++ b/delta/sources/dataloader.py @@ -67,6 +67,7 @@ def __init__(self, cfg_all, cache=True): self.t_start = cfg_all["diagnostic"]["parameters"]["TriggerTime"][0] self.t_end = min(cfg_all["diagnostic"]["parameters"]["TriggerTime"][1], self.t_start + 5_000_000 * self.dt) + self.dev = cfg_all["diagnostic"]["parameters"]["Device"] self.logger = logging.getLogger('simple') @@ -76,6 +77,29 @@ def __init__(self, cfg_all, cache=True): self.cache() self.is_cached = True + def _read_from_hdf5(self, array, idx_start, idx_end): + """Reads data from HDF5 into array. + + Values in array are changed in-place. + + Args: + array (np.ndarray): + Array where we store HDF5 data + idx_start (int): + First index to read + idx_end (int): + Last index to read + + Returns: + None + """ + # Cache the data in memory + with h5py.File(self.filename, "r",) as df: + for ch in self.ch_range: + chname_h5 = f"/ECEI/ECEI_{self.dev}{ch.ch_v:02d}{ch.ch_h:02d}/Voltage" + array[ch.get_idx(), :] = df[chname_h5][idx_start:idx_end].astype(self.dtype) + array[:] = array[:] * 1e-4 + def cache(self): """Loads data from HDF5 and fills the cache. @@ -84,16 +108,10 @@ def cache(self): """ self.cache = np.zeros([self.ch_range.length(), self.chunk_size * self.num_chunks], dtype=self.dtype) - assert(self.cache.flags.contiguous) - # Cache the data in memory - with h5py.File(self.filename, "r",) as df: - for ch in self.ch_range: - chname_h5 = f"/ECEI/ECEI_L{ch.ch_v:02d}{ch.ch_h:02d}/Voltage" - self.cache[ch.get_idx(), :] =\ - df[chname_h5][:self.chunk_size * self.num_chunks].astype(self.dtype) - self.cache = self.cache * 1e-4 + # Load contents of entire HDF5 file into self.cache + self._read_from_hdf5(self.cache, 0, self.chunk_size * self.num_chunks) def get_chunk_shape(self): """Returns the size of chunks. @@ -107,10 +125,6 @@ def get_chunk_shape(self): """ return (self.ch_range.length(), self.chunk_size) - def get_chunk_size_bytes(self): - """Returns the size of a chunk, in bytes.""" - pass - def batch_generator(self): """Loads the next time-chunk from the data file. @@ -147,17 +161,10 @@ def batch_generator(self): # If we haven't cached, load from HDF5 else: - with h5py.File(self.filename, "r",) as df: - for ch in self.ch_range: - chname_h5 = f"/ECEI/ECEI_{ch}/Voltage" - _chunk_data[ch.get_idx(), :] = df[chname_h5][current_chunk * - self.chunk_size: - (current_chunk + 1) * - self.chunk_size] - _chunk_data = _chunk_data * 1e-4 + self._read_from_hdf5(_chunk_data, current_chunk * self.chunk_size, + (current_chunk + 1) * self.chunk_size) current_chunk = ecei_chunk(_chunk_data, tb_chunk) - yield current_chunk diff --git a/delta/storage/backend_numpy.py b/delta/storage/backend_numpy.py index 34998e5..21e142c 100644 --- a/delta/storage/backend_numpy.py +++ b/delta/storage/backend_numpy.py @@ -9,7 +9,7 @@ import json -from storage.helpers import serialize_dispatch_seq +from storage.helpers import serialize_dispatch_seq class backend_numpy(): diff --git a/delta/storage/helpers.py b/delta/storage/helpers.py index 5afd0c3..5bef281 100644 --- a/delta/storage/helpers.py +++ b/delta/storage/helpers.py @@ -100,4 +100,4 @@ def deserialize_dispatch_seq(channel_ser): return dispatch_seq -# End of file storage/helpers.py \ No newline at end of file +# End of file storage/helpers.py diff --git a/delta/streaming/stream_stats.py b/delta/streaming/stream_stats.py index b86e56d..6b75e00 100644 --- a/delta/streaming/stream_stats.py +++ b/delta/streaming/stream_stats.py @@ -3,11 +3,10 @@ import numpy as np class stream_stats(): - """Collects statistics over data transfer timings""" + """Collects statistics for data transfer timings.""" def __init__(self): - """Initializes""" - + """Initialize the object.""" # List that stores the size of transferred packets in bytes self.packet_sizes = [] # List that stores the time spent in send calls, in seconds @@ -16,30 +15,29 @@ def __init__(self): self.nsteps = 0 def add_transfer(self, num_bytes, duration): - """Adds a new transfer + """Adds a new transfer. - Parameters: - ----------- - num_bytes: int - duration: float + Args: + num_bytes (int): + Number of bytes that have been transferred + duration (float): + Duration for the transfer Returns: - -------- - None + None """ self.packet_sizes.append(num_bytes) self.durations.append(duration) self.nsteps += 1 - def get_transfer_stats(self): - """Return max, min, avg, std of packet sizes""" + """Return max, min, avg, std of packet sizes.""" arr = np.array(self.packet_sizes) return (arr.sum(), arr.max(), arr.min(), arr.mean(), arr.std()) def get_duration_stats(self): - """Return max, min, avg, std of transfer durations""" + """Return max, min, avg, std of transfer durations.""" arr = np.array(self.durations) return (arr.sum(), arr.max(), arr.min(), arr.mean(), arr.std()) diff --git a/requirements.txt b/requirements.txt index bc3afc1..841ca9c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,14 +2,14 @@ numpy>=1.17 scipy>=1.3.2 more-itertools>=8.1.0 pyyaml>=5.1.1 -scikit-image -cython -sphinx-rtd-theme +scikit-image>=0.17.2 +cython>=0.29.21 +sphinx-rtd-theme>=0.5.0 pre-commit nox pymongo>=3.9.0 flake8>=3.8.3 flake8-docstrings>=1.5.0 -pre-commit -pytest -pytest-cov +pytest>=6.1.2 +pytest-cov>=2.10.1 +mock>=4.0.2 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..303b212 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,96 @@ +# -*- Encoding: UTF-8 -*- + +import pytest +import json + + +@pytest.fixture(scope="module") +def config_all(): + """Provides a configuration object for all unit tests.""" + config_str = """{ + "diagnostic": + { + "name": "kstarecei", + "shotnr": 18431, + "datasource": + { + "source_file": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/ECEI.018431.LFS.h5", + "chunk_size": 10000, + "num_chunks": 500, + "channel_range": ["L0101-2408"], + "datatype": "float" + }, + "parameters": { + "Device": "L", + "TriggerTime": [-0.12, 61.2, 60], + "t_norm": [-0.119, -0.109], + "SampleRate": 500, + "TFcurrent": 23000.0, + "Mode": "O", + "LoFreq": 81, + "LensFocus": 80, + "LensZoom": 340} + }, + "transport_nersc": + { + "datapath": "tmp_nersc", + "engine": "BP4", + "params": + { + "IPAddress": "128.55.205.18", + "Timeout": "120", + "Port": "50001", + "TransportMode": "fast" + } + }, + "storage_mongo": + { + "backend": "mongo", + "datastore": "gridfs", + "datadir": "/global/cscratch1/sd/rkube/delta" + }, + "storage": + { + "backend": "null" + }, + "preprocess": { + "plot": {"time_range": [0.0, 0.00002], "plot_dir": "/global/homes/r/rkube/delta_runs/plots/"}, + "wavelet": {"wavelet": "db5", "method": "BayesShrink", "wavelet_levels": 5, "rescale_sigma": false}, + "stft": {"nfft": 512, "fs": 500000, "window": "hann", "overlap": 0.5, "noverlap": 256, "detrend": "constant", "full": true}, + "no_bandpass_fir": {"N": 5, "Wn": [0.02, 0.036], "btype": "bandpass", "output": "sos"} + }, + "task_list": [{ + "task_description" : "cross_correlation", + "analysis": "cross_correlation", + "channel_chunk_size": 32768, + "ref_channels" : [1, 1, 24, 8], + "cmp_channels" : [1, 1, 24, 8] + }, + { + "task_description" : "cross_power", + "analysis": "cross_power", + "channel_chunk_size": 32768, + "ref_channels" : [1, 1, 24, 8], + "cmp_channels" : [1, 1, 24, 8] + }, + { + "task_description" : "cross_phase", + "analysis": "cross_phase", + "channel_chunk_size": 32768, + "ref_channels" : [1, 1, 24, 8], + "cmp_channels" : [1, 1, 24, 8] + }, + { + "task_description" : "coherence", + "analysis": "coherence", + "channel_chunk_size": 32768, + "ref_channels" : [1, 1, 24, 8], + "cmp_channels" : [1, 1, 24, 8] + }] + }""" + + config = json.loads(config_str) + return config + + +# End of file conftest.py diff --git a/tests/test_channels_2d.py b/tests/test_channels_2d.py new file mode 100644 index 0000000..7e245c5 --- /dev/null +++ b/tests/test_channels_2d.py @@ -0,0 +1,71 @@ +# -*- Encoding: UTF-8 -*- + +"""Unit tests for channels_2d.py.""" + +def test_channel_2d(config_all): + """Tests channel_2d object.""" + import sys + import os + sys.path.append(os.path.abspath('delta')) + from delta.data_models.channels_2d import channel_2d + + ch_v = 6 + ch_h = 3 + ch = channel_2d(ch_v, ch_h, 24, 8, "horizontal") + + assert(ch.__str__() == f"({ch_v:03d}, {ch_h:03d})") + assert(ch == channel_2d(ch_v, ch_h, 24, 8, "horizontal")) + assert(ch.get_num() == (ch_v - 1) * 8 + ch_h) + assert(ch.get_idx() == (ch_v - 1) * 8 + ch_h - 1) + + +def test_channel_range_h(config_all): + """Tests channel_range object.""" + import sys + import os + sys.path.append(os.path.abspath('delta')) + from delta.data_models.channels_2d import channel_2d, channel_range + + ch_start = channel_2d(2, 3, 6, 6, "horizontal") + ch_end = channel_2d(5, 5, 6, 6, "horizontal") + + ch_rg = channel_range(ch_start, ch_end) + assert(ch_rg.length() == 12) + i = 0 + for _ in ch_rg: + i += 1 + assert(i == ch_rg.length()) + + +def test_channel_range_v(config_all): + """Tests channel_range object.""" + import sys + import os + sys.path.append(os.path.abspath('delta')) + from delta.data_models.channels_2d import channel_2d, channel_range + + ch_start = channel_2d(2, 3, 6, 6, "vertical") + ch_end = channel_2d(5, 5, 6, 6, "vertical") + + ch_rg = channel_range(ch_start, ch_end) + assert(ch_rg.length() == 12) + i = 0 + for _ in ch_rg: + i += 1 + assert(i == ch_rg.length()) + + +def test_channel_pair(config_all): + """Tests member functions of channel_pair.""" + import sys + import os + sys.path.append(os.path.abspath('delta')) + from delta.data_models.channels_2d import channel_2d, channel_pair + + ch1 = channel_2d(13, 7, 24, 8, 'horizontal') + ch2 = channel_2d(12, 7, 24, 8, 'horizontal') + + assert(channel_pair(ch1, ch2) == channel_pair(ch2, ch1)) + + +# End of file test_channels_2d.py diff --git a/tests/test_dataloader_kstar.py b/tests/test_dataloader_kstar.py new file mode 100644 index 0000000..8b5667b --- /dev/null +++ b/tests/test_dataloader_kstar.py @@ -0,0 +1,58 @@ +# -*- Encoding: UTF-8 -*- + +"""Verify that dataloaders works correctly.""" + +import mock +import numpy as np + + +def load_dummy_data(cls, array, idx_start, idx_end): + """Replaces _read_from_hdf5 in dataloader.""" + num_channels = array.shape[0] + + dummy_data = np.random.uniform(3.0, 4.0, [num_channels, idx_end - idx_start]) + array[:] = dummy_data[:] + + +def test_dataloader_ecei_cached(config_all): + """Verify that _dataloader_ecei works correctly when using cached data.""" + import sys + import os + sys.path.append(os.path.abspath('delta')) + import numpy as np + # # Import packages as delta.... so that we can run pytest as + from delta.sources.dataloader import _loader_ecei + + cfg_all = config_all + cfg_all["diagnostic"]["datasource"]["num_chunks"] = 5 + # Instantiate a loader where _read_from_hdf5 is replaced with load_dummy_data + with mock.patch.object(_loader_ecei, "_read_from_hdf5", new=load_dummy_data): + my_loader = _loader_ecei(cfg_all, cache=True) + + for batch in my_loader.batch_generator(): + # Mean should be roughly 3.5, depending on what use as dummy data + assert(np.abs(np.mean(batch.data) - 3.5) < 1e-2) + + +def test_dataloader_ecei_nocache(config_all): + """Verify that _dataloader_ecei works correctly when using cached data.""" + import sys + import os + sys.path.append(os.path.abspath('delta')) + import numpy as np + # # Import packages as delta.... so that we can run pytest as + from delta.sources.dataloader import _loader_ecei + + cfg_all = config_all + # Instantiate a loader where _read_from_hdf5 is replaced with load_dummy_data + with mock.patch.object(_loader_ecei, "_read_from_hdf5", new=load_dummy_data): + my_loader = _loader_ecei(cfg_all, cache=False) + + assert(my_loader.get_chunk_shape() == (192, cfg_all["diagnostic"]["datasource"]["chunk_size"])) + for batch in my_loader.batch_generator(): + # Mean should be roughly 3.5, depending on what use as dummy data + assert(np.abs(np.mean(batch.data) - 3.5) < 1e-2) + + + +# End of file test_dataloader_kstar.py diff --git a/tests/test_kernel_crossphase.py b/tests/test_kernel_crossphase.py new file mode 100644 index 0000000..e795d2c --- /dev/null +++ b/tests/test_kernel_crossphase.py @@ -0,0 +1,79 @@ +# -*- Encoding: UTF-8 -*- + +import pytest + +"""Test cross-correlation kernel.""" + + +@pytest.fixture +def gen_sine_waves(): + """Generate sine wave data for coherence kernel. + + Creates two signals with two frequencies. + Each frequency has a distinct phase shift. + + See kstar_test_coherence.ipynb + """ + import numpy as np + from scipy.signal import stft + + # Number of realizations of the signal + num_realizations = 5 + # Sampels per realization + samples_per_realization = 100 + t0 = 0.0 + t1 = 1.0 + dt = 1e-2 + # Time range of a single realization + trg = np.arange(t0, t1 * num_realizations, dt) + + # Pre-allocate wave data array + wave_data = np.zeros([2, num_realizations * samples_per_realization]) + + # Base frequencies and phase shift of each frequency + f0 = 1.0 + f1 = 4.0 + delta_phi_f0 = 0.25 * (t1 - t0) + delta_phi_f1 = 0.5 * (t1 - t0) + + # Calculate y + wave_data[0, :] = np.sin(f0 * 2.0 * np.pi * trg) + np.sin(f1 * 2.0 * np.pi * trg) + wave_data[1, :] = np.sin(f0 * 2.0 * np.pi * (trg - delta_phi_f0)) + np.sin(f1 * 2.0 * np.pi * (trg - delta_phi_f1)) + + # Pre-allocate FFT data array. + num_bins = 11 + nfft = samples_per_realization // 2 + 1 + fft_data = np.zeros([2, nfft, num_bins], dtype=np.complex128) + for ch in [0, 1]: + f_s = stft(wave_data[ch, :], nperseg=100) + fft_data[ch, :, :] = f_s[2][:, ] + + return fft_data + + +def test_kernel_crosscorr(caplog, config_all, gen_sine_waves): + """Test cross-correlation.""" + import sys + import os + sys.path.append(os.path.abspath('delta')) + import numpy as np + from delta.analysis.kernels_spectral import kernel_coherence + from delta.data_models.channels_2d import channel_2d, channel_pair + + import logging + logger = logging.getLogger(__name__) + caplog.set_level(logging.INFO) + + ch1 = channel_2d(1, 1, 2, 1, "horizontal") + ch2 = channel_2d(2, 1, 2, 1, "horizontal") + ch_pair = channel_pair(ch1, ch2) + + logger.info(f"{ch1.get_idx()} {ch2.get_idx()}") + + fft_data = gen_sine_waves + coherence = kernel_coherence(fft_data, [ch_pair], None) + + assert((np.abs(coherence.mean()) - 1.0) < 1e-8) + + +# End of file test_kernel_coherence.py diff --git a/tests/test_kstar_geometry.py b/tests/test_kstar_geometry.py index b39649b..7135fcb 100644 --- a/tests/test_kstar_geometry.py +++ b/tests/test_kstar_geometry.py @@ -210,7 +210,7 @@ def get_abcd(self, sf, sz, Rinit): return abcd -def test_ecei_channel_geom(): +def test_ecei_channel_geom(config_all): """Verify calculated ECEI channel positions are the same as in fluctana.""" import sys import os @@ -226,24 +226,9 @@ def test_ecei_channel_geom(): # ch_str = f"ECEI_L{ch_v:02d}{ch_h:02d}" ch_2d = channel_2d(ch_v, ch_h, 24, 8, order="horizontal") - # Manually provide ECEI configuration dictionary for Delta - cfg_diagnostic = {"name": "kstarecei", "shotnr": 18431, - "datasource": { - "source_file": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/ECEI.018431.LFS.h5", - "chunk_size": 10000, - "num_chunks": 500, - "channel_range": ["L0101-2408"], - "datatype": "float"}, - "parameters": { - "Device": "L", - "TriggerTime": [-0.12, 61.2, 60], - "t_norm": [-0.119, -0.109], - "SampleRate": 500, - "TFcurrent": 23000.0, - "Mode": "O", - "LoFreq": 81, - "LensFocus": 80, - "LensZoom": 340}} + # Use the config_all fixture to get Delta configuration + config = config_all + cfg_diagnostic = config["diagnostic"] # Calcuate channel position using FluctAna and Delta K = kstarecei_mockup(ch_h, ch_v, "L")