Skip to content

Commit

Permalink
Add experimental fair::mq::Msg & Socket API
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed Nov 20, 2020
1 parent 99f2466 commit 940826c
Show file tree
Hide file tree
Showing 16 changed files with 872 additions and 133 deletions.
8 changes: 8 additions & 0 deletions fairmq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ if(BUILD_FAIRMQ)
FairMQTransportFactory.h
MemoryResources.h
MemoryResourceTools.h
Msg.h
Transports.h
options/FairMQProgOptions.h
JSONParser.h
Expand Down Expand Up @@ -244,6 +245,7 @@ if(BUILD_FAIRMQ)
###################
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startExperimentalMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startExperimentalMQBenchmark.sh)

#################################
# define libFairMQ build target #
Expand Down Expand Up @@ -350,6 +352,9 @@ if(BUILD_FAIRMQ)
add_executable(fairmq-bsampler run/runBenchmarkSampler.cxx)
target_link_libraries(fairmq-bsampler FairMQ)

add_executable(fairmq-exp-bsampler run/runExperimentalBenchmarkSampler.cxx)
target_link_libraries(fairmq-exp-bsampler FairMQ)

add_executable(fairmq-merger run/runMerger.cxx)
target_link_libraries(fairmq-merger FairMQ)

Expand All @@ -362,6 +367,9 @@ if(BUILD_FAIRMQ)
add_executable(fairmq-sink run/runSink.cxx)
target_link_libraries(fairmq-sink FairMQ)

add_executable(fairmq-exp-sink run/runExperimentalSink.cxx)
target_link_libraries(fairmq-exp-sink FairMQ)

add_executable(fairmq-splitter run/runSplitter.cxx)
target_link_libraries(fairmq-splitter FairMQ)

Expand Down
2 changes: 1 addition & 1 deletion fairmq/FairMQChannel.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, const strin
, fPortRangeMax(DefaultPortRangeMax)
, fAutoBind(DefaultAutoBind)
, fValid(false)
, fMultipart(false)
, fMultipart(0)
{
// LOG(warn) << "Constructing channel '" << fName << "'";
}
Expand Down
31 changes: 30 additions & 1 deletion fairmq/FairMQChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <FairMQTransportFactory.h>
#include <FairMQUnmanagedRegion.h>
#include <FairMQSocket.h>
#include <fairmq/Msg.h>
#include <fairmq/Transports.h>
#include <FairMQParts.h>
#include <fairmq/Properties.h>
Expand Down Expand Up @@ -311,6 +312,16 @@ class FairMQChannel
return Receive(parts.fParts, rcvTimeoutInMs);
}

fair::mq::TransferResult Send(fair::mq::Msg msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg.fBuffers);
return fSocket->Send(std::move(msg), sndTimeoutInMs);
}
fair::mq::TransferResult Receive(int rcvTimeoutInMs = -1)
{
return fSocket->Receive(rcvTimeoutInMs);
}

unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); }
unsigned long GetBytesRx() const { return fSocket->GetBytesRx(); }
unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); }
Expand All @@ -324,18 +335,36 @@ class FairMQChannel
return Transport()->CreateMessage(std::forward<Args>(args)...);
}

template<typename... Args>
FairMQMessagePtr NewBuffer(Args&&... args)
{
return Transport()->CreateMessage(std::forward<Args>(args)...);
}

template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data)
{
return Transport()->NewSimpleMessage(data);
}

template<typename T>
FairMQMessagePtr NewSimpleBuffer(const T& data)
{
return Transport()->NewSimpleMessage(data);
}

template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data)
{
return Transport()->NewStaticMessage(data);
}

template<typename T>
FairMQMessagePtr NewStaticBuffer(const T& data)
{
return Transport()->NewStaticMessage(data);
}

template<typename... Args>
FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args)
{
Expand Down Expand Up @@ -379,7 +408,7 @@ class FairMQChannel

bool fValid;

bool fMultipart;
short fMultipart;

void CheckSendCompatibility(FairMQMessagePtr& msg)
{
Expand Down
190 changes: 84 additions & 106 deletions fairmq/FairMQDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version)
, fUninitializedBindingChannels()
, fUninitializedConnectingChannels()
, fDataCallbacks(false)
, fMessageInputs()
, fMsgInputs()
, fMultipartInputs()
, fMultitransportInputs()
Expand Down Expand Up @@ -504,95 +505,82 @@ void FairMQDevice::HandleSingleChannelInput()
{
bool proceed = true;

if (fMsgInputs.size() > 0)
{
while (!NewStatePending() && proceed)
{
proceed = HandleMsgInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0);
if (!fMessageInputs.empty()) {
while (!NewStatePending() && proceed) {
proceed = HandleMessageInput(fInputChannelKeys.at(0), fMessageInputs.begin()->second, 0);
}
}
else if (fMultipartInputs.size() > 0)
{
while (!NewStatePending() && proceed)
{
} else if (!fMultipartInputs.empty()) {
while (!NewStatePending() && proceed) {
proceed = HandleMultipartInput(fInputChannelKeys.at(0), fMultipartInputs.begin()->second, 0);
}
} else if (!fMsgInputs.empty()) {
while (!NewStatePending() && proceed) {
proceed = HandleInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0);
}
}
}

void FairMQDevice::HandleMultipleChannelInput()
{
// check if more than one transport is used
fMultitransportInputs.clear();
for (const auto& k : fInputChannelKeys)
{
for (const auto& k : fInputChannelKeys) {
fair::mq::Transport t = fChannels.at(k).at(0).fTransportType;
if (fMultitransportInputs.find(t) == fMultitransportInputs.end())
{
if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) {
fMultitransportInputs.insert(pair<fair::mq::Transport, vector<string>>(t, vector<string>()));
fMultitransportInputs.at(t).push_back(k);
}
else
{
} else {
fMultitransportInputs.at(t).push_back(k);
}
}

for (const auto& mi : fMsgInputs)
{
for (auto& i : fChannels.at(mi.first))
{
i.fMultipart = false;
for (const auto& mi : fMessageInputs) {
for (auto& i : fChannels.at(mi.first)) {
i.fMultipart = 0;
}
}

for (const auto& mi : fMultipartInputs)
{
for (auto& i : fChannels.at(mi.first))
{
i.fMultipart = true;
for (const auto& mi : fMultipartInputs) {
for (auto& i : fChannels.at(mi.first)) {
i.fMultipart = 1;
}
}

for (const auto& mi : fMsgInputs) {
for (auto& i : fChannels.at(mi.first)) {
i.fMultipart = 2;
}
}

// if more than one transport is used, handle poll of each in a separate thread
if (fMultitransportInputs.size() > 1)
{
if (fMultitransportInputs.size() > 1) {
HandleMultipleTransportInput();
}
else // otherwise poll directly
{
} else { // otherwise poll directly
bool proceed = true;

FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));

while (!NewStatePending() && proceed)
{
while (!NewStatePending() && proceed) {
poller->Poll(200);

// check which inputs are ready and call their data handlers if they are.
for (const auto& ch : fInputChannelKeys)
{
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
{
if (poller->CheckInput(ch, i))
{
if (fChannels.at(ch).at(i).fMultipart)
{
for (const auto& ch : fInputChannelKeys) {
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) {
if (fChannels.at(ch).at(i).fMultipart == 1) {
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
}
else
{
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
} else if (fChannels.at(ch).at(i).fMultipart == 0) {
proceed = HandleMessageInput(ch, fMessageInputs.at(ch), i);
} else if (fChannels.at(ch).at(i).fMultipart == 2) {
proceed = HandleInput(ch, fMsgInputs.at(ch), i);
}

if (!proceed)
{
if (!proceed) {
break;
}
}
}
if (!proceed)
{
if (!proceed) {
break;
}
}
Expand All @@ -606,79 +594,72 @@ void FairMQDevice::HandleMultipleTransportInput()

fMultitransportProceed = true;

for (const auto& i : fMultitransportInputs)
{
for (const auto& i : fMultitransportInputs) {
threads.emplace_back(thread(&FairMQDevice::PollForTransport, this, fTransports.at(i.first).get(), i.second));
}

for (thread& t : threads)
{
for (thread& t : threads) {
t.join();
}
}

void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const vector<string>& channelKeys)
{
try
{
FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys));

while (!NewStatePending() && fMultitransportProceed)
{
poller->Poll(500);

for (const auto& ch : channelKeys)
{
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
{
if (poller->CheckInput(ch, i))
{
lock_guard<mutex> lock(fMultitransportMutex);

if (!fMultitransportProceed)
{
break;
}
try {
FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys));

if (fChannels.at(ch).at(i).fMultipart)
{
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
}
else
{
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
}
while (!NewStatePending() && fMultitransportProceed) {
poller->Poll(500);

if (!fMultitransportProceed)
{
break;
}
for (const auto& ch : channelKeys) {
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
if (poller->CheckInput(ch, i)) {
lock_guard<mutex> lock(fMultitransportMutex);

if (!fMultitransportProceed) {
break;
}

if (fChannels.at(ch).at(i).fMultipart == 1) {
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
} else if (fChannels.at(ch).at(i).fMultipart == 0) {
fMultitransportProceed = HandleMessageInput(ch, fMessageInputs.at(ch), i);
} else if (fChannels.at(ch).at(i).fMultipart == 2) {
fMultitransportProceed = HandleInput(ch, fMsgInputs.at(ch), i);
}

if (!fMultitransportProceed) {
break;
}
}
if (!fMultitransportProceed)
{
break;
}
}
if (!fMultitransportProceed) {
break;
}
}
}
catch (exception& e)
{
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
} catch (exception& e) {
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
}

bool FairMQDevice::HandleInput(const string& chName, const InputCallback& callback, int i)
{
TransferResult result = Receive(chName, i);

if (result.code == TransferCode::success) {
return callback(std::move(result.msg.value()), i);
} else {
return false;
}
}

bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i)
bool FairMQDevice::HandleMessageInput(const string& chName, const InputMessageCallback& callback, int i)
{
unique_ptr<FairMQMessage> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());

if (Receive(input, chName, i) >= 0)
{
if (Receive(input, chName, i) >= 0) {
return callback(input, i);
}
else
{
} else {
return false;
}
}
Expand All @@ -687,12 +668,9 @@ bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipa
{
FairMQParts input;

if (Receive(input, chName, i) >= 0)
{
if (Receive(input, chName, i) >= 0) {
return callback(input, 0);
}
else
{
} else {
return false;
}
}
Expand Down
Loading

0 comments on commit 940826c

Please sign in to comment.