Skip to content

Commit

Permalink
Updated multi-stream test programs
Browse files Browse the repository at this point in the history
  • Loading branch information
rkube committed Jan 10, 2022
1 parent bc3eb3e commit 2c89c53
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 14 deletions.
17 changes: 11 additions & 6 deletions delta/streaming/reader_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,38 @@


class reader_gen():
def __init__(self, cfg: dict, stream_name: str):
def __init__(self, cfg_transport: dict, stream_name: str):
"""Initializes the generic reader base class.
Args:
cfg (dict):
cfg_transport (dict):
delta config dict
stream_name (str):
Name of the data stream to read
Returns:
A class instance
Used keys from cfg_all:
Used keys from cfg:
* transport.engine - Defines the `ADIOS2 engine <https://adios2.readthedocs.io/en/latest/engines/engines.html#supported-engines>`_
* transport.params - Passed to `SetParameters <https://adios2.readthedocs.io/en/latest/api_full/api_full.html?highlight=setparameters#_CPPv4N6adios22IO13SetParametersERKNSt6stringE>`_
"""
comm = MPI.COMM_SELF
comm = MPI.COMM_WORLD
self.rank = comm.Get_rank()
self.size = comm.Get_size()
# This should be MPI.COMM_SELF, not MPI.COMM_WORLD
self.adios = adios2.ADIOS(MPI.COMM_SELF)
self.logger = logging.getLogger("simple")

self.IO = self.adios.DeclareIO(gen_io_name(self.rank))
self.IO.SetEngine(cfg["engine"])
self.IO.SetParameters(cfg["params"])
self.IO.SetEngine(cfg_transport["engine"])
if cfg_transport["engine"].lower() == "dataman":
cfg_transport["params"].update(Port=str(int(cfg_transport["params"]["Port"]) +
2 * self.rank))

self.logger.info(f"rank: {self.rank:d} - port = {cfg_transport['params']['Port']}")
self.IO.SetParameters(cfg_transport["params"])
# Keeps track of the past chunk sizes. This allows to construct a dummy time base
self.reader = None
self.stream_name = stream_name
Expand Down
6 changes: 4 additions & 2 deletions delta/streaming/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ def __init__(self, cfg_transport: dict, channel_name: str):

# Set IO parameters
self.IO.SetEngine(cfg_transport["engine"])
self.IO.SetParameters(cfg_transport["params"])

if cfg_transport["engine"].lower() == "dataman":
cfg_transport["params"].update(Port=str(int(cfg_transport["params"]["Port"]) +
self.rank))
2 * self.rank))

print(f"rank: {self.rank:d} - port = {cfg_transport['params']['Port']}")
self.IO.SetParameters(cfg_transport["params"])

# To generate statistics
self.stats = stream_stats()
Expand Down
63 changes: 63 additions & 0 deletions tests/test_reader_multichannel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env python
#-*- Encoding: UTF-8 -*-

"""Uses readers in multi-channel setting
Command
```
$ srun -n NN python test_reader_multichannel.py
"""

from mpi4py import MPI
import adios2
import numpy as np

import logging

import sys
sys.path.append("/global/homes/r/rkube/repos/delta/delta")
from streaming.reader_mpi import reader_gen
from streaming.adios_helpers import gen_channel_name
from data_models.base_models import twod_chunk


comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

logging.basicConfig(level=20,
format=f"rank {rank:1d} - " + " %(asctime)15s - %(levelname)s - %(message)s")

# Define transport section for transport
cfg_transport = {
"datapath": "tmp_nersc",
"engine": "dataman",
"params":
{
"IPAddress": "128.55.205.18",
"Timeout": "120",
"Port": "50001",
"TransportMode": "fast"
}
}


channel_name = gen_channel_name(2408, rank)

if rank == 0:
logging.info("==================I am test_reader_multichannel===================")
logging.info(f"rank {rank:d} / size {size:d}. Channel_name = {channel_name}")

r = reader_gen(cfg_transport, channel_name)
r.Open()

for tstep in range(1, 100):
stepStatus = r.BeginStep(timeoutSeconds=5.0)
if stepStatus:
stream_data = r.Get("dummy", save=False)
logging.info(f"rank {rank:d}, tstep = {tstep}, mean = {stream_data.mean()}")
r.EndStep()


# End of file test_reader_multichannel.py
17 changes: 11 additions & 6 deletions tests/test_writer_multichannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
Command
```
$ python test_writer_mulitchannel.py
$ srun -n NN test_writer_mulitchannel.py
```
"""
Expand All @@ -15,6 +15,8 @@
import adios2
import numpy as np

import logging

import sys
sys.path.append("/global/homes/r/rkube/repos/delta/delta")
from streaming.writers import writer_gen
Expand All @@ -25,11 +27,13 @@
rank = comm.Get_rank()
size = comm.Get_size()

logging.basicConfig(level=20,
format=f"rank {rank:1d} - " + " %(asctime)15s - %(levelname)s - %(message)s")

# Define config section for transport
cfg_transport = {
"datapath": "tmp_nersc",
"engine": "BP4",
"engine": "dataman",
"params":
{
"IPAddress": "128.55.205.18",
Expand All @@ -40,23 +44,24 @@
}

channel_name = gen_channel_name(2408, rank)
data = np.random.normal(rank + 1.0, 0.1, size=(192, 10_000))

if rank == 0:
print("==================I am test_writer_multichannel===================")
print(f"rank {rank:d} / size {size:d}. Channel_name = {channel_name}")
logging.info("==================I am test_reader_multichannel===================")
logging.info(f"rank {rank:d} / size {size:d}. Channel_name = {channel_name}")


w = writer_gen(cfg_transport, channel_name)
w.DefineVariable("dummy", (192, 10_000), np.float64)
w.Open()
w.DefineAttributes("strem_attrs", {"test": "yes"})

for tstep in range(1, 5):
for tstep in range(1, 100):
data = np.random.normal(100.0 * (rank + 1) + tstep, 0.1, size=(192, 10_000))
chunk = twod_chunk(data)

w.BeginStep()
w.put_data(chunk)
w.EndStep()
logging.info(f"tstep={tstep}")

w.writer.Close()

0 comments on commit 2c89c53

Please sign in to comment.