Skip to content

Commit

Permalink
Fixed streaming from nersc DTN to cori compute
Browse files Browse the repository at this point in the history
  • Loading branch information
rkube committed Apr 30, 2020
1 parent 6cc36e1 commit 31eb326
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 14 deletions.
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ Note that the processor is called receiver

### MPI processor
This is the xyz=mpi case.
processor_mpi.py implements this case. This processor
processor_mpi.py implements this flexible workflow. This processor
* Reads ECEI data from a bp file
* Puts the data in a queue
* A worker thread reads data from the queue and dispatches it to an MPI Executor for analysis
* Calls the multi-threaded C/cython kernels for data processing

This workflow allows for flexible channel sizes, so we call it the flexible workflow.

Some spectral analyses are implemented as C kernels and interfaced via cython.
To compile the C kernels
```
Expand All @@ -90,15 +92,25 @@ cd analysis
CC=cc LDSHARED="cc -shared" python setup.py build_ext --inplace
```

Run this implementation as
Run this implementation on Cori compute nodes as
```
module unload craype-hugepages2M
module unload darshan
module switch PrgEnv-intel PrgEnv-gnu
module use -a /global/cscratch1/sd/jyc/sw/modulefiles
module load adios2/devel
module load py-pyyaml
export OMP_NUM_THREADS=N
srun -n 6 -c N python -m mpi4py.futures processor_mpi.py --config configs/test_crossphase.json
```

For the KNL nodes, best performance is with N=8/16 and 24 or 48 MPI ranks.

Data storage is implemented for numpy and mongodb backends. See the configuration files configs/test_all.json.
The mongodb backend allows storing data either internally using gridFS or on the filesystem using numpy.
Using mongodb with the numpy backend still stores all metadata in mongodb.
Using the pure numpy backend also stores the metadata in numpy files.


### MPI processor brute
Expand All @@ -119,7 +131,7 @@ module use -a /global/cscratch1/sd/jyc/dtn/sw/modulefiles
module load openmpi
module load zeromq adios2
module load python py-numpy py-mpi4py py-h5py py-scipy py-matplotlib
module load python py-numpy py-mpi4py py-h5py py-yaml py-scipy py-matplotlib py-pyyaml
mpirun -n 5 python -u -m mpi4py.futures receiver_brute.py --config config-dtn.json
```
Expand Down
3 changes: 2 additions & 1 deletion configs/test_all.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
"chunk_size": 10000,
"nstep": 100,
"datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/",
"engine": "BP4",
"engine": "SST",
"channel_range": ["L0101-0808"],
"params":
{
"IPAddress": "128.55.205.18",
"DataTransport": "WAN",
"Timeout": "300",
"Port": "12400",
"OneToOneMode": "TRUE",
Expand Down
15 changes: 8 additions & 7 deletions processor_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,18 @@ def main():
logger.info(f"{rank} Waiting for generator")
reader.Open()
last_step = 0
logger.info(f"Starting main loop")

rx_list = []
while True:
stepStatus = reader.BeginStep()
if last_step == reader.CurrentStep():
continue

logger.info(f"stepStatus = {stepStatus}")
#if last_step == reader.CurrentStep():
# continue
logger.info(f"currentStep = {reader.CurrentStep()}")
if stepStatus:
# Read data
logger.info(f"stepStatus == True")
stream_data = reader.Get(adios2_varname, save=True)
rx_list.append(reader.CurrentStep())

Expand All @@ -190,12 +193,10 @@ def main():
break

if reader.CurrentStep() >= 90:
logger.info(f"Exiting: StepStatus={stepStatus}")
else:
logger.info(f"rank{rank} Exiting: StepStatus={stepStatus}")
logger.info(f"Exiting: CurrentStep={reader.CurrentStep()}, StepStatus={stepStatus}")
dq.put(AdiosMessage(tstep_idx=None, data=None))
break


last_step = reader.CurrentStep()

logger.info("Exiting main loop")
Expand Down
5 changes: 4 additions & 1 deletion streaming/reader_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def Open(self):
pass
self.logger.info(f"Opened channel {self.channel_name}")

return None

def BeginStep(self):
"""Wrapper for reader.BeginStep()"""
res = self.reader.BeginStep()
Expand Down Expand Up @@ -198,7 +200,8 @@ def __init__(self, cfg: dict):
cfg : delta config dict
"""
super().__init__(cfg)

self.IO.SetEngine(cfg["transport"]["engine"])
self.IO.SetParameters(cfg["transport"]["params"])
self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str())
self.reader = None

Expand Down
5 changes: 3 additions & 2 deletions streaming/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ def __init__(self, cfg):

super().__init__(cfg)
self.IO.SetEngine(cfg["transport"]["engine"])
self.IO.SetParameters(cfg["transport"]["params"])

if cfg["transport"]["engine"].lower() == "dataman":
cfg["transport"]["params"].update(Port = str(int(cfg["transport"]["params"]["Port"]) + self.rank))
self.IO.SetParameters(cfg["transport"]["params"])


# End of file writers.py
# End of file writers.py

0 comments on commit 31eb326

Please sign in to comment.