Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AGILE IO consumer #2251

Draft
wants to merge 26 commits into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/sst/elements/ember/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ libember_la_SOURCES = \
embermotiflog.h \
embermotiflog.cc \
embermemoryev.h \
agile-io-consumer/agile-io-consumer.h \
agile-io-consumer/agile-io-consumer.cc \
libs/emberLib.h \
libs/misc.h \
libs/miscEvents/emberGetNodeNumEvent.h \
Expand Down
191 changes: 191 additions & 0 deletions src/sst/elements/ember/agile-io-consumer/agile-io-consumer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright 2009-2023 NTESS. Under the terms
// of Contract DE-NA0003525 with NTESS, the U.S.
// Government retains certain rights in this software.
//
// Copyright (c) 2009-2023, NTESS
// All rights reserved.
//
// Portions are copyright of other developers:
// See the file CONTRIBUTORS.TXT in the top level directory
// of the distribution for more information.
//
// This file is part of the SST software package. For license
// information, see the LICENSE file in the top level directory of the
// distribution.


#include <array>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <queue>
#include <sst_config.h>
#include "agile-io-consumer.h"
#include "emberevent.h"
#include "event.h"
#include "mpi/embermpigen.h"
#include "params.h"
#include "sst/elements/hermes/hermes.h"
#include "sst/elements/hermes/msgapi.h"

#include <iostream>
#include <fstream>
#include <algorithm>
#include <cmath>
#include <cstring>
#include <climits>
#include <sstream>

// Fallback definition of MPI_ANY_SOURCE as -1, used only when no earlier
// header has already defined the macro.
// NOTE(review): nothing in the visible part of this file references the
// macro (green_read() passes AnySrc instead) — confirm it is still needed.
#ifndef MPI_ANY_SOURCE
#define MPI_ANY_SOURCE -1
#endif

using namespace SST;
using namespace SST::Ember;

// Constructs the motif: caches this endpoint's rank, announces the motif once
// (on rank 0) and reads the list of IO-node ranks from "arg.IONodes".
//
// id   - SST component id, forwarded to EmberMessagePassingGenerator.
// prms - motif parameters; "arg.IONodes" is read as an array of ints.
agileIOconsumer::agileIOconsumer(SST::ComponentId_t id, Params& prms) : EmberMessagePassingGenerator(id, prms, "Null")
{
    rank_ = EmberMessagePassingGenerator::rank();

    if (rank_ == 0) {
        std::cout << "Running agileIOconsumer motif\n";
    }

    prms.find_array<int>("arg.IONodes", ionodes);

    // No per-node buffer size is derived here: nothing consumed such a value,
    // and dividing combined_read_size by `count` is undefined behaviour when
    // the IONodes array is empty.
    count = static_cast<unsigned>(ionodes.size());
}

// Classifies this endpoint before the motif runs: a rank that appears in
// `ionodes` becomes Green (IO node), every other rank becomes Blue
// (consumer). The chosen kind is echoed on stderr.
void agileIOconsumer::setup()
{
    rank_ = EmberMessagePassingGenerator::rank();

    const bool listed_as_io_node =
        std::find(ionodes.begin(), ionodes.end(), rank_) != ionodes.end();

    if (listed_as_io_node) {
        kind = Green;
        std::cerr << "Green\n";
        return;
    }

    kind = Blue;
    std::cerr << "Blue\n";
}

// Advances the motif by one step.
//
// The first call only enqueues memory allocations (per-IO-node send/receive
// headers on rank 1, one send and one receive header on Green ranks) and
// returns false. Every later call delegates to blue_request() on rank 1, to
// green_read() on Green ranks, and returns true for all other ranks.
// NOTE(review): presumably a true return tells Ember the motif is finished —
// confirm against EmberGenerator.
bool
agileIOconsumer::generate(std::queue<EmberEvent*>& evQ)
{
    // Kept so blue_request()/green_read() can enqueue into the same queue.
    evQ_ = &evQ;

    if (!first) {
        if (rank_ == 1) return blue_request(combined_read_size);
        if (kind == Green) return green_read();
        return true;
    }

    // One-time setup pass: handle memory allocation.
    first = false;
    memSetBacked();

    const std::size_t header_bytes = sizeof(Ember::PacketHeader);

    if (rank_ == 1) {
        // Rank 1 is Blue: one request slot and one buffer pair per IO node.
        blue_mesgReq = new MessageRequest[count];
        blue_recvBuf = new Hermes::MemAddr[count];
        blue_sendBuf = new Hermes::MemAddr[count];
        for (unsigned node = 0; node < count; ++node) {
            enQ_memAlloc(evQ, blue_recvBuf + node, header_bytes);
            enQ_memAlloc(evQ, blue_sendBuf + node, header_bytes);
        }
    }

    if (kind == Green) {
        enQ_memAlloc(evQ, &green_sendBuf, header_bytes);
        enQ_memAlloc(evQ, &green_recvBuf, header_bytes);
    }

    return false;
}

// Checks that total_request_size can be split evenly across the configured
// IO nodes. On failure a diagnostic is written to stderr and the process is
// aborted; on success the function simply returns.
void agileIOconsumer::validate(const long total_request_size) {
    // Without IO nodes the division below would be a division by zero.
    if (count == 0) {
        std::cerr << "agileIOconsumer: no IO nodes configured (arg.IONodes is empty)\n";
        abort();
    }

    long request = total_request_size / count;
    if (request * count != total_request_size) {
        std::cerr << request << " * " << count << " != " << total_request_size
                  << "\n";
        abort();
    }
}

// send each of the IO nodes a request for total_request_size
// it will be up to them to return total_request_size / num_io_nodes
bool
agileIOconsumer::broadcast_and_receive(const long &total_request_size,
std::queue<EmberEvent *> &evQ) {
if (blue_round == 0) {
blue_round++;
for (int i = 0; i < count; i++) {
PacketHeader* send_buffer = (PacketHeader*) blue_sendBuf[i].getBacking();
send_buffer->dst = ionodes[i];
send_buffer->src = rank_;
send_buffer->len = combined_read_size;
enQ_send(evQ, blue_sendBuf[i], PacketSize, UINT64_T, ionodes[i], Tag, GroupWorld);
}
return false;
}
else if (blue_round == 1) {
blue_round++;
for (int i = 0; i < count; i++) {
PacketHeader *recv_buffer = (PacketHeader*) blue_recvBuf[i].getBacking();
enQ_irecv(evQ, blue_recvBuf[i], PacketSize, UINT64_T, ionodes[i], Tag, GroupWorld, &blue_mesgReq[i]);
enQ_wait(evQ, &blue_mesgReq[i]);
}
return false;
}
else {
blue_round++;
for (int i = 0; i < count; i++) {
PacketHeader *ph = (PacketHeader*)blue_recvBuf[i].getBacking();
std::cerr << *ph << "\n";
}
return true;
}
}

bool
agileIOconsumer::blue_request(const long total_request_size) {
std::queue<EmberEvent *> &evQ = *evQ_;

validate(total_request_size);

return broadcast_and_receive(total_request_size, evQ);
}

// Each IO node responds with amount of data read, possibly less than requested
bool
agileIOconsumer::green_read()
{
std::queue<EmberEvent *> &evQ = *evQ_;

if (green_read_first) {
green_read_first = false;
enQ_irecv(evQ, green_recvBuf, PacketSize, UINT64_T, AnySrc, Tag, GroupWorld, &green_mesgReq);
enQ_wait(evQ, &green_mesgReq, &green_mesgResp);

return false;
}
else {
PacketHeader *ph = (PacketHeader*) green_recvBuf.getBacking();
auto target = ph->src;
auto request_size = ph->len;
PacketHeader *ph2 = (PacketHeader*)green_sendBuf.getBacking();
ph2->dst = target;
ph2->src = rank_;
ph2->len = request_size / count;
enQ_send(evQ, green_sendBuf, PacketSize, UINT64_T, target, Tag, GroupWorld);
// Now should we send a non-backed buffer of size ph2->len???
// The line below causes the sim not to terminate
// enQ_send(evQ, green_fileBuf, ph2->len, CHAR, target, Tag, GroupWorld);

return true;
}
}
87 changes: 87 additions & 0 deletions src/sst/elements/ember/agile-io-consumer/agile-io-consumer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#ifndef AGILE_IO_CONSUMER_H_
#define AGILE_IO_CONSUMER_H_

#include "mpi/embermpigen.h"
#include "sst/elements/hermes/hermes.h"
#include "sst/elements/hermes/msgapi.h"
#include <codecvt>
#include <cstddef>
#include <cstdint>
#include <ostream>

namespace SST::Ember {

// Total read size the consumer requests from the IO nodes: 10 * 1024 * 1024
// (unit is presumably bytes — confirm).
constexpr long combined_read_size = 10L * 1024L * 1024L;
// Message tag used for the sends and receives this motif enqueues.
constexpr long Tag = 37;

// Fixed-size message exchanged between the consumer and the IO nodes:
// three 64-bit header fields followed by a 1020-word payload area.
struct PacketHeader {
    uint64_t src;        // rank that sent the packet
    uint64_t dst;        // rank the packet is addressed to
    uint64_t len;        // size carried by the request / reply
    uint64_t data[1020]; // payload words; not printed by operator<<

    // Streams the header as "<src>:<dst> <len>" and returns the stream.
    friend std::ostream &operator<<(std::ostream &out, const PacketHeader &hdr) {
        return out << hdr.src << ":" << hdr.dst << " " << hdr.len;
    }
};

const unsigned long PacketSize = sizeof(Ember::PacketHeader) / sizeof(uint64_t);

class agileIOconsumer : public EmberMessagePassingGenerator
{
public:
SST_ELI_REGISTER_SUBCOMPONENT(
agileIOconsumer,
"ember",
"IOConsumerMotif",
SST_ELI_ELEMENT_VERSION(1,0,0),
"Performs a IOConsumer Motif",
SST::Ember::EmberGenerator
)


SST_ELI_DOCUMENT_PARAMS(
{"arg.IONodes", "Array of IO nodes", "0"}
)

agileIOconsumer(SST::ComponentId_t id, Params& prms);
~agileIOconsumer() override = default;

void setup() override;

bool generate(std::queue<EmberEvent*>& evQ) override;

// Sent to all the IO nodes
void validate(const long total_request_size);
bool broadcast_and_receive(const long &total_request_size, std::queue<EmberEvent *> &evQ);
bool blue_request(long total_request_size);

// Each IO node responds with amount of data read
bool green_read();

private:
unsigned count;
bool first = true;

Hermes::MemAddr *blue_sendBuf = nullptr;
Hermes::MemAddr *blue_recvBuf = nullptr;
Hermes::MemAddr green_sendBuf = nullptr;
Hermes::MemAddr green_recvBuf = nullptr;
MessageRequest *blue_mesgReq = nullptr;
MessageResponse green_mesgResp;
MessageRequest green_mesgReq;
bool green_read_first = true;
unsigned blue_round = 0;

std::queue<EmberEvent*>* evQ_ = nullptr;
uint64_t rank_;
std::vector<int> ionodes;
// Whether the node is IO (green) or consumer (blue)
enum Kind { Green, Blue };
Kind kind;
};

}

#endif /* AGILE_IO_CONSUMER_H_ */
55 changes: 55 additions & 0 deletions src/sst/elements/ember/agile-io-consumer/agile_io_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import sst
from sst.merlin.base import *
from sst.merlin.endpoint import *
from sst.merlin.interface import *
from sst.merlin.topology import *

from sst.ember import *

def _assign(target, settings):
    """Set every (attribute, value) pair of ``settings`` on ``target``, in order, and return ``target``."""
    for attribute, value in settings:
        setattr(target, attribute, value)
    return target


def example():
    """Build a dragonfly system that runs the Init, IOConsumer and Fini motifs.

    The job spans ``topo.getNumNodes()`` endpoints and declares ranks
    0, 4, 8 and 12 as IO nodes for the IOConsumer motif.
    """
    PlatformDefinition.setCurrentPlatform("firefly-defaults")

    ### Setup the topology
    topo = _assign(topoDragonFly(), (
        ("hosts_per_router", 2),
        ("routers_per_group", 4),
        ("intergroup_links", 2),
        ("num_groups", 4),
        ("algorithm", ["minimal","adaptive-local"]),
    ))

    # Set up the routers
    router = _assign(hr_router(), (
        ("link_bw", "4GB/s"),
        ("flit_size", "8B"),
        ("xbar_bw", "6GB/s"),
        ("input_latency", "20ns"),
        ("output_latency", "20ns"),
        ("input_buf_size", "4kB"),
        ("output_buf_size", "4kB"),
        ("num_vns", 2),
        ("xbar_arb", "merlin.xbar_arb_lru"),
    ))

    _assign(topo, (
        ("router", router),
        ("link_latency", "20ns"),
    ))

    ### set up the endpoint
    networkif = _assign(ReorderLinkControl(), (
        ("link_bw", "4GB/s"),
        ("input_buf_size", "16kB"),
        ("output_buf_size", "16kB"),
    ))

    ep = EmberMPIJob(0,topo.getNumNodes())
    ep.network_interface = networkif
    for motif in ("Init", "IOConsumer IONodes=[0,4,8,12]", "Fini"):
        ep.addMotif(motif)
    ep.nic.nic2host_lat= "100ns"

    # Assemble the system: topology first, then node allocation, then build.
    system = System()
    system.setTopology(topo)
    system.allocateNodes(ep,"linear")

    system.build()

# Build the example system only when this file is executed as the main script.
if __name__ == "__main__":
    example()