diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 7a8890a..6109d8b 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -32,6 +32,10 @@ jobs: run: | python -m pip install --upgrade pip pip install -r requirements.txt + + - name: Setup MPI + uses: mpi4py/setup-mpi@v1 + - name: Run tests with pytest run: pytest diff --git a/delta/data_models/base_models.py b/delta/data_models/base_models.py new file mode 100644 index 0000000..4794370 --- /dev/null +++ b/delta/data_models/base_models.py @@ -0,0 +1,27 @@ +#-*- coding: UTF-8 -*- + +"""Base classes.""" + +import numpy as np + +class twod_chunk(): + """Base class for two-dimensional data. + + This defines the interface only. + """ + + def __init__(self, data): + # We should ensure that the data is contiguous so that we can remove this from + # if not data.flags.contiguous: + self.chunk_data = np.require(data, dtype=np.float64, requirements=['C', 'O', 'W', 'A']) + assert(self.chunk_data.flags.contiguous) + + @property + def data(self): + """Common interface to data.""" + return self.chunk_data + + @property + def shape(self): + """Forwards to self.chunk_data.shape.""" + return self.chunk_data.shape diff --git a/delta/data_models/kstar_ecei.py b/delta/data_models/kstar_ecei.py index 9aadd3f..c7537d5 100644 --- a/delta/data_models/kstar_ecei.py +++ b/delta/data_models/kstar_ecei.py @@ -11,10 +11,11 @@ import logging import numpy as np +from data_models.base_models import twod_chunk from data_models.channels_2d import channel_2d, channel_range, num_to_vh -class ecei_chunk(): +class ecei_chunk(twod_chunk): """Class that represents a time-chunk of ECEI data. This class provides the following interface. @@ -37,7 +38,6 @@ class ecei_chunk(): """ def __init__(self, data, tb, params=None, num_v=24, num_h=8): - # TODO: remove rarr and zarr and make them computable from params """Creates an ecei_chunk from a give dataset. The first dimension indices channels, the second dimension indices time. @@ -71,16 +71,18 @@ def __init__(self, data, tb, params=None, num_v=24, num_h=8): Returns: None """ + + super().__init__(data) # Initializes data member self.logger = logging.getLogger("simple") # Data should have more than 1 dimension, last dimension is time assert(data.ndim > 1) # self.num_v, self.num_h = num_v, num_h - # We should ensure that the data is contiguous so that we can remove this from - # if not data.flags.contiguous: - self.ecei_data = np.require(data, dtype=np.float64, requirements=['C', 'O', 'W', 'A']) - assert(self.ecei_data.flags.contiguous) + # # We should ensure that the data is contiguous so that we can remove this from + # # if not data.flags.contiguous: + # self.ecei_data = np.require(data, dtype=np.float64, requirements=['C', 'O', 'W', 'A']) + # assert(self.ecei_data.flags.contiguous) # Time-base for the chunk self.tb = tb @@ -105,15 +107,6 @@ def __init__(self, data, tb, params=None, num_v=24, num_h=8): # bad_channels is used as a mask and has shape=(nchannels) self.bad_channels = np.zeros((self.num_h * self.num_v), dtype=bool) - @property - def data(self): - """Common interface to data.""" - return self.ecei_data - - @property - def shape(self): - """Forwards to self.ecei_data.shape.""" - return self.ecei_data.shape def mark_bad_channels(self, verbose=False): """Mark bad channels. @@ -173,7 +166,7 @@ def create_ft(self, fft_data, params): freqs=None, params=params) -class ecei_chunk_ft(): +class ecei_chunk_ft(twod_chunk): """Represents a fourier-transformed time-chunk of ECEI data.""" def __init__(self, data, tb, freqs, params=None, axis_ch=0, axis_t=1, num_v=24, num_h=8): @@ -200,7 +193,7 @@ def __init__(self, data, tb, freqs, params=None, axis_ch=0, axis_t=1, num_v=24, Returns: None """ - self.data = data + super().__init__(data) # Initializes data self.tb = tb self.freqs = freqs self.params = params @@ -209,15 +202,6 @@ def __init__(self, data, tb, freqs, params=None, axis_ch=0, axis_t=1, num_v=24, self.num_v = num_v self.num_h = num_h - # @property - # def data(self): - # """Common interface to data.""" - # return self.data - - @property - def shape(self): - """Forwards to self.ecei_data.shape.""" - return self.data.shape def channel_range_from_str(range_str): diff --git a/delta/middleman.py b/delta/middleman.py index df206f7..eae3ba8 100644 --- a/delta/middleman.py +++ b/delta/middleman.py @@ -92,7 +92,7 @@ def main(): with open(args.config, "r") as df: cfg = json.load(df) - timeout = 60 + timeout = 120 # The middleman uses both a reader and a writer. Each is configured with using # their respective section of the config file. Therefore some keys are duplicated, diff --git a/delta/streaming/adios_helpers.py b/delta/streaming/adios_helpers.py index abd2a5f..8e07203 100644 --- a/delta/streaming/adios_helpers.py +++ b/delta/streaming/adios_helpers.py @@ -14,12 +14,4 @@ def gen_channel_name(channel_id: int, rank: int): """Generates a channel ID for readers.""" return f"ch{channel_id:04d}_r{rank:03d}" -def gen_channel_name_v2(shotnr: int, channel_rg: str): - """Generates a channel ID using channel range strings. (see analysis/channels.py)""" - return f"{shotnr:05d}_ch{channel_rg:s}" - - -def gen_channel_name_v3(prefix: str, shotnr: int, channel_rg: str): - return f"{prefix}/{shotnr:05d}_ch{channel_rg:s}" - # End of file adios_helpers.py \ No newline at end of file diff --git a/delta/streaming/reader_mpi.py b/delta/streaming/reader_mpi.py index d574040..7954adc 100644 --- a/delta/streaming/reader_mpi.py +++ b/delta/streaming/reader_mpi.py @@ -11,18 +11,23 @@ from streaming.stream_stats import stream_stats -class reader_base(): +class reader_gen(): def __init__(self, cfg: dict, stream_name: str): """Initializes the generic reader base class. - Arguments: + Args: cfg (dict): - delta config dictionary + delta config dict stream_name (str): - ADIOS2 name of the stream + Name of the data stream to read Returns: - None + A class instance + + Used keys from cfg_all: + * transport.engine - Defines the `ADIOS2 engine `_ + * transport.params - Passed to `SetParameters `_ + """ comm = MPI.COMM_SELF self.rank = comm.Get_rank() @@ -32,6 +37,8 @@ def __init__(self, cfg: dict, stream_name: str): self.logger = logging.getLogger("simple") self.IO = self.adios.DeclareIO(gen_io_name(self.rank)) + self.IO.SetEngine(cfg["engine"]) + self.IO.SetParameters(cfg["params"]) # Keeps track of the past chunk sizes. This allows to construct a dummy time base self.reader = None self.stream_name = stream_name @@ -143,27 +150,4 @@ def Get(self, varname: str, save: bool=False): return time_chunk -class reader_gen(reader_base): - """I don't know why we have this derived class - RK 2020-12-02.""" - def __init__(self, cfg: dict, stream_name: str): - """Instantiates a reader. - - Args: - cfg (dict): - delta config dict - stream_name (str): - Name of the data stream to read - - Returns: - A class instance - - Used keys from cfg_all: - * transport.engine - Defines the `ADIOS2 engine `_ - * transport.params - Passed to `SetParameters `_ - """ - super().__init__(cfg, stream_name) - self.IO.SetEngine(cfg["engine"]) - self.IO.SetParameters(cfg["params"]) - self.reader = None - # End of file reader_mpi.py diff --git a/delta/streaming/reader_nompi.py b/delta/streaming/reader_nompi.py index 80c379b..5c98f69 100644 --- a/delta/streaming/reader_nompi.py +++ b/delta/streaming/reader_nompi.py @@ -9,14 +9,14 @@ from streaming.stream_stats import stream_stats -class reader_base(): - def __init__(self, cfg: dict, stream_name: str): +class reader_gen(): + def __init__(self, cfg: dict, channel_name: str): """Initializes the generic reader base class. Arguments: cfg (dict): delta config dictionary - stream_name (str): + channel_name (str): ADIOS2 name of the stream Returns: @@ -26,9 +26,11 @@ def __init__(self, cfg: dict, stream_name: str): self.logger = logging.getLogger("simple") self.IO = self.adios.DeclareIO(gen_io_name(0)) + self.IO.SetEngine(cfg["engine"]) + self.IO.SetParameters(cfg["params"]) # Keeps track of the past chunk sizes. This allows to construct a dummy time base self.reader = None - self.stream_name = stream_name + self.channel_name = channel_name def Open(self, multi_channel_id=None): """Opens a new channel. @@ -39,13 +41,13 @@ def Open(self, multi_channel_id=None): if multi_channel_id is not None: self.channel_name = "%s_%02d"%(self.channel_name, multi_channel_id) - self.logger.info(f"Waiting to receive channel name {self.stream_name}") + self.logger.info(f"Waiting to receive channel name {self.channel_name}") if self.reader is None: - self.reader = self.IO.Open(self.stream_name, adios2.Mode.Read) + self.reader = self.IO.Open(self.channel_name, adios2.Mode.Read) # attrs = self.IO.InquireAttribute("cfg") else: pass - self.logger.info(f"Opened channel {self.stream_name}") + self.logger.info(f"Opened channel {self.channel_name}") # self.logger.info(f"-> attrs = {attrs.DataString()}") return None @@ -98,7 +100,7 @@ def get_attrs(self, attrsname: str): stream_attrs = json.loads(attrs.DataString()[0]) #except ValueError as e: # self.logger.error(f"Could not load attributes from stream: {e}") - # raise ValueError(f"Failed to load attributes {attrsname} from {self.stream_name}") + # raise ValueError(f"Failed to load attributes {attrsname} from {self.channel_name}") self.logger.info(f"Loaded attributes: {stream_attrs}") # TODO: Clean up naming conventions for stream attributes @@ -137,27 +139,4 @@ def Get(self, varname: str, save: bool=False): return time_chunk -class reader_gen(reader_base): - """I don't know why we have this derived class - RK 2020-12-02.""" - def __init__(self, cfg: dict, stream_name: str): - """Instantiates a reader. - - Args: - cfg (dict): - delta config dict - stream_name (str): - Name of the data stream to read - - Returns: - A class instance - - Used keys from cfg_all: - * transport.engine - Defines the `ADIOS2 engine `_ - * transport.params - Passed to `SetParameters `_ - """ - super().__init__(cfg, stream_name) - self.IO.SetEngine(cfg["engine"]) - self.IO.SetParameters(cfg["params"]) - self.reader = None - # End of file reader_mpi.py diff --git a/delta/streaming/writer_nompi.py b/delta/streaming/writer_nompi.py index 729a293..782d995 100644 --- a/delta/streaming/writer_nompi.py +++ b/delta/streaming/writer_nompi.py @@ -13,13 +13,12 @@ from streaming.adios_helpers import gen_io_name -class writer_base(): +class writer_gen(): """Generc base class for all ADIOS2 writers.""" - def __init__(self, cfg: dict, stream_name: str): + def __init__(self, cfg: dict, channel_name: str): """Initialize writer_base.""" # comm = MPI.COMM_WORLD - self.rank = 0 # comm.Get_rank() - # self.size = comm.Get_size() + self.rank = 0 self.logger = logging.getLogger("simple") self.adios = adios2.ADIOS() @@ -31,7 +30,11 @@ def __init__(self, cfg: dict, stream_name: str): self.shape = None # Generate a descriptive channel name - self.stream_name = stream_name + self.channel_name = channel_name + + self.IO.SetEngine(cfg_transport["engine"]) + self.IO.SetParameters(cfg_transport["params"]) + # To generate statistics self.stats = stream_stats() @@ -85,7 +88,7 @@ def DefineAttributes(self, attrsname: str, attrs: dict): def Open(self): """Opens a new channel.""" if self.writer is None: - self.writer = self.IO.Open(self.stream_name, adios2.Mode.Write) + self.writer = self.IO.Open(self.channel_name, adios2.Mode.Write) def Close(self): """Wrapper for Close.""" @@ -143,31 +146,4 @@ def transfer_stats(self): return stats_str - -class writer_gen(writer_base): - """I don't know why this is here - RK 2020-11-29.""" - def __init__(self, cfg_transport, stream_name): - """Instantiates a writer. - - Control Adios method and params through transport section cfg - - Args: - cfg_transport (dict): - This corresponds to the transport section. - stream_name (str): - Name for the adios data stream - - Returns: - None - """ - super().__init__(cfg_transport, stream_name) - self.IO.SetEngine(cfg_transport["engine"]) - self.IO.SetParameters(cfg_transport["params"]) - - if cfg_transport["engine"].lower() == "dataman": - cfg_transport["params"].update(Port=str(int(cfg_transport["params"]["Port"]) + - self.rank)) - - -# End of file writers.py - +# End of file writers.py \ No newline at end of file diff --git a/delta/streaming/writers.py b/delta/streaming/writers.py index 6bc7fa7..2f6d45d 100644 --- a/delta/streaming/writers.py +++ b/delta/streaming/writers.py @@ -15,9 +15,9 @@ from streaming.adios_helpers import gen_io_name -class writer_base(): +class writer_gen(): """Generc base class for all ADIOS2 writers.""" - def __init__(self, cfg: dict, stream_name: str): + def __init__(self, cfg_transport: dict, channel_name: str): """Initialize writer_base.""" comm = MPI.COMM_WORLD self.rank = comm.Get_rank() @@ -33,8 +33,16 @@ def __init__(self, cfg: dict, stream_name: str): self.shape = None # Generate a descriptive channel name - self.stream_name = stream_name + self.channel_name = channel_name + # Set IO parameters + self.IO.SetEngine(cfg_transport["engine"]) + self.IO.SetParameters(cfg_transport["params"]) + + if cfg_transport["engine"].lower() == "dataman": + cfg_transport["params"].update(Port=str(int(cfg_transport["params"]["Port"]) + + self.rank)) + # To generate statistics self.stats = stream_stats() @@ -87,7 +95,7 @@ def DefineAttributes(self, attrsname: str, attrs: dict): def Open(self): """Opens a new channel.""" if self.writer is None: - self.writer = self.IO.Open(self.stream_name, adios2.Mode.Write) + self.writer = self.IO.Open(self.channel_name, adios2.Mode.Write) def Close(self): """Wrapper for Close.""" @@ -146,30 +154,4 @@ def transfer_stats(self): return stats_str -class writer_gen(writer_base): - """I don't know why this is here - RK 2020-11-29.""" - def __init__(self, cfg_transport, stream_name): - """Instantiates a writer. - - Control Adios method and params through transport section cfg - - Args: - cfg_transport (dict): - This corresponds to the transport section. - stream_name (str): - Name for the adios data stream - - Returns: - None - """ - super().__init__(cfg_transport, stream_name) - self.IO.SetEngine(cfg_transport["engine"]) - self.IO.SetParameters(cfg_transport["params"]) - - if cfg_transport["engine"].lower() == "dataman": - cfg_transport["params"].update(Port=str(int(cfg_transport["params"]["Port"]) + - self.rank)) - - -# End of file writers.py - +# End of file writers.py \ No newline at end of file diff --git a/pytest.ini b/pytest.ini index 517fe27..dedf5a6 100644 --- a/pytest.ini +++ b/pytest.ini @@ -3,4 +3,4 @@ log_cli = True testpaths = tests -addopts = --ignore=tests/test_executor.py +addopts = --ignore=tests/test_executor.py --ignore=tests/test_writer_multichannel.py diff --git a/tests/test_writer_multichannel.py b/tests/test_writer_multichannel.py new file mode 100644 index 0000000..d391acc --- /dev/null +++ b/tests/test_writer_multichannel.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python +#-*- Encoding: UTF-8 -*- + +"""Uses writers in multi-channel setting. + +Command + +``` +$ python test_writer_mulitchannel.py +``` + +""" + +from mpi4py import MPI +import adios2 +import numpy as np + +import sys +sys.path.append("/global/homes/r/rkube/repos/delta/delta") +from streaming.writers import writer_gen +from streaming.adios_helpers import gen_channel_name +from data_models.base_models import twod_chunk + +comm = MPI.COMM_WORLD +rank = comm.Get_rank() +size = comm.Get_size() + + +# Define config section for transport +cfg_transport = { + "datapath": "tmp_nersc", + "engine": "BP4", + "params": + { + "IPAddress": "128.55.205.18", + "Timeout": "120", + "Port": "50001", + "TransportMode": "fast" + } +} + +channel_name = gen_channel_name(2408, rank) +data = np.random.normal(rank + 1.0, 0.1, size=(192, 10_000)) + +if rank == 0: + print("==================I am test_writer_multichannel===================") +print(f"rank {rank:d} / size {size:d}. Channel_name = {channel_name}") + +w = writer_gen(cfg_transport, channel_name) +w.DefineVariable("dummy", (192, 10_000), np.float64) +w.Open() +w.DefineAttributes("strem_attrs", {"test": "yes"}) + +for tstep in range(1, 5): + data = np.random.normal(100.0 * (rank + 1) + tstep, 0.1, size=(192, 10_000)) + chunk = twod_chunk(data) + + w.BeginStep() + w.put_data(chunk) + w.EndStep() + +w.writer.Close()