Skip to content

Commit

Permalink
Added multi-stream writer test
Browse files Browse the repository at this point in the history
  • Loading branch information
rkube committed Jan 10, 2022
1 parent b5b5aac commit dea43ee
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 160 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Setup MPI
uses: mpi4py/setup-mpi@v1

- name: Run tests with pytest
run: pytest

Expand Down
27 changes: 27 additions & 0 deletions delta/data_models/base_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#-*- coding: UTF-8 -*-

"""Base classes."""

import numpy as np

class twod_chunk():
"""Base class for two-dimensional data.
This defines the interface only.
"""

def __init__(self, data):
# We should ensure that the data is contiguous so that we can remove this from
# if not data.flags.contiguous:
self.chunk_data = np.require(data, dtype=np.float64, requirements=['C', 'O', 'W', 'A'])
assert(self.chunk_data.flags.contiguous)

@property
def data(self):
"""Common interface to data."""
return self.chunk_data

@property
def shape(self):
"""Forwards to self.chunk_data.shape."""
return self.chunk_data.shape
36 changes: 10 additions & 26 deletions delta/data_models/kstar_ecei.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
import logging
import numpy as np

from data_models.base_models import twod_chunk
from data_models.channels_2d import channel_2d, channel_range, num_to_vh


class ecei_chunk():
class ecei_chunk(twod_chunk):
"""Class that represents a time-chunk of ECEI data.
This class provides the following interface.
Expand All @@ -37,7 +38,6 @@ class ecei_chunk():
"""

def __init__(self, data, tb, params=None, num_v=24, num_h=8):
# TODO: remove rarr and zarr and make them computable from params
"""Creates an ecei_chunk from a give dataset.
The first dimension indices channels, the second dimension indices time.
Expand Down Expand Up @@ -71,16 +71,18 @@ def __init__(self, data, tb, params=None, num_v=24, num_h=8):
Returns:
None
"""

super().__init__(data) # Initializes data member
self.logger = logging.getLogger("simple")
# Data should have more than 1 dimension, last dimension is time
assert(data.ndim > 1)
#
self.num_v, self.num_h = num_v, num_h

# We should ensure that the data is contiguous so that we can remove this from
# if not data.flags.contiguous:
self.ecei_data = np.require(data, dtype=np.float64, requirements=['C', 'O', 'W', 'A'])
assert(self.ecei_data.flags.contiguous)
# # We should ensure that the data is contiguous so that we can remove this from
# # if not data.flags.contiguous:
# self.ecei_data = np.require(data, dtype=np.float64, requirements=['C', 'O', 'W', 'A'])
# assert(self.ecei_data.flags.contiguous)

# Time-base for the chunk
self.tb = tb
Expand All @@ -105,15 +107,6 @@ def __init__(self, data, tb, params=None, num_v=24, num_h=8):
# bad_channels is used as a mask and has shape=(nchannels)
self.bad_channels = np.zeros((self.num_h * self.num_v), dtype=bool)

@property
def data(self):
"""Common interface to data."""
return self.ecei_data

@property
def shape(self):
"""Forwards to self.ecei_data.shape."""
return self.ecei_data.shape

def mark_bad_channels(self, verbose=False):
"""Mark bad channels.
Expand Down Expand Up @@ -173,7 +166,7 @@ def create_ft(self, fft_data, params):
freqs=None, params=params)


class ecei_chunk_ft():
class ecei_chunk_ft(twod_chunk):
"""Represents a fourier-transformed time-chunk of ECEI data."""

def __init__(self, data, tb, freqs, params=None, axis_ch=0, axis_t=1, num_v=24, num_h=8):
Expand All @@ -200,7 +193,7 @@ def __init__(self, data, tb, freqs, params=None, axis_ch=0, axis_t=1, num_v=24,
Returns:
None
"""
self.data = data
super().__init__(data) # Initializes data
self.tb = tb
self.freqs = freqs
self.params = params
Expand All @@ -209,15 +202,6 @@ def __init__(self, data, tb, freqs, params=None, axis_ch=0, axis_t=1, num_v=24,
self.num_v = num_v
self.num_h = num_h

# @property
# def data(self):
# """Common interface to data."""
# return self.data

@property
def shape(self):
"""Forwards to self.ecei_data.shape."""
return self.data.shape


def channel_range_from_str(range_str):
Expand Down
2 changes: 1 addition & 1 deletion delta/middleman.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def main():

with open(args.config, "r") as df:
cfg = json.load(df)
timeout = 60
timeout = 120

# The middleman uses both a reader and a writer. Each is configured with using
# their respective section of the config file. Therefore some keys are duplicated,
Expand Down
8 changes: 0 additions & 8 deletions delta/streaming/adios_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,4 @@ def gen_channel_name(channel_id: int, rank: int):
"""Generates a channel ID for readers."""
return f"ch{channel_id:04d}_r{rank:03d}"

def gen_channel_name_v2(shotnr: int, channel_rg: str):
"""Generates a channel ID using channel range strings. (see analysis/channels.py)"""
return f"{shotnr:05d}_ch{channel_rg:s}"


def gen_channel_name_v3(prefix: str, shotnr: int, channel_rg: str):
return f"{prefix}/{shotnr:05d}_ch{channel_rg:s}"

# End of file adios_helpers.py
40 changes: 12 additions & 28 deletions delta/streaming/reader_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,23 @@
from streaming.stream_stats import stream_stats


class reader_base():
class reader_gen():
def __init__(self, cfg: dict, stream_name: str):
"""Initializes the generic reader base class.
Arguments:
Args:
cfg (dict):
delta config dictionary
delta config dict
stream_name (str):
ADIOS2 name of the stream
Name of the data stream to read
Returns:
None
A class instance
Used keys from cfg_all:
* transport.engine - Defines the `ADIOS2 engine <https://adios2.readthedocs.io/en/latest/engines/engines.html#supported-engines>`_
* transport.params - Passed to `SetParameters <https://adios2.readthedocs.io/en/latest/api_full/api_full.html?highlight=setparameters#_CPPv4N6adios22IO13SetParametersERKNSt6stringE>`_
"""
comm = MPI.COMM_SELF
self.rank = comm.Get_rank()
Expand All @@ -32,6 +37,8 @@ def __init__(self, cfg: dict, stream_name: str):
self.logger = logging.getLogger("simple")

self.IO = self.adios.DeclareIO(gen_io_name(self.rank))
self.IO.SetEngine(cfg["engine"])
self.IO.SetParameters(cfg["params"])
# Keeps track of the past chunk sizes. This allows to construct a dummy time base
self.reader = None
self.stream_name = stream_name
Expand Down Expand Up @@ -143,27 +150,4 @@ def Get(self, varname: str, save: bool=False):
return time_chunk


class reader_gen(reader_base):
"""I don't know why we have this derived class - RK 2020-12-02."""
def __init__(self, cfg: dict, stream_name: str):
"""Instantiates a reader.
Args:
cfg (dict):
delta config dict
stream_name (str):
Name of the data stream to read
Returns:
A class instance
Used keys from cfg_all:
* transport.engine - Defines the `ADIOS2 engine <https://adios2.readthedocs.io/en/latest/engines/engines.html#supported-engines>`_
* transport.params - Passed to `SetParameters <https://adios2.readthedocs.io/en/latest/api_full/api_full.html?highlight=setparameters#_CPPv4N6adios22IO13SetParametersERKNSt6stringE>`_
"""
super().__init__(cfg, stream_name)
self.IO.SetEngine(cfg["engine"])
self.IO.SetParameters(cfg["params"])
self.reader = None

# End of file reader_mpi.py
41 changes: 10 additions & 31 deletions delta/streaming/reader_nompi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
from streaming.stream_stats import stream_stats


class reader_base():
def __init__(self, cfg: dict, stream_name: str):
class reader_gen():
def __init__(self, cfg: dict, channel_name: str):
"""Initializes the generic reader base class.
Arguments:
cfg (dict):
delta config dictionary
stream_name (str):
channel_name (str):
ADIOS2 name of the stream
Returns:
Expand All @@ -26,9 +26,11 @@ def __init__(self, cfg: dict, stream_name: str):
self.logger = logging.getLogger("simple")

self.IO = self.adios.DeclareIO(gen_io_name(0))
self.IO.SetEngine(cfg["engine"])
self.IO.SetParameters(cfg["params"])
# Keeps track of the past chunk sizes. This allows to construct a dummy time base
self.reader = None
self.stream_name = stream_name
self.channel_name = channel_name

def Open(self, multi_channel_id=None):
"""Opens a new channel.
Expand All @@ -39,13 +41,13 @@ def Open(self, multi_channel_id=None):
if multi_channel_id is not None:
self.channel_name = "%s_%02d"%(self.channel_name, multi_channel_id)

self.logger.info(f"Waiting to receive channel name {self.stream_name}")
self.logger.info(f"Waiting to receive channel name {self.channel_name}")
if self.reader is None:
self.reader = self.IO.Open(self.stream_name, adios2.Mode.Read)
self.reader = self.IO.Open(self.channel_name, adios2.Mode.Read)
# attrs = self.IO.InquireAttribute("cfg")
else:
pass
self.logger.info(f"Opened channel {self.stream_name}")
self.logger.info(f"Opened channel {self.channel_name}")
# self.logger.info(f"-> attrs = {attrs.DataString()}")

return None
Expand Down Expand Up @@ -98,7 +100,7 @@ def get_attrs(self, attrsname: str):
stream_attrs = json.loads(attrs.DataString()[0])
#except ValueError as e:
# self.logger.error(f"Could not load attributes from stream: {e}")
# raise ValueError(f"Failed to load attributes {attrsname} from {self.stream_name}")
# raise ValueError(f"Failed to load attributes {attrsname} from {self.channel_name}")

self.logger.info(f"Loaded attributes: {stream_attrs}")
# TODO: Clean up naming conventions for stream attributes
Expand Down Expand Up @@ -137,27 +139,4 @@ def Get(self, varname: str, save: bool=False):
return time_chunk


class reader_gen(reader_base):
"""I don't know why we have this derived class - RK 2020-12-02."""
def __init__(self, cfg: dict, stream_name: str):
"""Instantiates a reader.
Args:
cfg (dict):
delta config dict
stream_name (str):
Name of the data stream to read
Returns:
A class instance
Used keys from cfg_all:
* transport.engine - Defines the `ADIOS2 engine <https://adios2.readthedocs.io/en/latest/engines/engines.html#supported-engines>`_
* transport.params - Passed to `SetParameters <https://adios2.readthedocs.io/en/latest/api_full/api_full.html?highlight=setparameters#_CPPv4N6adios22IO13SetParametersERKNSt6stringE>`_
"""
super().__init__(cfg, stream_name)
self.IO.SetEngine(cfg["engine"])
self.IO.SetParameters(cfg["params"])
self.reader = None

# End of file reader_mpi.py
44 changes: 10 additions & 34 deletions delta/streaming/writer_nompi.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@
from streaming.adios_helpers import gen_io_name


class writer_base():
class writer_gen():
"""Generc base class for all ADIOS2 writers."""
def __init__(self, cfg: dict, stream_name: str):
def __init__(self, cfg: dict, channel_name: str):
"""Initialize writer_base."""
# comm = MPI.COMM_WORLD
self.rank = 0 # comm.Get_rank()
# self.size = comm.Get_size()
self.rank = 0
self.logger = logging.getLogger("simple")

self.adios = adios2.ADIOS()
Expand All @@ -31,7 +30,11 @@ def __init__(self, cfg: dict, stream_name: str):
self.shape = None

# Generate a descriptive channel name
self.stream_name = stream_name
self.channel_name = channel_name

self.IO.SetEngine(cfg_transport["engine"])
self.IO.SetParameters(cfg_transport["params"])


# To generate statistics
self.stats = stream_stats()
Expand Down Expand Up @@ -85,7 +88,7 @@ def DefineAttributes(self, attrsname: str, attrs: dict):
def Open(self):
"""Opens a new channel."""
if self.writer is None:
self.writer = self.IO.Open(self.stream_name, adios2.Mode.Write)
self.writer = self.IO.Open(self.channel_name, adios2.Mode.Write)

def Close(self):
"""Wrapper for Close."""
Expand Down Expand Up @@ -143,31 +146,4 @@ def transfer_stats(self):

return stats_str


class writer_gen(writer_base):
"""I don't know why this is here - RK 2020-11-29."""
def __init__(self, cfg_transport, stream_name):
"""Instantiates a writer.
Control Adios method and params through transport section cfg
Args:
cfg_transport (dict):
This corresponds to the transport section.
stream_name (str):
Name for the adios data stream
Returns:
None
"""
super().__init__(cfg_transport, stream_name)
self.IO.SetEngine(cfg_transport["engine"])
self.IO.SetParameters(cfg_transport["params"])

if cfg_transport["engine"].lower() == "dataman":
cfg_transport["params"].update(Port=str(int(cfg_transport["params"]["Port"]) +
self.rank))


# End of file writers.py

# End of file writers.py
Loading

0 comments on commit dea43ee

Please sign in to comment.