Skip to content

Commit

Permalink
End of the week
Browse files Browse the repository at this point in the history
  • Loading branch information
rkube committed May 15, 2020
1 parent ace04ba commit 22864ff
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 37 deletions.
96 changes: 76 additions & 20 deletions analysis/tasks_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def __init__(self, task_config, fft_config, ecei_config, storage_config):
self.task_config = task_config
self.ecei_config = ecei_config
self.storage_config = storage_config

self.logger = logging.getLogger("simple")


Expand All @@ -79,18 +78,6 @@ def __init__(self, task_config, fft_config, ecei_config, storage_config):
raise NameError(f"Unknown analysis task {self.analysis}")


# Get the configuration from task_fft_scipy, but don't store the object.
fft_config["fsample"] = ecei_config["SampleRate"] * 1e3
self.my_fft = task_fft_scipy(10_000, fft_config, normalize=True, detrend=True)
self.fft_params = self.my_fft.get_fft_params()

if self.storage_config["backend"] == "numpy":
self.storage_backend = backends.backend_numpy(self.storage_config)
elif self.storage_config["backend"] == "mongo":
self.store_backend = backends.backend_mongodb(self.storage_config)
elif self.storage_config["backend"] == "null":
self.storage_backend = backends.backend_null(self.storage_config)

# Parse the reference and cross channels.
self.ref_channels = channel_range.from_str(task_config["ref_channels"])
# These channels serve as the cross-data for the spectral diagnostics
Expand All @@ -111,6 +98,23 @@ def __init__(self, task_config, fft_config, ecei_config, storage_config):
# Total number of chunks, i.e. the number of futures appended to the list per call to calculate
self.num_chunks = (len(self.unique_channels) + self.channel_chunk_size - 1) // self.channel_chunk_size

# Get the configuration from task_fft_scipy, but don't store the object.
fft_config["fsample"] = ecei_config["SampleRate"] * 1e3
self.my_fft = task_fft_scipy(self.channel_chunk_size, fft_config, normalize=True, detrend=True)
self.fft_params = self.my_fft.get_fft_params()

# Instantiate the storage backend requested by the configuration.
# Every branch must assign self.storage_backend: it is dereferenced below
# (and previously the mongo branch assigned the misspelled self.store_backend,
# leaving self.storage_backend as None and crashing store_metadata).
self.storage_backend = None
if self.storage_config["backend"] == "numpy":
    self.storage_backend = backends.backend_numpy(self.storage_config)
elif self.storage_config["backend"] == "mongo":
    self.storage_backend = backends.backend_mongodb(self.storage_config)
elif self.storage_config["backend"] == "null":
    self.storage_backend = backends.backend_null(self.storage_config)
else:
    raise NameError(f"Unknown storage backend requested: {self.storage_config}")

# Persist the task metadata once, at construction time.
self.storage_backend.store_metadata(self.task_config, self.get_dispatch_sequence())


def get_dispatch_sequence(self, niter=None):
"""Returns an a list of iterables that together span all unique
Expand Down Expand Up @@ -156,7 +160,7 @@ def calc_and_store(self, stream_data, ch_it, info_dict):

return None

def submit(self, executor, data, tidx):
def submit(self, executor, fft_data, tidx):
"""Submits a kernel to the executor
Note: When we are submitting member functions on the executor we are losing the
Expand All @@ -167,19 +171,71 @@ def submit(self, executor, data, tidx):
info_dict_list = [{"analysis_name": self.analysis,
"tidx": tidx,
"channel_batch": chunk_idx} for chunk_idx in range(self.num_chunks)]

_ = [executor.submit(self.calc_and_store, fft_data, ch_it, info_dict) for ch_it, info_dict in zip(self.get_dispatch_sequence(), info_dict_list)]

return None



class task_list_spectral():
"""Defines a group of analysis that, together with an FFT, are
performed on a PEP-3148 exeecutor"""

def __init__(self, executor_anl, executor_fft, task_config_list, fft_config, ecei_config, storage_config):
    """Initialize the object with a list of tasks to be performed.
    These tasks share a common channel list.

    Inputs:
    =======
    executor_anl: PEP-3148 executor for running analysis
    executor_fft: PEP-3148 executor to execute FFTs on.
    task_config_list: list of dict, parameters of the analyses to be performed
    fft_config: dict, gives parameters of the fourier-transformed data
    ecei_config: dict, information on ecei diagnostic
    storage_config: dict, information on storage backend.
    """
    self.executor_anl = executor_anl
    self.executor_fft = executor_fft
    self.task_config_list = task_config_list
    self.ecei_config = ecei_config
    self.storage_config = storage_config
    # Don't store fft_config but use fft_params from one of the tasks instead.
    # Do this since we need the sampling frequency, which is calculated from ECEi data.

    self.logger = logging.getLogger("simple")

    # Build one task_spectral per configuration entry; all share the channel list.
    self.task_list = [task_spectral(cfg, fft_config, self.ecei_config, self.storage_config)
                      for cfg in self.task_config_list]

    # Every task was constructed from the same fft_config, so the first one's
    # fft_params are representative for the whole list.
    self.fft_params = self.task_list[0].fft_params


def submit(self, data, tidx):
    """Fourier-transforms a chunk of time-series data and fans the result
    out to every spectral analysis task in the list.

    Inputs:
    =======
    data: ndarray, time-series data for the current time chunk
    tidx: int, time-step index of the chunk

    Returns:
    ========
    None
    """
    self.logger.info(f"task_list.submit is called. tidx={tidx}. data.shape= {data.shape}. fft_params={self.fft_params}")

    tic_fft = timeit.default_timer()
    # Run the STFT on the dedicated FFT executor and block until it finishes.
    # (A stale duplicate of this call referenced an undefined `executor` name
    # and has been removed.)
    res = self.executor_fft.submit(stft, data, axis=1, fs=self.fft_params["fs"], nperseg=self.fft_params["nfft"],
                                   window=self.fft_params["window"], detrend=self.fft_params["detrend"],
                                   noverlap=self.fft_params["noverlap"], padded=False, return_onesided=False, boundary=None)
    fft_data = res.result()
    # stft returns (f, t, Zxx); keep the spectrogram and center the frequency axis.
    fft_data = np.fft.fftshift(fft_data[2], axes=1)
    toc_fft = timeit.default_timer()
    self.logger.info(f"tidx {tidx}: FFT took {(toc_fft - tic_fft):6.4f}s")

    # Each task submits its analysis kernels on the analysis executor.
    for task in self.task_list:
        task.submit(self.executor_anl, fft_data, tidx)

    return None



Expand Down
2 changes: 1 addition & 1 deletion configs/test_all.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"chunk_size": 10000,
"nstep": 500,
"datapath": "/global/cscratch1/sd/rkube/KSTAR/kstar_streaming/018431/",
"engine": "BP4",
"engine": "SST",
"channel_range": ["L0101-2408"],
"params":
{
Expand Down
12 changes: 6 additions & 6 deletions processor_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ def main():
# Create ADIOS reader object
reader = reader_gen(cfg["transport"])

# Create the task list
task_list = []
for task_config in cfg["task_list"]:
#task_list.append(task_spectral(task_config, fft_params, cfg["ECEI_cfg"]))
task_list.append(task_spectral(task_config, cfg["fft_params"], cfg["ECEI_cfg"], cfg["storage"]))
store_backend.store_metadata(task_config, task_list[-1].get_dispatch_sequence())
# Create a list of individual spectral tasks
#task_list = []
#for task_config in cfg["task_list"]:
# #task_list.append(task_spectral(task_config, fft_params, cfg["ECEI_cfg"]))
# #task_list.append(task_spectral(task_config, cfg["fft_params"], cfg["ECEI_cfg"], cfg["storage"]))
# #store_backend.store_metadata(task_config, task_list[-1].get_dispatch_sequence())

dq = queue.Queue()
msg = None
Expand Down
18 changes: 9 additions & 9 deletions processor_mpi_tasklist.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def consume(Q, task_list):

while True:
try:
msg = Q.get(timeout=2.0)
msg = Q.get(timeout=20.0)
except queue.Empty:
logger.info("Empty queue after waiting until time-out. Exiting")
break
Expand Down Expand Up @@ -168,7 +168,7 @@ def main():
# Create a global executor
#executor = concurrent.futures.ThreadPoolExecutor(max_workers=60)
executor_fft = MPIPoolExecutor(max_workers=4, mpi_info={"host": "nid00104"})
executor_anl = MPIPoolExecutor(max_workers=60, mpi_info={"hostfile": "mpi_hosts.txt"})
executor_anl = MPIPoolExecutor(max_workers=10, mpi_info={"hostfile": "mpi_hosts.txt"})
adios2_varname = channel_range.from_str(cfg["transport"]["channel_range"][0])

#with MPICommExecutor(MPI.COMM_WORLD) as executor:
Expand All @@ -190,7 +190,6 @@ def main():

store_backend.store_one({"run_id": cfg['run_id'], "run_config": cfg})


# Create ADIOS reader object
reader = reader_gen(cfg["transport"])
task_list = task_list_spectral(executor_anl, executor_fft, cfg["task_list"], cfg["fft_params"], cfg["ECEI_cfg"], cfg["storage"])
Expand Down Expand Up @@ -222,7 +221,7 @@ def main():
stream_data = reader.Get(adios2_varname, save=False)
rx_list.append(reader.CurrentStep())

# Generate message id and publish is
# Generate message id and publish
msg = AdiosMessage(tstep_idx=reader.CurrentStep(), data=stream_data)
dq.put_nowait(msg)
logger.info(f"Published message {msg}")
Expand All @@ -232,10 +231,10 @@ def main():
break

# #Early stopping for debug
# if reader.CurrentStep() > 50:
# logger.info(f"Exiting: CurrentStep={reader.CurrentStep()}, StepStatus={stepStatus}")
# dq.put(AdiosMessage(tstep_idx=None, data=None))
# break
if reader.CurrentStep() > 5:
logger.info(f"Exiting: CurrentStep={reader.CurrentStep()}, StepStatus={stepStatus}")
dq.put(AdiosMessage(tstep_idx=None, data=None))
break

last_step = reader.CurrentStep()

Expand All @@ -249,7 +248,8 @@ def main():
logger.info("Workers have joined")

# Shut down the executors
executor.shutdown(wait=True)
executor_anl.shutdown(wait=True)
executor_fft.shutdown(wait=True)

toc_main = timeit.default_timer()
logger.info(f"Run {cfg['run_id']} finished in {(toc_main - tic_main):6.4f}s")
Expand Down
3 changes: 2 additions & 1 deletion streaming/reader_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,9 @@ def Get(self, ch_rg: channel_range, save: bool=False):


# elif isinstance(channels, type(None)):
self.logger.debug(f"Reading varname {ch_rg.to_str()}. Step no. {self.CurrentStep():d}")
self.logger.info(f"Reading varname {ch_rg.to_str()}. Step no. {self.CurrentStep():d}")
var = self.IO.InquireVariable(ch_rg.to_str())
self.logger.info(f"Inquired variables: {var}")
time_chunk = np.zeros(var.Shape(), dtype=np.float64)
self.reader.Get(var, time_chunk, adios2.Mode.Sync)

Expand Down

0 comments on commit 22864ff

Please sign in to comment.