From c65038f57c7780f2c4afee27f77e5ce15213574c Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Thu, 29 Jun 2023 15:16:03 +0000 Subject: [PATCH 01/26] add agile-io-consumer skeleton --- src/sst/elements/ember/Makefile.am | 2 + .../agile-io-consumer/agile-io-consumer.cc | 59 +++++++++++++++++++ .../agile-io-consumer/agile-io-consumer.h | 35 +++++++++++ .../agile-io-consumer/agile_io_consumer.py | 55 +++++++++++++++++ 4 files changed, 151 insertions(+) create mode 100644 src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc create mode 100644 src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h create mode 100644 src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py diff --git a/src/sst/elements/ember/Makefile.am b/src/sst/elements/ember/Makefile.am index 999d462c3a..de91e586b1 100644 --- a/src/sst/elements/ember/Makefile.am +++ b/src/sst/elements/ember/Makefile.am @@ -37,6 +37,8 @@ libember_la_SOURCES = \ embermotiflog.h \ embermotiflog.cc \ embermemoryev.h \ + agile-io-consumer/agile-io-consumer.h \ + agile-io-consumer/agile-io-consumer.cc \ libs/emberLib.h \ libs/misc.h \ libs/miscEvents/emberGetNodeNumEvent.h \ diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc new file mode 100644 index 0000000000..0908ad7e75 --- /dev/null +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -0,0 +1,59 @@ +// Copyright 2009-2023 NTESS. Under the terms +// of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Copyright (c) 2009-2023, NTESS +// All rights reserved. +// +// Portions are copyright of other developers: +// See the file CONTRIBUTORS.TXT in the top level directory +// of the distribution for more information. +// +// This file is part of the SST software package. For license +// information, see the LICENSE file in the top level directory of the +// distribution. + + +#include +#include +#include "agile-io-consumer.h" +#include "emberevent.h" +#include "mpi/embermpigen.h" + +#include +#include +#include +#include +#include +#include + +using namespace SST; +using namespace SST::Ember; + + + +agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMessagePassingGenerator(id, prms, "Null") +{ +} + +bool +agileIOconsumer::generate(std::queue& evQ) +{ +} + +void +agileIOconsumer::read_request() +{ +} + +int +agileIOconsumer::write_data() +{ + return 0; +} + +int +agileIOconsumer::num_io_nodes() +{ + return 0; +} diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h new file mode 100644 index 0000000000..80e1acf464 --- /dev/null +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -0,0 +1,35 @@ +#ifndef AGILE_IO_CONSUMER_H_ +#define AGILE_IO_CONSUMER_H_ + +#include "mpi/embermpigen.h" + +namespace SST { + +namespace Ember { + +class agileIOconsumer : public EmberMessagePassingGenerator +{ + public: + SST_ELI_REGISTER_SUBCOMPONENT( + agileIOconsumer, + "ember", + "IOConsumerMotif", + SST_ELI_ELEMENT_VERSION(1,0,0), + "Performs a IOConsumer Motif", + SST::Ember::EmberGenerator + ) + + agileIOconsumer(SST::ComponentId_t id, Params& prms); + ~agileIOconsumer() {} + + bool generate(std::queue& evQ); + void read_request(); + int write_data(); + int num_io_nodes(); +}; + +} + +} + +#endif /* AGILE_IO_CONSUMER_H_ */ diff --git a/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py b/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py new file mode 100644 index 0000000000..cd579f975e --- /dev/null +++ b/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py @@ -0,0 +1,55 @@ +import sst +from sst.merlin.base import * +from sst.merlin.endpoint import * +from sst.merlin.interface import * +from sst.merlin.topology import * + +from sst.ember import * + +def example(): + PlatformDefinition.setCurrentPlatform("firefly-defaults") + + ### Setup the topology + topo = topoDragonFly() + topo.hosts_per_router = 2 + topo.routers_per_group = 4 + topo.intergroup_links = 2 + topo.num_groups = 4 + topo.algorithm = ["minimal","adaptive-local"] + + # Set up the routers + router = hr_router() + router.link_bw = "4GB/s" + router.flit_size = "8B" + router.xbar_bw = "6GB/s" + router.input_latency = "20ns" + router.output_latency = "20ns" + router.input_buf_size = "4kB" + router.output_buf_size = "4kB" + router.num_vns = 2 + router.xbar_arb = "merlin.xbar_arb_lru" + + topo.router = router + topo.link_latency = "20ns" + + ### set up the endpoint + networkif = ReorderLinkControl() + networkif.link_bw = "4GB/s" + networkif.input_buf_size = "1kB" + networkif.output_buf_size = "1kB" + + ep = EmberMPIJob(0,topo.getNumNodes()) + ep.network_interface = networkif + ep.addMotif("Init") + ep.addMotif("Example") + ep.addMotif("Fini") + ep.nic.nic2host_lat= "100ns" + + system = System() + system.setTopology(topo) + system.allocateNodes(ep,"linear") + + system.build() + +if __name__ == "__main__": + example() From 6a738dc434265a39f4d9033df6a2bd1149af0a75 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Fri, 14 Jul 2023 21:38:59 +0000 Subject: [PATCH 02/26] update agileIOconsumer --- .../agile-io-consumer/agile-io-consumer.cc | 104 ++++++++++++++++++ .../agile-io-consumer/agile-io-consumer.h | 30 +++++ 2 files changed, 134 insertions(+) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 0908ad7e75..49d522a792 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -14,11 +14,18 @@ // distribution. +#include +#include +#include +#include #include #include #include "agile-io-consumer.h" #include "emberevent.h" +#include "event.h" #include "mpi/embermpigen.h" +#include "params.h" +#include "sst/elements/hermes/msgapi.h" #include #include @@ -26,6 +33,11 @@ #include #include #include +#include + +#ifndef MPI_ANY_SOURCE +#define MPI_ANY_SOURCE -1 +#endif using namespace SST; using namespace SST::Ember; @@ -34,11 +46,47 @@ using namespace SST::Ember; agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMessagePassingGenerator(id, prms, "Null") { + rank_ = EmberMessagePassingGenerator::rank(); + + if (rank_ == 0) { + std::cout << "Running agileIOconsumer motif\n"; + } + + prms.find_array("arg.IONodes", ionodes); + + long buffer_size = combined_read_size / ionodes.size(); + + sendBuf = memAlloc(buffer_size); + for (int i = 0; i < ionodes.size(); i++) { + recvBuf.push_back(memAlloc(buffer_size)); + } +} + +void agileIOconsumer::init() +{ + rank_ = EmberMessagePassingGenerator::rank(); + + auto it = find(ionodes.begin(), ionodes.end(), rank_); + if (it == ionodes.end()) { + kind = Blue; + std::cerr << "Blue\n"; + } + else { + kind = Green; + std::cerr << "Green\n"; + } } bool agileIOconsumer::generate(std::queue& evQ) { + switch (kind) { + case Green: + break; + case Blue: + break; + } + return true; } void @@ -57,3 +105,59 @@ agileIOconsumer::num_io_nodes() { return 0; } + +// Sent to all the IO nodes +void agileIOconsumer::validate(const long total_request_size) { + int count = ionodes.size(); + long request = total_request_size / count; + if (request * count != total_request_size) { + std::cerr << request << " * " << count << " != " << total_request_size + << "\n"; + abort(); + } +} + +// send each of the IO nodes a request for total_request_size +// it will be up to them to return total_request_size / num_io_nodes +void agileIOconsumer::broadcast_and_receive(const long &total_request_size, + std::queue &evQ) { + for (int i = 0; i < ionodes.size(); i++) { + // send( Queue& q, Addr payload, uint32_t count, PayloadDataType dtype, + // RankID dest, uint32_t tag, Communicator group) + std::stringstream sstream; + sstream << "file_request: " << total_request_size << " " << rank_; + auto payload_string = sstream.str(); + auto payload = payload_string.data(); + auto len = strlen(payload); + enQ_send(evQ, payload, len, CHAR, ionodes[i], 0, GroupWorld); + enQ_recv(evQ, recvBuf[i], total_request_size / ionodes.size(), CHAR, MPI_ANY_SOURCE, 0, GroupWorld); + } +} + +void agileIOconsumer::blue_request(const long total_request_size) { + std::queue &evQ = *evQ_; + + validate(total_request_size); + + broadcast_and_receive(total_request_size, evQ); +} + +// Each IO node responds with amount of data read, possibly less than requested +long +agileIOconsumer::green_read() +{ + std::queue &evQ = *evQ_; + + char incoming_payload[100]; + + std::string str; + long count; + uint64_t target; + + enQ_recv(evQ, incoming_payload, 100, CHAR, MPI_ANY_SOURCE, 0, GroupWorld); + std::stringstream foo(incoming_payload); + foo >> str >> count >> target; + enQ_send(evQ, sendBuf, count, CHAR, target, 0, GroupWorld); + + return 0; +} diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index 80e1acf464..a897ff8c60 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -2,11 +2,15 @@ #define AGILE_IO_CONSUMER_H_ #include "mpi/embermpigen.h" +#include "sst/elements/hermes/msgapi.h" +#include namespace SST { namespace Ember { +const long combined_read_size = 10*1024*1024; + class agileIOconsumer : public EmberMessagePassingGenerator { public: @@ -19,13 +23,39 @@ class agileIOconsumer : public EmberMessagePassingGenerator SST::Ember::EmberGenerator ) + + SST_ELI_DOCUMENT_PARAMS( + {"arg.IONodes", "Array of IO nodes", "0"} + ) + agileIOconsumer(SST::ComponentId_t id, Params& prms); ~agileIOconsumer() {} + void init(); + bool generate(std::queue& evQ); void read_request(); int write_data(); int num_io_nodes(); + + // Sent to all the IO nodes + void validate(const long total_request_size); + void broadcast_and_receive(const long &total_request_size, std::queue &evQ); + void blue_request(long total_request_size); + + // Each IO node responds with amount of data read + long green_read(); + + private: + + Hermes::MP::Addr sendBuf; + std::vector recvBuf; + std::queue* evQ_; + uint64_t rank_; + std::vector ionodes; + // Whether the node is IO (green) or consumer (blue) + enum Kind { Green, Blue }; + Kind kind; }; } From 4dfc4fc9b7e928b09ddddf4a8d7f15c702846267 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Mon, 24 Jul 2023 16:36:59 +0000 Subject: [PATCH 03/26] use IOConsumer motif --- src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py b/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py index cd579f975e..aafd4b1f72 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py +++ b/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py @@ -41,7 +41,7 @@ def example(): ep = EmberMPIJob(0,topo.getNumNodes()) ep.network_interface = networkif ep.addMotif("Init") - ep.addMotif("Example") + ep.addMotif("IOConsumer") ep.addMotif("Fini") ep.nic.nic2host_lat= "100ns" From 51d7d909088d85d16d2780129c9470b3ce6b0302 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Mon, 24 Jul 2023 20:58:55 +0000 Subject: [PATCH 04/26] misc. clang-tidy recommendations --- .../ember/agile-io-consumer/agile-io-consumer.cc | 8 ++++---- .../ember/agile-io-consumer/agile-io-consumer.h | 10 +++------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 49d522a792..7ba6fc816a 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -14,6 +14,7 @@ // distribution. +#include #include #include #include @@ -147,15 +148,14 @@ long agileIOconsumer::green_read() { std::queue &evQ = *evQ_; - - char incoming_payload[100]; + std::array incoming_payload; std::string str; long count; uint64_t target; - enQ_recv(evQ, incoming_payload, 100, CHAR, MPI_ANY_SOURCE, 0, GroupWorld); - std::stringstream foo(incoming_payload); + enQ_recv(evQ, incoming_payload.data(), 100, CHAR, MPI_ANY_SOURCE, 0, GroupWorld); + std::stringstream foo(incoming_payload.data()); foo >> str >> count >> target; enQ_send(evQ, sendBuf, count, CHAR, target, 0, GroupWorld); diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index a897ff8c60..82a2625176 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -5,9 +5,7 @@ #include "sst/elements/hermes/msgapi.h" #include -namespace SST { - -namespace Ember { +namespace SST::Ember { const long combined_read_size = 10*1024*1024; @@ -29,11 +27,11 @@ class agileIOconsumer : public EmberMessagePassingGenerator ) agileIOconsumer(SST::ComponentId_t id, Params& prms); - ~agileIOconsumer() {} + ~agileIOconsumer() override = default; void init(); - bool generate(std::queue& evQ); + bool generate(std::queue& evQ) override; void read_request(); int write_data(); int num_io_nodes(); @@ -60,6 +58,4 @@ class agileIOconsumer : public EmberMessagePassingGenerator } -} - #endif /* AGILE_IO_CONSUMER_H_ */ From c325938a3ec95363671148f3a6857065246a708e Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Tue, 15 Aug 2023 17:15:21 +0000 Subject: [PATCH 05/26] assign the queue --- src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 7ba6fc816a..d7f9b4d8d5 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -81,6 +81,8 @@ void agileIOconsumer::init() bool agileIOconsumer::generate(std::queue& evQ) { + evQ_ = &evQ; + switch (kind) { case Green: break; From 5a67d73a7d1dfd3cba8b0251706d056f8e1815e2 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Tue, 15 Aug 2023 17:57:43 +0000 Subject: [PATCH 06/26] incremental changes --- .../ember/agile-io-consumer/agile-io-consumer.cc | 11 ++++++----- .../ember/agile-io-consumer/agile-io-consumer.h | 2 +- .../ember/agile-io-consumer/agile_io_consumer.py | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index d7f9b4d8d5..97ce21ad46 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -63,7 +63,7 @@ agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMes } } -void agileIOconsumer::init() +void agileIOconsumer::setup() { rank_ = EmberMessagePassingGenerator::rank(); @@ -127,12 +127,13 @@ void agileIOconsumer::broadcast_and_receive(const long &total_request_size, for (int i = 0; i < ionodes.size(); i++) { // send( Queue& q, Addr payload, uint32_t count, PayloadDataType dtype, // RankID dest, uint32_t tag, Communicator group) + char buffer[100]; std::stringstream sstream; sstream << "file_request: " << total_request_size << " " << rank_; - auto payload_string = sstream.str(); - auto payload = payload_string.data(); - auto len = strlen(payload); - enQ_send(evQ, payload, len, CHAR, ionodes[i], 0, GroupWorld); + std::string payload_string = sstream.str(); + auto len = payload_string.copy(buffer, payload_string.length()); + buffer[len] = '\0'; + enQ_send(evQ, buffer, len, CHAR, ionodes[i], 0, GroupWorld); enQ_recv(evQ, recvBuf[i], total_request_size / ionodes.size(), CHAR, MPI_ANY_SOURCE, 0, GroupWorld); } } diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index 82a2625176..f83301ca89 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -29,7 +29,7 @@ class agileIOconsumer : public EmberMessagePassingGenerator agileIOconsumer(SST::ComponentId_t id, Params& prms); ~agileIOconsumer() override = default; - void init(); + void setup() override; bool generate(std::queue& evQ) override; void read_request(); diff --git a/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py b/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py index aafd4b1f72..3ed756f4aa 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py +++ b/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py @@ -41,7 +41,7 @@ def example(): ep = EmberMPIJob(0,topo.getNumNodes()) ep.network_interface = networkif ep.addMotif("Init") - ep.addMotif("IOConsumer") + ep.addMotif("IOConsumer IONodes=[0,4,8,12]") ep.addMotif("Fini") ep.nic.nic2host_lat= "100ns" From a5015b224026a48fbb4b93ea9c1c20a6cc6a8423 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Tue, 29 Aug 2023 21:33:08 +0000 Subject: [PATCH 07/26] small update --- .../ember/agile-io-consumer/agile-io-consumer.cc | 15 ++++----------- .../ember/agile-io-consumer/agile-io-consumer.h | 4 ++-- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 97ce21ad46..9d816a9b1d 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -82,14 +82,6 @@ bool agileIOconsumer::generate(std::queue& evQ) { evQ_ = &evQ; - - switch (kind) { - case Green: - break; - case Blue: - break; - } - return true; } void @@ -138,7 +130,8 @@ void agileIOconsumer::broadcast_and_receive(const long &total_request_size, } } -void agileIOconsumer::blue_request(const long total_request_size) { +bool +agileIOconsumer::blue_request(const long total_request_size) { std::queue &evQ = *evQ_; validate(total_request_size); @@ -147,7 +140,7 @@ void agileIOconsumer::blue_request(const long total_request_size) { } // Each IO node responds with amount of data read, possibly less than requested -long +bool agileIOconsumer::green_read() { std::queue &evQ = *evQ_; @@ -162,5 +155,5 @@ agileIOconsumer::green_read() foo >> str >> count >> target; enQ_send(evQ, sendBuf, count, CHAR, target, 0, GroupWorld); - return 0; + return false; } diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index f83301ca89..9f092d3d05 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -39,10 +39,10 @@ class agileIOconsumer : public EmberMessagePassingGenerator // Sent to all the IO nodes void validate(const long total_request_size); void broadcast_and_receive(const long &total_request_size, std::queue &evQ); - void blue_request(long total_request_size); + bool blue_request(long total_request_size); // Each IO node responds with amount of data read - long green_read(); + bool green_read(); private: From e66362c3151df372f1d2d838178cec1f642222e4 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Wed, 30 Aug 2023 17:38:27 +0000 Subject: [PATCH 08/26] progress --- .../agile-io-consumer/agile-io-consumer.cc | 47 +++++++++++++++---- .../agile-io-consumer/agile-io-consumer.h | 19 ++++++-- 2 files changed, 55 insertions(+), 11 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 9d816a9b1d..6d2e0241f1 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -26,6 +26,7 @@ #include "event.h" #include "mpi/embermpigen.h" #include "params.h" +#include "sst/elements/hermes/hermes.h" #include "sst/elements/hermes/msgapi.h" #include @@ -43,7 +44,7 @@ using namespace SST; using namespace SST::Ember; - +int agileIOconsumer::memory_bitmask = 0; agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMessagePassingGenerator(id, prms, "Null") { @@ -55,12 +56,11 @@ agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMes prms.find_array("arg.IONodes", ionodes); - long buffer_size = combined_read_size / ionodes.size(); + count = ionodes.size(); - sendBuf = memAlloc(buffer_size); - for (int i = 0; i < ionodes.size(); i++) { - recvBuf.push_back(memAlloc(buffer_size)); - } + long buffer_size = combined_read_size / count; + + iteration = -1; } void agileIOconsumer::setup() @@ -82,6 +82,34 @@ bool agileIOconsumer::generate(std::queue& evQ) { evQ_ = &evQ; + + if (iteration == -1) { + // Handle memory allocation + memSetBacked(); + if (rank_ == 1) { + // Rank 1 is Blue + enQ_memAlloc(evQ, &sendBuf, sizeof(Ember::PacketHeader)); + recvBuf = new Hermes::MemAddr[count]; + for (int i = 0; i < count; i++) { + enQ_memAlloc(evQ, &recvBuf[i], sizeof(Ember::PacketHeader)); + } + memory_bitmask |= (1 << rank_); + } + if (kind == Green) { + enQ_memAlloc(evQ, &sendBuf, sizeof(Ember::PacketHeader)); + memory_bitmask |= (1 << rank_); + } + } + + if (memory_bitmask != magicNumber) { + return false; + } + + iteration++; + + if (rank_ == 1) return blue_request(combined_read_size); + if (kind == Green) return green_read(); + return false; } void @@ -114,7 +142,8 @@ void agileIOconsumer::validate(const long total_request_size) { // send each of the IO nodes a request for total_request_size // it will be up to them to return total_request_size / num_io_nodes -void agileIOconsumer::broadcast_and_receive(const long &total_request_size, +bool +agileIOconsumer::broadcast_and_receive(const long &total_request_size, std::queue &evQ) { for (int i = 0; i < ionodes.size(); i++) { // send( Queue& q, Addr payload, uint32_t count, PayloadDataType dtype, @@ -128,6 +157,8 @@ void agileIOconsumer::broadcast_and_receive(const long &total_request_size, enQ_send(evQ, buffer, len, CHAR, ionodes[i], 0, GroupWorld); enQ_recv(evQ, recvBuf[i], total_request_size / ionodes.size(), CHAR, MPI_ANY_SOURCE, 0, GroupWorld); } + + return true; } bool @@ -136,7 +167,7 @@ agileIOconsumer::blue_request(const long total_request_size) { validate(total_request_size); - broadcast_and_receive(total_request_size, evQ); + return broadcast_and_receive(total_request_size, evQ); } // Each IO node responds with amount of data read, possibly less than requested diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index 9f092d3d05..c688c90442 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -2,13 +2,23 @@ #define AGILE_IO_CONSUMER_H_ #include "mpi/embermpigen.h" +#include "sst/elements/hermes/hermes.h" #include "sst/elements/hermes/msgapi.h" #include +#include namespace SST::Ember { const long combined_read_size = 10*1024*1024; +struct PacketHeader { + uint64_t src; + uint64_t dst; + uint64_t len; +}; + +const int magicNumber = (1 << 0) + (1 << 4) + (1 << 8) + (1 << 12) + (1 << 1); + class agileIOconsumer : public EmberMessagePassingGenerator { public: @@ -38,16 +48,19 @@ class agileIOconsumer : public EmberMessagePassingGenerator // Sent to all the IO nodes void validate(const long total_request_size); - void broadcast_and_receive(const long &total_request_size, std::queue &evQ); + bool broadcast_and_receive(const long &total_request_size, std::queue &evQ); bool blue_request(long total_request_size); // Each IO node responds with amount of data read bool green_read(); private: + unsigned count; + long iteration; + static int memory_bitmask; - Hermes::MP::Addr sendBuf; - std::vector recvBuf; + Hermes::MemAddr sendBuf; + Hermes::MemAddr *recvBuf; std::queue* evQ_; uint64_t rank_; std::vector ionodes; From bb5d861c9a0d4b27fc0eba961d21d67c5e5293e1 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Wed, 30 Aug 2023 18:16:15 +0000 Subject: [PATCH 09/26] small changes --- .../ember/agile-io-consumer/agile-io-consumer.cc | 13 +++++++------ .../ember/agile-io-consumer/agile-io-consumer.h | 8 ++++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 6d2e0241f1..d432141a70 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -88,15 +88,16 @@ agileIOconsumer::generate(std::queue& evQ) memSetBacked(); if (rank_ == 1) { // Rank 1 is Blue - enQ_memAlloc(evQ, &sendBuf, sizeof(Ember::PacketHeader)); - recvBuf = new Hermes::MemAddr[count]; + enQ_memAlloc(evQ, &blue_sendBuf, sizeof(Ember::PacketHeader)); + blue_recvBuf = new Hermes::MemAddr[count]; for (int i = 0; i < count; i++) { - enQ_memAlloc(evQ, &recvBuf[i], sizeof(Ember::PacketHeader)); + enQ_memAlloc(evQ, &blue_recvBuf[i], sizeof(Ember::PacketHeader)); } memory_bitmask |= (1 << rank_); } if (kind == Green) { - enQ_memAlloc(evQ, &sendBuf, sizeof(Ember::PacketHeader)); + enQ_memAlloc(evQ, &green_sendBuf, sizeof(Ember::PacketHeader)); + enQ_memAlloc(evQ, &green_recvBuf, sizeof(Ember::PacketHeader)); memory_bitmask |= (1 << rank_); } } @@ -155,7 +156,7 @@ agileIOconsumer::broadcast_and_receive(const long &total_request_size, auto len = payload_string.copy(buffer, payload_string.length()); buffer[len] = '\0'; enQ_send(evQ, buffer, len, CHAR, ionodes[i], 0, GroupWorld); - enQ_recv(evQ, recvBuf[i], total_request_size / ionodes.size(), CHAR, MPI_ANY_SOURCE, 0, GroupWorld); + enQ_recv(evQ, blue_recvBuf[i], total_request_size / ionodes.size(), CHAR, MPI_ANY_SOURCE, 0, GroupWorld); } return true; @@ -184,7 +185,7 @@ agileIOconsumer::green_read() enQ_recv(evQ, incoming_payload.data(), 100, CHAR, MPI_ANY_SOURCE, 0, GroupWorld); std::stringstream foo(incoming_payload.data()); foo >> str >> count >> target; - enQ_send(evQ, sendBuf, count, CHAR, target, 0, GroupWorld); + enQ_send(evQ, green_sendBuf, count, CHAR, target, 0, GroupWorld); return false; } diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index c688c90442..c78aff6c69 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -5,6 +5,7 @@ #include "sst/elements/hermes/hermes.h" #include "sst/elements/hermes/msgapi.h" #include +#include #include namespace SST::Ember { @@ -59,8 +60,11 @@ class agileIOconsumer : public EmberMessagePassingGenerator long iteration; static int memory_bitmask; - Hermes::MemAddr sendBuf; - Hermes::MemAddr *recvBuf; + Hermes::MemAddr blue_sendBuf = nullptr; + Hermes::MemAddr *blue_recvBuf = nullptr; + Hermes::MemAddr green_sendBuf = nullptr; + Hermes::MemAddr green_recvBuf = nullptr; + std::queue* evQ_; uint64_t rank_; std::vector ionodes; From ae187a9503cea09f627e48bf1623d5a84d65c5c4 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Wed, 30 Aug 2023 18:54:52 +0000 Subject: [PATCH 10/26] jci --- .../elements/ember/agile-io-consumer/agile-io-consumer.cc | 5 ++--- src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index d432141a70..cad5f5c47d 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -45,6 +45,7 @@ using namespace SST; using namespace SST::Ember; int agileIOconsumer::memory_bitmask = 0; +long agileIOconsumer::iteration = -1; agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMessagePassingGenerator(id, prms, "Null") { @@ -59,8 +60,6 @@ agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMes count = ionodes.size(); long buffer_size = combined_read_size / count; - - iteration = -1; } void agileIOconsumer::setup() @@ -106,7 +105,7 @@ agileIOconsumer::generate(std::queue& evQ) return false; } - iteration++; + enQ_barrier(evQ, GroupWorld); if (rank_ == 1) return blue_request(combined_read_size); if (kind == Green) return green_read(); diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index c78aff6c69..05787b2159 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -57,7 +57,7 @@ class agileIOconsumer : public EmberMessagePassingGenerator private: unsigned count; - long iteration; + static long iteration; static int memory_bitmask; Hermes::MemAddr blue_sendBuf = nullptr; From 73a0f2e26fe86caed20f4553aa6a84675e4a8e48 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Thu, 31 Aug 2023 18:12:46 +0000 Subject: [PATCH 11/26] another iteration --- .../elements/ember/agile-io-consumer/agile-io-consumer.cc | 8 ++++---- .../elements/ember/agile-io-consumer/agile-io-consumer.h | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index cad5f5c47d..469398c40d 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -45,7 +45,6 @@ using namespace SST; using namespace SST::Ember; int agileIOconsumer::memory_bitmask = 0; -long agileIOconsumer::iteration = -1; agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMessagePassingGenerator(id, prms, "Null") { @@ -82,7 +81,7 @@ agileIOconsumer::generate(std::queue& evQ) { evQ_ = &evQ; - if (iteration == -1) { + if (first) { // Handle memory allocation memSetBacked(); if (rank_ == 1) { @@ -99,14 +98,15 @@ agileIOconsumer::generate(std::queue& evQ) enQ_memAlloc(evQ, &green_recvBuf, sizeof(Ember::PacketHeader)); memory_bitmask |= (1 << rank_); } + first = false; + + return false; } if (memory_bitmask != magicNumber) { return false; } - enQ_barrier(evQ, GroupWorld); - if (rank_ == 1) return blue_request(combined_read_size); if (kind == Green) return green_read(); return false; diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index 05787b2159..d0d4b871ff 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -57,7 +57,7 @@ class agileIOconsumer : public EmberMessagePassingGenerator private: unsigned count; - static long iteration; + bool first = true; static int memory_bitmask; Hermes::MemAddr blue_sendBuf = nullptr; From 8161016b39c4be4153cb865c01e3221e34d5b74a Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Tue, 5 Sep 2023 21:22:12 +0000 Subject: [PATCH 12/26] remove recv payload --- .../ember/agile-io-consumer/agile-io-consumer.cc | 14 ++++++-------- .../ember/agile-io-consumer/agile-io-consumer.h | 3 +++ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 469398c40d..ee5ef7bea3 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -175,16 +175,14 @@ bool agileIOconsumer::green_read() { std::queue &evQ = *evQ_; - std::array incoming_payload; - - std::string str; - long count; uint64_t target; - enQ_recv(evQ, incoming_payload.data(), 100, CHAR, MPI_ANY_SOURCE, 0, GroupWorld); - std::stringstream foo(incoming_payload.data()); - foo >> str >> count >> target; + enQ_recv(evQ, green_recvBuf, PacketSize, CHAR, Hermes::MP::AnyTag, 0, GroupWorld, &green_mesgResp); + enQ_wait(evQ, NULL, &green_mesgResp); + if (green_mesgResp.status == false) { + return false; + } enQ_send(evQ, green_sendBuf, count, CHAR, target, 0, GroupWorld); - return false; + return true; } diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index d0d4b871ff..0b6dd8d5f5 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -18,6 +18,8 @@ struct PacketHeader { uint64_t len; }; +const unsigned long PacketSize = sizeof(Ember::PacketHeader) / sizeof(uint64_t); + const int magicNumber = (1 << 0) + (1 << 4) + (1 << 8) + (1 << 12) + (1 << 1); class agileIOconsumer : public EmberMessagePassingGenerator @@ -64,6 +66,7 @@ class agileIOconsumer : public EmberMessagePassingGenerator Hermes::MemAddr *blue_recvBuf = nullptr; Hermes::MemAddr green_sendBuf = nullptr; Hermes::MemAddr green_recvBuf = nullptr; + MessageResponse green_mesgResp; std::queue* evQ_; uint64_t rank_; From eb836e444fdf6c7274572a2138dea6ab818bd483 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Wed, 6 Sep 2023 17:39:43 +0000 Subject: [PATCH 13/26] send packets not strings --- .../agile-io-consumer/agile-io-consumer.cc | 35 +++++++++---------- .../agile-io-consumer/agile-io-consumer.h | 2 ++ 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index ee5ef7bea3..3ba3f275dc 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -103,10 +103,6 @@ agileIOconsumer::generate(std::queue& evQ) return false; } - if (memory_bitmask != magicNumber) { - return false; - } - if (rank_ == 1) return blue_request(combined_read_size); if (kind == Green) return green_read(); return false; @@ -148,17 +144,16 @@ agileIOconsumer::broadcast_and_receive(const long &total_request_size, for (int i = 0; i < ionodes.size(); i++) { // send( Queue& q, Addr payload, uint32_t count, PayloadDataType dtype, // RankID dest, uint32_t tag, Communicator group) - char buffer[100]; - std::stringstream sstream; - sstream << "file_request: " << total_request_size << " " << rank_; - std::string payload_string = sstream.str(); - auto len = payload_string.copy(buffer, payload_string.length()); - buffer[len] = '\0'; - enQ_send(evQ, buffer, len, CHAR, ionodes[i], 0, GroupWorld); - enQ_recv(evQ, blue_recvBuf[i], total_request_size / ionodes.size(), CHAR, MPI_ANY_SOURCE, 0, GroupWorld); + PacketHeader* send_buffer = (PacketHeader*) blue_sendBuf.getBacking(); + send_buffer->dst = ionodes[i]; + send_buffer->src = rank_; + send_buffer->len = 24; + enQ_send(evQ, blue_sendBuf, PacketSize, UINT64_T, ionodes[i], Tag, GroupWorld); + // enQ_send(evQ, buffer, len, CHAR, ionodes[i], 0, GroupWorld); + // enQ_recv(evQ, blue_recvBuf[i], total_request_size / ionodes.size(), CHAR, MPI_ANY_SOURCE, 0, GroupWorld); } - return true; + return false; } bool @@ -175,14 +170,16 @@ bool agileIOconsumer::green_read() { std::queue &evQ = *evQ_; - uint64_t target; + // uint64_t target; + + MessageResponse response = {0}; - enQ_recv(evQ, green_recvBuf, PacketSize, CHAR, Hermes::MP::AnyTag, 0, GroupWorld, &green_mesgResp); - enQ_wait(evQ, NULL, &green_mesgResp); - if (green_mesgResp.status == false) { + enQ_recv(evQ, green_recvBuf, PacketSize, UINT64_T, AnySrc, Tag, GroupWorld, &response); + enQ_wait(evQ, NULL, &response); + if (response.status == false) { return false; } - enQ_send(evQ, green_sendBuf, count, CHAR, target, 0, GroupWorld); + // enQ_send(evQ, green_sendBuf, count, CHAR, target, 0, GroupWorld); - return true; + return false; } diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index 0b6dd8d5f5..0e9b81b4da 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -11,6 +11,7 @@ namespace SST::Ember { const long combined_read_size = 10*1024*1024; +const long Tag = 37; struct PacketHeader { uint64_t src; @@ -66,6 +67,7 @@ class agileIOconsumer : public EmberMessagePassingGenerator Hermes::MemAddr *blue_recvBuf = nullptr; Hermes::MemAddr green_sendBuf = nullptr; Hermes::MemAddr green_recvBuf = nullptr; + MessageRequest blue_mesgReq; MessageResponse green_mesgResp; std::queue* evQ_; From 7e5b80947947f94ab7e489726a6cea4267c89c9a Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Wed, 6 Sep 2023 20:05:09 +0000 Subject: [PATCH 14/26] skeleton is working now --- .../agile-io-consumer/agile-io-consumer.cc | 73 +++++++++++++------ .../agile-io-consumer/agile-io-consumer.h | 7 +- 2 files changed, 56 insertions(+), 24 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 3ba3f275dc..7c50eece17 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -86,10 +86,12 @@ agileIOconsumer::generate(std::queue& evQ) memSetBacked(); if (rank_ == 1) { // Rank 1 is Blue - enQ_memAlloc(evQ, &blue_sendBuf, sizeof(Ember::PacketHeader)); + blue_mesgReq = new MessageRequest[count]; blue_recvBuf = new Hermes::MemAddr[count]; + blue_sendBuf = new Hermes::MemAddr[count]; for (int i = 0; i < count; i++) { enQ_memAlloc(evQ, &blue_recvBuf[i], sizeof(Ember::PacketHeader)); + enQ_memAlloc(evQ, &blue_sendBuf[i], sizeof(Ember::PacketHeader)); } memory_bitmask |= (1 << rank_); } @@ -127,7 +129,6 @@ agileIOconsumer::num_io_nodes() // Sent to all the IO nodes void agileIOconsumer::validate(const long total_request_size) { - int count = ionodes.size(); long request = total_request_size / count; if (request * count != total_request_size) { std::cerr << request << " * " << count << " != " << total_request_size @@ -141,19 +142,34 @@ void agileIOconsumer::validate(const long total_request_size) { bool agileIOconsumer::broadcast_and_receive(const long &total_request_size, std::queue &evQ) { - for (int i = 0; i < ionodes.size(); i++) { - // send( Queue& q, Addr payload, uint32_t count, PayloadDataType dtype, - // RankID dest, uint32_t tag, Communicator group) - PacketHeader* send_buffer = (PacketHeader*) blue_sendBuf.getBacking(); - send_buffer->dst = ionodes[i]; - send_buffer->src = rank_; - send_buffer->len = 24; - enQ_send(evQ, blue_sendBuf, PacketSize, UINT64_T, ionodes[i], Tag, GroupWorld); - // enQ_send(evQ, buffer, len, CHAR, ionodes[i], 0, GroupWorld); - // enQ_recv(evQ, blue_recvBuf[i], total_request_size / ionodes.size(), CHAR, MPI_ANY_SOURCE, 0, GroupWorld); + if (blue_round == 0) { + blue_round++; + for (int i = 0; i < count; i++) { + // send( Queue& q, Addr payload, uint32_t count, PayloadDataType dtype, + // RankID dest, uint32_t tag, Communicator group) + PacketHeader* send_buffer = (PacketHeader*) blue_sendBuf[i].getBacking(); + send_buffer->dst = ionodes[i]; + send_buffer->src = rank_; + send_buffer->len = 24; + enQ_send(evQ, blue_sendBuf[i], PacketSize, UINT64_T, ionodes[i], Tag, GroupWorld); + } + return false; + } + else if (blue_round == 1) { + blue_round++; + for (int i = 0; i < count; i++) { + PacketHeader *recv_buffer = (PacketHeader*) blue_recvBuf[i].getBacking(); + enQ_irecv(evQ, blue_recvBuf[i], PacketSize, UINT64_T, AnySrc, Tag, GroupWorld, &blue_mesgReq[i]); + enQ_wait(evQ, &blue_mesgReq[i]); + } + return false; + } + else { + for (int i = 0; i < count; i++) { + PacketHeader *ph = (PacketHeader*)blue_recvBuf[i].getBacking(); + } + return true; } - - return false; } bool @@ -172,14 +188,27 @@ agileIOconsumer::green_read() std::queue &evQ = *evQ_; // uint64_t target; - MessageResponse response = {0}; - - enQ_recv(evQ, green_recvBuf, PacketSize, UINT64_T, AnySrc, Tag, GroupWorld, &response); - enQ_wait(evQ, NULL, &response); - if (response.status == false) { + if (green_read_first) { + green_read_first = false; + enQ_irecv(evQ, green_recvBuf, PacketSize, UINT64_T, AnySrc, Tag, GroupWorld, &green_mesgReq); + // enQ_recv(evQ, green_recvBuf, PacketSize, UINT64_T, AnySrc, Tag, GroupWorld, &green_mesgResp); + enQ_wait(evQ, &green_mesgReq, &green_mesgResp); + // enQ_wait(evQ, NULL, &green_mesgResp); + // if (green_mesgResp.status == false) { + // return false; + // } + // enQ_send(evQ, green_sendBuf, count, CHAR, target, 0, GroupWorld); return false; } - // enQ_send(evQ, green_sendBuf, count, CHAR, target, 0, GroupWorld); - - return false; + else { + PacketHeader *ph = (PacketHeader*) green_recvBuf.getBacking(); + auto target = ph->src; + PacketHeader *ph2 = (PacketHeader*)green_sendBuf.getBacking(); + ph2->dst = target; + ph2->src = rank_; + ph2->len = 15; + enQ_send(evQ, green_sendBuf, PacketSize, UINT64_T, target, Tag, GroupWorld); + + return true; + } } diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index 0e9b81b4da..ecc7744930 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -63,12 +63,15 @@ class agileIOconsumer : public EmberMessagePassingGenerator bool first = true; static int memory_bitmask; - Hermes::MemAddr blue_sendBuf = nullptr; + Hermes::MemAddr *blue_sendBuf = nullptr; Hermes::MemAddr *blue_recvBuf = nullptr; Hermes::MemAddr green_sendBuf = nullptr; Hermes::MemAddr green_recvBuf = nullptr; - MessageRequest blue_mesgReq; + MessageRequest *blue_mesgReq = nullptr; MessageResponse green_mesgResp; + MessageRequest green_mesgReq; + bool green_read_first = true; + unsigned blue_round = 0; std::queue* evQ_; uint64_t rank_; From 2d0abd3a360112f21dd5ab4c59f981de7bb6f282 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Fri, 8 Sep 2023 16:29:57 +0000 Subject: [PATCH 15/26] add ostream operator --- .../elements/ember/agile-io-consumer/agile-io-consumer.cc | 1 + .../elements/ember/agile-io-consumer/agile-io-consumer.h | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 7c50eece17..fbf04ba327 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -167,6 +167,7 @@ agileIOconsumer::broadcast_and_receive(const long &total_request_size, else { for (int i = 0; i < count; i++) { PacketHeader *ph = (PacketHeader*)blue_recvBuf[i].getBacking(); + std::cerr << ph << "\n"; } return true; } diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index ecc7744930..07f2ced446 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace SST::Ember { @@ -17,6 +18,11 @@ struct PacketHeader { uint64_t src; uint64_t dst; uint64_t len; + + friend std::ostream &operator<<(std::ostream &os, const PacketHeader &ph) { + os << ph.src << ":" << ph.dst << " " << ph.len; + return os; + } }; const unsigned long PacketSize = sizeof(Ember::PacketHeader) / sizeof(uint64_t); From 749a66da0b2b83a18ec9d2a25a2d99f845805c20 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Fri, 8 Sep 2023 16:32:38 +0000 Subject: [PATCH 16/26] remove commented code --- .../ember/agile-io-consumer/agile-io-consumer.cc | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index fbf04ba327..ac35c3e77e 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -145,8 +145,6 @@ agileIOconsumer::broadcast_and_receive(const long &total_request_size, if (blue_round == 0) { blue_round++; for (int i = 0; i < count; i++) { - // send( Queue& q, Addr payload, uint32_t count, PayloadDataType dtype, - // RankID dest, uint32_t tag, Communicator group) PacketHeader* send_buffer = (PacketHeader*) blue_sendBuf[i].getBacking(); send_buffer->dst = ionodes[i]; send_buffer->src = rank_; @@ -192,13 +190,8 @@ agileIOconsumer::green_read() if (green_read_first) { green_read_first = false; enQ_irecv(evQ, green_recvBuf, PacketSize, UINT64_T, AnySrc, Tag, GroupWorld, &green_mesgReq); - // enQ_recv(evQ, green_recvBuf, PacketSize, UINT64_T, AnySrc, Tag, GroupWorld, &green_mesgResp); enQ_wait(evQ, &green_mesgReq, &green_mesgResp); - // enQ_wait(evQ, NULL, &green_mesgResp); - // if (green_mesgResp.status == false) { - // return false; - // } - // enQ_send(evQ, green_sendBuf, count, CHAR, target, 0, GroupWorld); + return false; } else { From f95e6f33730d6b95ea83eadf411199204f3891de Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Fri, 8 Sep 2023 16:35:07 +0000 Subject: [PATCH 17/26] remove memory_bitmask --- .../elements/ember/agile-io-consumer/agile-io-consumer.cc | 6 +----- .../elements/ember/agile-io-consumer/agile-io-consumer.h | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index ac35c3e77e..b034f6a82b 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -44,8 +44,6 @@ using namespace SST; using namespace SST::Ember; -int agileIOconsumer::memory_bitmask = 0; - agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMessagePassingGenerator(id, prms, "Null") { rank_ = EmberMessagePassingGenerator::rank(); @@ -82,6 +80,7 @@ agileIOconsumer::generate(std::queue& evQ) evQ_ = &evQ; if (first) { + first = false; // Handle memory allocation memSetBacked(); if (rank_ == 1) { @@ -93,14 +92,11 @@ agileIOconsumer::generate(std::queue& evQ) enQ_memAlloc(evQ, &blue_recvBuf[i], sizeof(Ember::PacketHeader)); enQ_memAlloc(evQ, &blue_sendBuf[i], sizeof(Ember::PacketHeader)); } - memory_bitmask |= (1 << rank_); } if (kind == Green) { enQ_memAlloc(evQ, &green_sendBuf, sizeof(Ember::PacketHeader)); enQ_memAlloc(evQ, &green_recvBuf, sizeof(Ember::PacketHeader)); - memory_bitmask |= (1 << rank_); } - first = false; return false; } diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index 07f2ced446..fd360cc5c5 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -67,7 +67,6 @@ class agileIOconsumer : public EmberMessagePassingGenerator private: unsigned count; bool first = true; - static int memory_bitmask; Hermes::MemAddr *blue_sendBuf = nullptr; Hermes::MemAddr *blue_recvBuf = nullptr; From e16af48be33edb915395f451bf8711a2a0935084 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Fri, 8 Sep 2023 16:59:36 +0000 Subject: [PATCH 18/26] remove magicNumber --- src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index fd360cc5c5..ab875e8fec 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -27,8 +27,6 @@ struct PacketHeader { const unsigned long PacketSize = sizeof(Ember::PacketHeader) / sizeof(uint64_t); -const int magicNumber = (1 << 0) + (1 << 4) + (1 << 8) + (1 << 12) + (1 << 1); - class agileIOconsumer : public EmberMessagePassingGenerator { public: From c2d8e9127c97212bcb36cb118a21f32fc7e8bb75 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Fri, 8 Sep 2023 17:03:32 +0000 Subject: [PATCH 19/26] fix logic to move the files around --- .../elements/ember/agile-io-consumer/agile-io-consumer.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index b034f6a82b..56e1ff87dc 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -144,7 +144,7 @@ agileIOconsumer::broadcast_and_receive(const long &total_request_size, PacketHeader* send_buffer = (PacketHeader*) blue_sendBuf[i].getBacking(); send_buffer->dst = ionodes[i]; send_buffer->src = rank_; - send_buffer->len = 24; + send_buffer->len = combined_read_size; enQ_send(evQ, blue_sendBuf[i], PacketSize, UINT64_T, ionodes[i], Tag, GroupWorld); } return false; @@ -193,11 +193,13 @@ agileIOconsumer::green_read() else { PacketHeader *ph = (PacketHeader*) green_recvBuf.getBacking(); auto target = ph->src; + auto request_size = ph->len; PacketHeader *ph2 = (PacketHeader*)green_sendBuf.getBacking(); ph2->dst = target; ph2->src = rank_; - ph2->len = 15; + ph2->len = request_size / count; enQ_send(evQ, green_sendBuf, PacketSize, UINT64_T, target, Tag, GroupWorld); + // Now should we send a non-backed buffer of size ph2->len??? return true; } From 5790a886785e87422dc2088d15675ccc49a47243 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Fri, 8 Sep 2023 17:16:12 +0000 Subject: [PATCH 20/26] output the PacketHeader not the address --- src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 56e1ff87dc..fe6f389065 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -161,7 +161,7 @@ agileIOconsumer::broadcast_and_receive(const long &total_request_size, else { for (int i = 0; i < count; i++) { PacketHeader *ph = (PacketHeader*)blue_recvBuf[i].getBacking(); - std::cerr << ph << "\n"; + std::cerr << *ph << "\n"; } return true; } From 868d044da9640502d9c6ea10589f5d187f400f00 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Fri, 8 Sep 2023 19:24:03 +0000 Subject: [PATCH 21/26] generate should return true on fall-through --- src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index fe6f389065..183d0cbaac 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -103,7 +103,7 @@ agileIOconsumer::generate(std::queue& evQ) if (rank_ == 1) return blue_request(combined_read_size); if (kind == Green) return green_read(); - return false; + return true; } void From 476f2e5b06a67764a2bcb8ef5d84aec8422d20dc Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Fri, 8 Sep 2023 22:31:39 +0000 Subject: [PATCH 22/26] add a comment for what I'm trying to do... --- .../elements/ember/agile-io-consumer/agile-io-consumer.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 183d0cbaac..565ad5dbb1 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -153,12 +153,13 @@ agileIOconsumer::broadcast_and_receive(const long &total_request_size, blue_round++; for (int i = 0; i < count; i++) { PacketHeader *recv_buffer = (PacketHeader*) blue_recvBuf[i].getBacking(); - enQ_irecv(evQ, blue_recvBuf[i], PacketSize, UINT64_T, AnySrc, Tag, GroupWorld, &blue_mesgReq[i]); + enQ_irecv(evQ, blue_recvBuf[i], PacketSize, UINT64_T, ionodes[i], Tag, GroupWorld, &blue_mesgReq[i]); enQ_wait(evQ, &blue_mesgReq[i]); } return false; } else { + blue_round++; for (int i = 0; i < count; i++) { PacketHeader *ph = (PacketHeader*)blue_recvBuf[i].getBacking(); std::cerr << *ph << "\n"; @@ -181,7 +182,6 @@ bool agileIOconsumer::green_read() { std::queue &evQ = *evQ_; - // uint64_t target; if (green_read_first) { green_read_first = false; @@ -200,6 +200,8 @@ agileIOconsumer::green_read() ph2->len = request_size / count; enQ_send(evQ, green_sendBuf, PacketSize, UINT64_T, target, Tag, GroupWorld); // Now should we send a non-backed buffer of size ph2->len??? + // The line below causes the sim not to terminate + // enQ_send(evQ, green_fileBuf, ph2->len, CHAR, target, Tag, GroupWorld); return true; } From 271268f5f8fe9fa253a28f4209cd82a065e386af Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Sat, 9 Sep 2023 18:26:18 +0000 Subject: [PATCH 23/26] weird results on experiment with packet size --- src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index ab875e8fec..cf756eb25f 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -18,6 +18,8 @@ struct PacketHeader { uint64_t src; uint64_t dst; uint64_t len; + // 116 works. anything larger hangs + uint64_t data[116]; friend std::ostream &operator<<(std::ostream &os, const PacketHeader &ph) { os << ph.src << ":" << ph.dst << " " << ph.len; From a5972326a394bc8f63a793db281769d73891f775 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Mon, 11 Sep 2023 21:16:05 +0000 Subject: [PATCH 24/26] remove unused methods --- .../agile-io-consumer/agile-io-consumer.cc | 17 ----------------- .../ember/agile-io-consumer/agile-io-consumer.h | 5 +---- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc index 565ad5dbb1..c578302b18 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc @@ -106,23 +106,6 @@ agileIOconsumer::generate(std::queue& evQ) return true; } -void -agileIOconsumer::read_request() -{ -} - -int -agileIOconsumer::write_data() -{ - return 0; -} - -int -agileIOconsumer::num_io_nodes() -{ - return 0; -} - // Sent to all the IO nodes void agileIOconsumer::validate(const long total_request_size) { long request = total_request_size / count; diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index cf756eb25f..29f57bed9e 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -52,9 +52,6 @@ class agileIOconsumer : public EmberMessagePassingGenerator void setup() override; bool generate(std::queue& evQ) override; - void read_request(); - int write_data(); - int num_io_nodes(); // Sent to all the IO nodes void validate(const long total_request_size); @@ -78,7 +75,7 @@ class agileIOconsumer : public EmberMessagePassingGenerator bool green_read_first = true; unsigned blue_round = 0; - std::queue* evQ_; + std::queue* evQ_ = nullptr; uint64_t rank_; std::vector ionodes; // Whether the node is IO (green) or consumer (blue) From efa2b4e280902c23bd808bbd408617b2b4ec525c Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Thu, 19 Oct 2023 20:13:03 +0000 Subject: [PATCH 25/26] increase buffer sizes on ReorderLinkControl --- src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py b/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py index 3ed756f4aa..680c5a5041 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py +++ b/src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py @@ -35,8 +35,8 @@ def example(): ### set up the endpoint networkif = ReorderLinkControl() networkif.link_bw = "4GB/s" - networkif.input_buf_size = "1kB" - networkif.output_buf_size = "1kB" + networkif.input_buf_size = "16kB" + networkif.output_buf_size = "16kB" ep = EmberMPIJob(0,topo.getNumNodes()) ep.network_interface = networkif From 356f98f1e24094eeca5dec719931d0126e661280 Mon Sep 17 00:00:00 2001 From: "Justin M. LaPre" Date: Thu, 19 Oct 2023 20:17:34 +0000 Subject: [PATCH 26/26] increase size of data to 1020 longs --- src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h index 29f57bed9e..7395e6d236 100644 --- a/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h +++ b/src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h @@ -18,8 +18,7 @@ struct PacketHeader { uint64_t src; uint64_t dst; uint64_t len; - // 116 works. anything larger hangs - uint64_t data[116]; + uint64_t data[1020]; friend std::ostream &operator<<(std::ostream &os, const PacketHeader &ph) { os << ph.src << ":" << ph.dst << " " << ph.len;