Skip to content

Commit

Permalink
Fixed an unresolved merge error
Browse files Browse the repository at this point in the history
  • Loading branch information
rkube committed Apr 29, 2020
1 parent 769ab52 commit 4530870
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 79 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,20 @@ processor_mpi.py implements this case. This processor
* Reads ECEI data from a bp file
* Puts the data in a queue
* A worker thread reads data from the queue and dispatches it to a MPI Executor for analysis
* Calls the multi-threaded cython kernels for data processing
* Calls the multi-threaded C/cython kernels for data processing

Some spectral analysis are implemented as C kernels and interfaced via cython.
To compile the C kernels
```
cd analysis/lib
make
```

To build the cython interface
```
cd analysis
CC=cc LDSHARED="cc -shared" python setup.py build_ext --inplace
```

Run this implemntation as
```
Expand Down
19 changes: 19 additions & 0 deletions configs/test_generator.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"shotnr": 18431,
"datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/",
"nstep": 10,
"channel_range": ["L0101-2408"],
"transport":
{
"chunk_size": 10000,
"engine": "dataman",
"params":
{
"IPAddress": "128.55.205.18",
"Timeout": "20",
"Port": "12306",
"OneToOneMode": "TRUE",
"OpenTimeoutSecs": "30"
}
}
}
25 changes: 5 additions & 20 deletions processor_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,28 +181,17 @@ def main():
# Generate message id and publish is
msg = AdiosMessage(tstep_idx=reader.CurrentStep(), data=stream_data)
dq.put(msg)
<<<<<<< HEAD
logger.info(f"rank{rank} Published message {msg}")
else:
logger.info(f"rank{rank} Exiting: StepStatus={stepStatus}")
break

if reader.CurrentStep() >= 90:
logger.info(f"rank{rank} Exiting: StepStatus={stepStatus}")
=======
logger.info(f"Published message {msg}")
reader.EndStep()
else:
logger.info(f"Exiting: StepStatus={stepStatus}")
>>>>>>> 8d1396632742289d44d8722271da3ab0c2de0768
dq.put(AdiosMessage(tstep_idx=None, data=None))
break
step = step + 1

#if reader.CurrentStep() >= 5:
# logger.info(f"Exiting: StepStatus={stepStatus}")
# dq.put(AdiosMessage(tstep_idx=None, data=None))
# break
if reader.CurrentStep() >= 90:
logger.info(f"Exiting: StepStatus={stepStatus}")
dq.put(AdiosMessage(tstep_idx=None, data=None))
break

last_step = reader.CurrentStep()

Expand All @@ -223,9 +212,5 @@ def main():
if __name__ == "__main__":
main()

<<<<<<< HEAD
# End of file processor_mpi.py
=======

# End of file processor_mpi.py
>>>>>>> 8d1396632742289d44d8722271da3ab0c2de0768
# End of file processor_mpi.py
35 changes: 0 additions & 35 deletions streaming/reader_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,48 +198,13 @@ def __init__(self, cfg: dict):
cfg : delta config dict
"""
super().__init__(cfg)
<<<<<<< HEAD
self.IO.SetEngine("BP4")
self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str())
self.reader = None


class reader_dataman(reader_base):
def __init__(self, cfg: dict):
"""Instantiates a DataMan reader.
Parameters:
-----------
cfg : delta config dict
"""
assert(cfg["transport"]["engine"].lower() == "dataman")
super().__init__(cfg)
self.IO.SetEngine("DataMan")
cfg["transport"]["params"].update(Port = str(12306 + self.rank))
=======
self.IO.SetEngine(cfg["transport"]["engine"])
## Set port number for DataMan
if cfg["transport"]["engine"].lower() == "dataman":
cfg["transport"]["params"].update(Port = str(int(cfg["transport"]["params"]["Port"]) + self.rank))
>>>>>>> 8d1396632742289d44d8722271da3ab0c2de0768
self.IO.SetParameters(cfg["transport"]["params"])
self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str())
self.reader = None

class reader_sst(reader_base):
def __init__(self, cfg: dict):
"""Instantiates a SST reader.
Parameters:
-----------
cfg : delta config dict
"""
assert(cfg["transport"]["engine"].lower() == "sst")
super().__init__(cfg)
self.IO.SetEngine("sst")
# SST file is stored in datapath
self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str())
#self.channel_name = "test.sst"
self.reader = None

# End of file
23 changes: 0 additions & 23 deletions streaming/writers.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,27 +131,4 @@ def __init__(self, cfg):
cfg["transport"]["params"].update(Port = str(int(cfg["transport"]["params"]["Port"]) + self.rank))
self.IO.SetParameters(cfg["transport"]["params"])

<<<<<<< HEAD

class writer_bpfile(writer_base):
def __init__(self, cfg):
"""Perform BP4 specific initialization on top of writer_base."""
super().__init__(cfg)
self.IO.SetEngine("BP4")
self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str())


class writer_sst(writer_base):
def __init__(self, cfg):
"""Perform SST specific initialization on top of writer_base."""
super().__init__(cfg)
self.IO.SetEngine("SST")
self.datapath = cfg["transport"]["datapath"]
self.channel_name = gen_channel_name_v2(self.shotnr, self.chrg.to_str())

return None

# End of file a2_sender.py
=======
# End of file
>>>>>>> 8d1396632742289d44d8722271da3ab0c2de0768

0 comments on commit 4530870

Please sign in to comment.