From 940826c14b9dab1c993d75a156c4522273b93bdf Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 21 Oct 2020 09:15:55 +0200 Subject: [PATCH] Add experimental fair::mq::Msg & Socket API --- fairmq/CMakeLists.txt | 8 + fairmq/FairMQChannel.cxx | 2 +- fairmq/FairMQChannel.h | 31 ++- fairmq/FairMQDevice.cxx | 190 ++++++++---------- fairmq/FairMQDevice.h | 79 ++++++-- fairmq/FairMQParts.h | 1 - fairmq/FairMQSocket.h | 16 +- fairmq/Msg.h | 90 +++++++++ fairmq/devices/ExperimentalBenchmarkSampler.h | 110 ++++++++++ fairmq/devices/ExperimentalSink.h | 74 +++++++ fairmq/devices/FairMQBenchmarkSampler.h | 4 - .../run/runExperimentalBenchmarkSampler.cxx | 29 +++ fairmq/run/runExperimentalSink.cxx | 24 +++ fairmq/run/startExperimentalMQBenchmark.sh.in | 86 ++++++++ fairmq/shmem/Socket.h | 139 ++++++++++++- fairmq/zeromq/Socket.h | 122 +++++++++++ 16 files changed, 872 insertions(+), 133 deletions(-) create mode 100644 fairmq/Msg.h create mode 100644 fairmq/devices/ExperimentalBenchmarkSampler.h create mode 100644 fairmq/devices/ExperimentalSink.h create mode 100644 fairmq/run/runExperimentalBenchmarkSampler.cxx create mode 100644 fairmq/run/runExperimentalSink.cxx create mode 100755 fairmq/run/startExperimentalMQBenchmark.sh.in diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index cec9f3142..b049a4f0d 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -158,6 +158,7 @@ if(BUILD_FAIRMQ) FairMQTransportFactory.h MemoryResources.h MemoryResourceTools.h + Msg.h Transports.h options/FairMQProgOptions.h JSONParser.h @@ -244,6 +245,7 @@ if(BUILD_FAIRMQ) ################### set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh) + configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startExperimentalMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startExperimentalMQBenchmark.sh) ################################# # define libFairMQ build target # @@ -350,6 +352,9 @@ if(BUILD_FAIRMQ) add_executable(fairmq-bsampler run/runBenchmarkSampler.cxx) target_link_libraries(fairmq-bsampler FairMQ) + add_executable(fairmq-exp-bsampler run/runExperimentalBenchmarkSampler.cxx) + target_link_libraries(fairmq-exp-bsampler FairMQ) + add_executable(fairmq-merger run/runMerger.cxx) target_link_libraries(fairmq-merger FairMQ) @@ -362,6 +367,9 @@ if(BUILD_FAIRMQ) add_executable(fairmq-sink run/runSink.cxx) target_link_libraries(fairmq-sink FairMQ) + add_executable(fairmq-exp-sink run/runExperimentalSink.cxx) + target_link_libraries(fairmq-exp-sink FairMQ) + add_executable(fairmq-splitter run/runSplitter.cxx) target_link_libraries(fairmq-splitter FairMQ) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 71278edf7..1518af13e 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -82,7 +82,7 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, const strin , fPortRangeMax(DefaultPortRangeMax) , fAutoBind(DefaultAutoBind) , fValid(false) - , fMultipart(false) + , fMultipart(0) { // LOG(warn) << "Constructing channel '" << fName << "'"; } diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index f4405b73f..2365c40e5 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -311,6 +312,16 @@ class FairMQChannel return Receive(parts.fParts, rcvTimeoutInMs); } + fair::mq::TransferResult Send(fair::mq::Msg msg, int sndTimeoutInMs = -1) + { + CheckSendCompatibility(msg.fBuffers); + return fSocket->Send(std::move(msg), sndTimeoutInMs); + } + fair::mq::TransferResult Receive(int rcvTimeoutInMs = -1) + { + return fSocket->Receive(rcvTimeoutInMs); + } + unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); } unsigned long GetBytesRx() const { return fSocket->GetBytesRx(); } unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); } @@ -324,18 +335,36 @@ class FairMQChannel return Transport()->CreateMessage(std::forward(args)...); } + template + FairMQMessagePtr NewBuffer(Args&&... args) + { + return Transport()->CreateMessage(std::forward(args)...); + } + template FairMQMessagePtr NewSimpleMessage(const T& data) { return Transport()->NewSimpleMessage(data); } + template + FairMQMessagePtr NewSimpleBuffer(const T& data) + { + return Transport()->NewSimpleMessage(data); + } + template FairMQMessagePtr NewStaticMessage(const T& data) { return Transport()->NewStaticMessage(data); } + template + FairMQMessagePtr NewStaticBuffer(const T& data) + { + return Transport()->NewStaticMessage(data); + } + template FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args) { @@ -379,7 +408,7 @@ class FairMQChannel bool fValid; - bool fMultipart; + short fMultipart; void CheckSendCompatibility(FairMQMessagePtr& msg) { diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 7ee555e36..feacd299b 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -83,6 +83,7 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version) , fUninitializedBindingChannels() , fUninitializedConnectingChannels() , fDataCallbacks(false) + , fMessageInputs() , fMsgInputs() , fMultipartInputs() , fMultitransportInputs() @@ -504,19 +505,18 @@ void FairMQDevice::HandleSingleChannelInput() { bool proceed = true; - if (fMsgInputs.size() > 0) - { - while (!NewStatePending() && proceed) - { - proceed = HandleMsgInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0); + if (!fMessageInputs.empty()) { + while (!NewStatePending() && proceed) { + proceed = HandleMessageInput(fInputChannelKeys.at(0), fMessageInputs.begin()->second, 0); } - } - else if (fMultipartInputs.size() > 0) - { - while (!NewStatePending() && proceed) - { + } else if (!fMultipartInputs.empty()) { + while (!NewStatePending() && proceed) { proceed = HandleMultipartInput(fInputChannelKeys.at(0), fMultipartInputs.begin()->second, 0); } + } else if (!fMsgInputs.empty()) { + while (!NewStatePending() && proceed) { + proceed = HandleInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0); + } } } @@ -524,75 +524,63 @@ void FairMQDevice::HandleMultipleChannelInput() { // check if more than one transport is used fMultitransportInputs.clear(); - for (const auto& k : fInputChannelKeys) - { + for (const auto& k : fInputChannelKeys) { fair::mq::Transport t = fChannels.at(k).at(0).fTransportType; - if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) - { + if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) { fMultitransportInputs.insert(pair>(t, vector())); fMultitransportInputs.at(t).push_back(k); - } - else - { + } else { fMultitransportInputs.at(t).push_back(k); } } - for (const auto& mi : fMsgInputs) - { - for (auto& i : fChannels.at(mi.first)) - { - i.fMultipart = false; + for (const auto& mi : fMessageInputs) { + for (auto& i : fChannels.at(mi.first)) { + i.fMultipart = 0; } } - for (const auto& mi : fMultipartInputs) - { - for (auto& i : fChannels.at(mi.first)) - { - i.fMultipart = true; + for (const auto& mi : fMultipartInputs) { + for (auto& i : fChannels.at(mi.first)) { + i.fMultipart = 1; + } + } + + for (const auto& mi : fMsgInputs) { + for (auto& i : fChannels.at(mi.first)) { + i.fMultipart = 2; } } // if more than one transport is used, handle poll of each in a separate thread - if (fMultitransportInputs.size() > 1) - { + if (fMultitransportInputs.size() > 1) { HandleMultipleTransportInput(); - } - else // otherwise poll directly - { + } else { // otherwise poll directly bool proceed = true; FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys)); - while (!NewStatePending() && proceed) - { + while (!NewStatePending() && proceed) { poller->Poll(200); // check which inputs are ready and call their data handlers if they are. - for (const auto& ch : fInputChannelKeys) - { - for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) - { - if (poller->CheckInput(ch, i)) - { - if (fChannels.at(ch).at(i).fMultipart) - { + for (const auto& ch : fInputChannelKeys) { + for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) { + if (poller->CheckInput(ch, i)) { + if (fChannels.at(ch).at(i).fMultipart == 1) { proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i); - } - else - { - proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i); + } else if (fChannels.at(ch).at(i).fMultipart == 0) { + proceed = HandleMessageInput(ch, fMessageInputs.at(ch), i); + } else if (fChannels.at(ch).at(i).fMultipart == 2) { + proceed = HandleInput(ch, fMsgInputs.at(ch), i); } - if (!proceed) - { + if (!proceed) { break; } } } - if (!proceed) - { + if (!proceed) { break; } } @@ -606,79 +594,72 @@ void FairMQDevice::HandleMultipleTransportInput() fMultitransportProceed = true; - for (const auto& i : fMultitransportInputs) - { + for (const auto& i : fMultitransportInputs) { threads.emplace_back(thread(&FairMQDevice::PollForTransport, this, fTransports.at(i.first).get(), i.second)); } - for (thread& t : threads) - { + for (thread& t : threads) { t.join(); } } void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const vector& channelKeys) -{ - try - { - FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys)); - - while (!NewStatePending() && fMultitransportProceed) - { - poller->Poll(500); - - for (const auto& ch : channelKeys) - { - for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) - { - if (poller->CheckInput(ch, i)) - { - lock_guard lock(fMultitransportMutex); - - if (!fMultitransportProceed) - { - break; - } +try { + FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys)); - if (fChannels.at(ch).at(i).fMultipart) - { - fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i); - } - else - { - fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i); - } + while (!NewStatePending() && fMultitransportProceed) { + poller->Poll(500); - if (!fMultitransportProceed) - { - break; - } + for (const auto& ch : channelKeys) { + for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) { + if (poller->CheckInput(ch, i)) { + lock_guard lock(fMultitransportMutex); + + if (!fMultitransportProceed) { + break; + } + + if (fChannels.at(ch).at(i).fMultipart == 1) { + fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i); + } else if (fChannels.at(ch).at(i).fMultipart == 0) { + fMultitransportProceed = HandleMessageInput(ch, fMessageInputs.at(ch), i); + } else if (fChannels.at(ch).at(i).fMultipart == 2) { + fMultitransportProceed = HandleInput(ch, fMsgInputs.at(ch), i); + } + + if (!fMultitransportProceed) { + break; } } - if (!fMultitransportProceed) - { - break; - } + } + if (!fMultitransportProceed) { + break; } } } - catch (exception& e) - { - LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state."; - throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state.")); +} catch (exception& e) { + LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state."; + throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state.")); +} + +bool FairMQDevice::HandleInput(const string& chName, const InputCallback& callback, int i) +{ + TransferResult result = Receive(chName, i); + + if (result.code == TransferCode::success) { + return callback(std::move(result.msg.value()), i); + } else { + return false; } } -bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i) +bool FairMQDevice::HandleMessageInput(const string& chName, const InputMessageCallback& callback, int i) { unique_ptr input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage()); - if (Receive(input, chName, i) >= 0) - { + if (Receive(input, chName, i) >= 0) { return callback(input, i); - } - else - { + } else { return false; } } @@ -687,12 +668,9 @@ bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipa { FairMQParts input; - if (Receive(input, chName, i) >= 0) - { + if (Receive(input, chName, i) >= 0) { return callback(input, 0); - } - else - { + } else { return false; } } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 92ebb6faa..09823a47b 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -38,8 +39,9 @@ using FairMQChannelMap = std::unordered_map>; -using InputMsgCallback = std::function; +using InputMessageCallback = std::function; using InputMultipartCallback = std::function; +using InputCallback = std::function; namespace fair { @@ -135,6 +137,15 @@ class FairMQDevice return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs); } + fair::mq::TransferResult Send(fair::mq::Msg msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) + { + return GetChannel(channel, index).Send(std::move(msg), sndTimeoutInMs); + } + fair::mq::TransferResult Receive(const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) + { + return GetChannel(channel, index).Receive(rcvTimeoutInMs); + } + /// @brief Getter for default transport factory auto Transport() const -> FairMQTransportFactory* { @@ -155,6 +166,20 @@ class FairMQDevice return GetChannel(channel, index).NewMessage(std::forward(args)...); } + // creates buffer with the default device transport + template + FairMQMessagePtr NewBuffer(Args&&... args) + { + return Transport()->CreateMessage(std::forward(args)...); + } + + // creates buffer with the transport of the specified channel + template + FairMQMessagePtr NewBufferFor(const std::string& channel, int index, Args&&... args) + { + return GetChannel(channel, index).NewBuffer(std::forward(args)...); + } + // creates a message that will not be cleaned up after transfer, with the default device transport template FairMQMessagePtr NewStaticMessage(const T& data) @@ -257,24 +282,21 @@ class FairMQDevice void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index)) { fDataCallbacks = true; - fMsgInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQMessagePtr& msg, int index) - { + fMessageInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQMessagePtr& msg, int index) { return (static_cast(this)->*memberFunction)(msg, index); })); - if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) - { + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) { fInputChannelKeys.push_back(channelName); } } - void OnData(const std::string& channelName, InputMsgCallback callback) + void OnData(const std::string& channelName, InputMessageCallback callback) { fDataCallbacks = true; - fMsgInputs.insert(make_pair(channelName, callback)); + fMessageInputs.insert(make_pair(channelName, callback)); - if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) - { + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) { fInputChannelKeys.push_back(channelName); } } @@ -284,13 +306,11 @@ class FairMQDevice void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index)) { fDataCallbacks = true; - fMultipartInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQParts& parts, int index) - { + fMultipartInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQParts& parts, int index) { return (static_cast(this)->*memberFunction)(parts, index); })); - if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) - { + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) { fInputChannelKeys.push_back(channelName); } } @@ -300,8 +320,31 @@ class FairMQDevice fDataCallbacks = true; fMultipartInputs.insert(make_pair(channelName, callback)); - if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) - { + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) { + fInputChannelKeys.push_back(channelName); + } + } + + // overload to easily bind member functions + template + void OnData(const std::string& channelName, bool (T::* memberFunction)(fair::mq::Msg msg, int index)) + { + fDataCallbacks = true; + fMsgInputs.insert(std::make_pair(channelName, [this, memberFunction](fair::mq::Msg msg, int index) { + return (static_cast(this)->*memberFunction)(std::move(msg), index); + })); + + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) { + fInputChannelKeys.push_back(channelName); + } + } + + void OnData(const std::string& channelName, InputCallback callback) + { + fDataCallbacks = true; + fMsgInputs.insert(make_pair(channelName, callback)); + + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) { fInputChannelKeys.push_back(channelName); } } @@ -492,14 +535,16 @@ class FairMQDevice void HandleMultipleTransportInput(); void PollForTransport(const FairMQTransportFactory* factory, const std::vector& channelKeys); - bool HandleMsgInput(const std::string& chName, const InputMsgCallback& callback, int i); + bool HandleInput(const std::string& chName, const InputCallback& callback, int i); + bool HandleMessageInput(const std::string& chName, const InputMessageCallback& callback, int i); bool HandleMultipartInput(const std::string& chName, const InputMultipartCallback& callback, int i); std::vector fUninitializedBindingChannels; std::vector fUninitializedConnectingChannels; bool fDataCallbacks; - std::unordered_map fMsgInputs; + std::unordered_map fMessageInputs; + std::unordered_map fMsgInputs; std::unordered_map fMultipartInputs; std::unordered_map> fMultitransportInputs; std::unordered_map> fChannelRegistry; diff --git a/fairmq/FairMQParts.h b/fairmq/FairMQParts.h index 096dc200e..8b60d3fbc 100644 --- a/fairmq/FairMQParts.h +++ b/fairmq/FairMQParts.h @@ -9,7 +9,6 @@ #ifndef FAIRMQPARTS_H_ #define FAIRMQPARTS_H_ -#include "FairMQTransportFactory.h" #include "FairMQMessage.h" #include diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index f95dbe6a9..7c559b16a 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -10,8 +10,10 @@ #define FAIRMQSOCKET_H_ #include "FairMQMessage.h" +#include #include +#include #include #include #include @@ -32,6 +34,13 @@ enum class TransferCode : int interrupted = -3 }; +struct TransferResult +{ + size_t nbytes; + TransferCode code; + std::optional msg; +}; + } // namespace mq } // namespace fair @@ -48,8 +57,11 @@ class FairMQSocket virtual int64_t Send(FairMQMessagePtr& msg, int timeout = -1) = 0; virtual int64_t Receive(FairMQMessagePtr& msg, int timeout = -1) = 0; - virtual int64_t Send(std::vector>& msgVec, int timeout = -1) = 0; - virtual int64_t Receive(std::vector>& msgVec, int timeout = -1) = 0; + virtual int64_t Send(std::vector& msgVec, int timeout = -1) = 0; + virtual int64_t Receive(std::vector& msgVec, int timeout = -1) = 0; + + virtual fair::mq::TransferResult Send(fair::mq::Msg msg, int timeout = -1) = 0; + virtual fair::mq::TransferResult Receive(int timeout = -1) = 0; virtual void Close() = 0; diff --git a/fairmq/Msg.h b/fairmq/Msg.h new file mode 100644 index 000000000..f8cf32d95 --- /dev/null +++ b/fairmq/Msg.h @@ -0,0 +1,90 @@ +/******************************************************************************** + * Copyright (C) 2014-2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_MSG_H_ +#define FAIR_MQ_MSG_H_ + +#include "FairMQMessage.h" + +#include +#include + +namespace fair +{ +namespace mq +{ + +class Msg +{ + private: + using container = std::vector; + + public: + Msg() : fBuffers() {}; + Msg(size_t elements) : fBuffers() { fBuffers.reserve(elements); }; + + Msg(const Msg&) = delete; + Msg(Msg&) = delete; + Msg& operator=(const Msg&) = delete; + Msg& operator=(Msg&) = delete; + + Msg(Msg&& p) = default; + Msg& operator=(Msg&&) = default; + + template, Msg> && ...) || // with c++20 use std::is_same_v, Msg> + (std::is_convertible_v && ...)>> + Msg(Ts&&... buffers) : fBuffers() + { + fBuffers.reserve(sizeof...(Ts)); + Add(std::forward(buffers)...); + } + + ~Msg() {}; + + FairMQMessage& Add(FairMQMessagePtr buffer) + { + return *(fBuffers.emplace_back(std::move(buffer))); + } + + template + void Add(FairMQMessagePtr first, Ts... remaining) + { + fBuffers.emplace_back(std::move(first)); + if constexpr (sizeof...(Ts) > 0) Add(std::forward(remaining)...); + } + + void Add(Msg other) + { + for (auto& buffer : other) { + fBuffers.emplace_back(std::move(buffer)); + } + } + + FairMQMessage& operator[](int i) { return *(fBuffers[i]); } + FairMQMessage& At(int i) { return *(fBuffers.at(i)); } + + size_t Size() const { return fBuffers.size(); } + void Reserve(size_t cap) { fBuffers.reserve(cap); } + + container fBuffers; + + // forward container iterators + using iterator = container::iterator; + using const_iterator = container::const_iterator; + + auto begin() -> decltype(fBuffers.begin()) { return fBuffers.begin(); } + auto cbegin() -> decltype(fBuffers.cbegin()) { return fBuffers.cbegin(); } + auto end() -> decltype(fBuffers.end()) { return fBuffers.end(); } + auto cend() -> decltype(fBuffers.cend()) { return fBuffers.cend(); } +}; + +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_MSG_H_ */ diff --git a/fairmq/devices/ExperimentalBenchmarkSampler.h b/fairmq/devices/ExperimentalBenchmarkSampler.h new file mode 100644 index 000000000..7cfa2632b --- /dev/null +++ b/fairmq/devices/ExperimentalBenchmarkSampler.h @@ -0,0 +1,110 @@ +/******************************************************************************** + * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_EXPERIMENTALBENCHMARKSAMPLER_H_ +#define FAIR_MQ_EXPERIMENTALBENCHMARKSAMPLER_H_ + +#include "FairMQDevice.h" +#include "tools/RateLimit.h" + +#include + +#include +#include // size_t +#include // uint64_t +#include // memset +#include + +/** + * Sampler to generate traffic for benchmarking. + */ + +namespace fair +{ +namespace mq +{ + +class ExperimentalBenchmarkSampler : public FairMQDevice +{ + public: + ExperimentalBenchmarkSampler() + : fMemSet(false) + , fNumBuffers(1) + , fBufSize(10000) + , fBufAlignment(0) + , fMsgRate(0) + , fNumIterations(0) + , fMaxIterations(0) + , fOutChannelName() + {} + + void InitTask() override + { + fMemSet = fConfig->GetProperty("memset"); + fNumBuffers = fConfig->GetProperty("num-buffers"); + fBufSize = fConfig->GetProperty("buf-size"); + fBufAlignment = fConfig->GetProperty("buf-alignment"); + fMsgRate = fConfig->GetProperty("msg-rate"); + fMaxIterations = fConfig->GetProperty("max-iterations"); + fOutChannelName = fConfig->GetProperty("out-channel"); + } + + void Run() override + { + // store the channel reference to avoid traversing the map on every loop iteration + FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0); + + LOG(info) << "Starting the benchmark with messages consisting of " << fNumBuffers << " buffer(s) of size " << fBufSize << " and " << fMaxIterations << " iterations."; + auto tStart = std::chrono::high_resolution_clock::now(); + + tools::RateLimiter rateLimiter(fMsgRate); + + while (!NewStatePending()) { + Msg msg(fNumBuffers); + + for (size_t i = 0; i < fNumBuffers; ++i) { + const auto& buffer = msg.Add(dataOutChannel.NewMessage(fBufSize, Alignment{fBufAlignment})); + if (fMemSet) { + std::memset(buffer.GetData(), 0, buffer.GetSize()); + } + } + + if (dataOutChannel.Send(std::move(msg)).code == TransferCode::success) { + if (fMaxIterations > 0) { + if (fNumIterations >= fMaxIterations) { + break; + } + } + ++fNumIterations; + } + + if (fMsgRate > 0) { + rateLimiter.maybe_sleep(); + } + } + + auto tEnd = std::chrono::high_resolution_clock::now(); + + LOG(info) << "Done " << fNumIterations << " iterations in " << std::chrono::duration(tEnd - tStart).count() << "ms."; + } + + protected: + bool fMemSet; + size_t fNumBuffers; + size_t fBufSize; + size_t fBufAlignment; + float fMsgRate; + uint64_t fNumIterations; + uint64_t fMaxIterations; + std::string fOutChannelName; +}; + +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_EXPERIMENTALBENCHMARKSAMPLER_H_ */ diff --git a/fairmq/devices/ExperimentalSink.h b/fairmq/devices/ExperimentalSink.h new file mode 100644 index 000000000..977dfa195 --- /dev/null +++ b/fairmq/devices/ExperimentalSink.h @@ -0,0 +1,74 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_EXSINK_H_ +#define FAIR_MQ_EXSINK_H_ + +#include "../FairMQDevice.h" +#include "../FairMQLogger.h" + +#include +#include // uint64_t +#include + +namespace fair +{ +namespace mq +{ + +class ExperimentalSink : public FairMQDevice +{ + public: + ExperimentalSink() + : fMaxIterations(0) + , fNumIterations(0) + , fInChannelName() + {} + + void InitTask() override + { + fMaxIterations = fConfig->GetProperty("max-iterations"); + fInChannelName = fConfig->GetProperty("in-channel"); + } + + void Run() override + { + // store the channel reference to avoid traversing the map on every loop iteration + FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0); + + LOG(info) << "Starting the benchmark and expecting to receive " << fMaxIterations << " messages."; + auto tStart = std::chrono::high_resolution_clock::now(); + + while (!NewStatePending()) { + if (dataInChannel.Receive().code == TransferCode::success) { + if (fMaxIterations > 0) { + if (fNumIterations >= fMaxIterations) { + LOG(info) << "Configured maximum number of iterations reached."; + break; + } + } + fNumIterations++; + } + } + + auto tEnd = std::chrono::high_resolution_clock::now(); + + LOG(info) << "Leaving RUNNING state. Received " << fNumIterations << " messages in " + << std::chrono::duration(tEnd - tStart).count() << "ms."; + } + + protected: + uint64_t fMaxIterations; + uint64_t fNumIterations; + std::string fInChannelName; +}; + +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_EXSINK_H_ */ diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index 25f9afdc2..c2408e2b9 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -13,7 +13,6 @@ #include "FairMQDevice.h" #include "tools/RateLimit.h" -#include #include #include // size_t #include // uint64_t @@ -106,15 +105,12 @@ class FairMQBenchmarkSampler : public FairMQDevice LOG(info) << "Done " << fNumIterations << " iterations in " << std::chrono::duration(tEnd - tStart).count() << "ms."; } - virtual ~FairMQBenchmarkSampler() {} - protected: bool fMultipart; bool fMemSet; size_t fNumParts; size_t fMsgSize; size_t fMsgAlignment; - std::atomic fMsgCounter; float fMsgRate; uint64_t fNumIterations; uint64_t fMaxIterations; diff --git a/fairmq/run/runExperimentalBenchmarkSampler.cxx b/fairmq/run/runExperimentalBenchmarkSampler.cxx new file mode 100644 index 000000000..2bd9a65e2 --- /dev/null +++ b/fairmq/run/runExperimentalBenchmarkSampler.cxx @@ -0,0 +1,29 @@ +/******************************************************************************** + * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("out-channel", bpo::value()->default_value("data"), "Name of the output channel") + ("memset", bpo::value()->default_value(false), "Memset allocated buffers to 0") + ("num-buffers", bpo::value()->default_value(1), "Number of buffers to send.") + ("buf-size", bpo::value()->default_value(1000000), "Buffer size in bytes") + ("buf-alignment", bpo::value()->default_value(0), "Buffer alignment") + ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)") + ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); +} + +FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /* config */) +{ + return new fair::mq::ExperimentalBenchmarkSampler(); +} diff --git a/fairmq/run/runExperimentalSink.cxx b/fairmq/run/runExperimentalSink.cxx new file mode 100644 index 000000000..8912d2054 --- /dev/null +++ b/fairmq/run/runExperimentalSink.cxx @@ -0,0 +1,24 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + ("in-channel", bpo::value()->default_value("data"), "Name of the input channel") + ("max-iterations", bpo::value()->default_value(0), "Number of run iterations (0 - infinite)"); +} + +FairMQDevicePtr getDevice(const fair::mq::ProgOptions& /*config*/) +{ + return new fair::mq::ExperimentalSink(); +} diff --git a/fairmq/run/startExperimentalMQBenchmark.sh.in b/fairmq/run/startExperimentalMQBenchmark.sh.in new file mode 100755 index 000000000..c731400f8 --- /dev/null +++ b/fairmq/run/startExperimentalMQBenchmark.sh.in @@ -0,0 +1,86 @@ +#!/bin/bash + +export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@ + +maxIterations="0" +bufSize="1000000" +transport="zeromq" +affinity="false" +numBufs="1" +affinitySamp="" +affinitySink="" + +if [[ $1 =~ ^[0-9]+$ ]]; then + bufSize=$1 +fi + +if [[ $2 =~ ^[0-9]+$ ]]; then + maxIterations=$2 +fi + +if [[ $3 =~ ^[a-z]+$ ]]; then + transport=$3 +fi + +if [[ $4 =~ ^[a-z]+$ ]]; then + affinity=$4 +fi + +if [[ $6 =~ ^[0-9]+$ ]]; then + numBufs=$6 +fi + +echo "Starting experimental benchmark with following settings:" + +echo "" +echo "buffer size: $bufSize bytes, number of buffers: $numBufs" + +if [ $maxIterations = 0 ]; then + echo "number of iterations: unlimited" +else + echo "number of iterations: $maxIterations" +fi + +echo "transport: $transport" + +if [ $affinity = "true" ]; then + affinitySamp="taskset -c 0" + affinitySink="taskset -c 1" + echo "affinity: assigning sampler to core 0, sink to core 1" +else + echo "" +fi + +echo "" +echo "Usage: startBenchmark [message size=1000000] [number of iterations=0] [transport=zeromq/shmem] [affinity=false]" + +SAMPLER="fairmq-exp-bsampler" +SAMPLER+=" --id bsampler1" +#SAMPLER+=" --io-threads 2" +#SAMPLER+=" --control static" +SAMPLER+=" --transport $transport" +SAMPLER+=" --severity debug" +SAMPLER+=" --buf-size $bufSize" +SAMPLER+=" --num-buffers $numBufs" +SAMPLER+=" --shm-throw-bad-alloc false" +# SAMPLER+=" --msg-rate 1000" +SAMPLER+=" --max-iterations $maxIterations" +SAMPLER+=" --channel-config name=data,type=pair,method=bind,address=tcp://127.0.0.1:5555" +xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER & +echo "" +echo "started: xterm -geometry 90x50+0+0 -hold -e $affinitySamp @CMAKE_CURRENT_BINARY_DIR@/$SAMPLER" +echo "pid: $!" + +SINK="fairmq-exp-sink" +SINK+=" --id sink1" +#SINK+=" --io-threads 2" +#SINK+=" --control static" +SINK+=" --transport $transport" +SINK+=" --severity debug" +SINK+=" --max-iterations $maxIterations" +SINK+=" --channel-config name=data,type=pair,method=connect,address=tcp://127.0.0.1:5555" +xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK & +echo "" +echo "started: xterm -geometry 90x50+550+0 -hold -e $affinitySink @CMAKE_CURRENT_BINARY_DIR@/$SINK" +echo "pid: $!" +echo "" diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index 7647ae75f..e36e6419d 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -155,6 +155,28 @@ class Socket final : public fair::mq::Socket } } + TransferResult HandleErrors2(Msg msg) const + { + if (zmq_errno() == ETERM) { + LOG(debug) << "Terminating socket " << fId; + return {0, TransferCode::error, std::move(msg)}; + } else { + LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); + return {0, TransferCode::error, std::move(msg)}; + } + } + + TransferResult HandleErrors3() const + { + if (zmq_errno() == ETERM) { + LOG(debug) << "Terminating socket " << fId; + return {0, TransferCode::error, {}}; + } else { + LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); + return {0, TransferCode::error, {}}; + } + } + int64_t Send(MessagePtr& msg, const int timeout = -1) override { int flags = 0; @@ -340,7 +362,122 @@ class Socket final : public fair::mq::Socket } } - return static_cast(TransferResult::error); + return static_cast(TransferCode::error); + } + + TransferResult Send(Msg msg, int timeout = -1) override + { + const size_t vecSize = msg.Size(); + + if (vecSize == 0) { + LOG(warn) << "Will not send empty vector"; + return {0, TransferCode::error, std::move(msg)}; + } + + int flags = 0; + if (timeout == 0) { + flags = ZMQ_DONTWAIT; + } + int elapsed = 0; + + // put it into zmq message + ZMsg zmqMsg(vecSize * sizeof(MetaHeader)); + + // prepare the message with shm metas + MetaHeader* metas = static_cast(zmqMsg.Data()); + + for (auto& buf : msg) { + Message& shmMsg = static_cast(*buf); + std::memcpy(metas++, &(shmMsg.fMeta), sizeof(MetaHeader)); + } + + while (true) { + size_t totalSize = 0; + int nbytes = zmq_msg_send(zmqMsg.Msg(), fSocket, flags); + if (nbytes > 0) { + assert(static_cast(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing + + for (auto& buf : msg) { + Message& shmMsg = static_cast(*buf); + shmMsg.fQueued = true; + totalSize += shmMsg.fMeta.fSize; + } + + // store statistics on how many messages have been sent + fMessagesTx++; + fBytesTx += totalSize; + + return {totalSize, TransferCode::success, {}}; + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { + if (fManager.Interrupted()) { + return {0, TransferCode::interrupted, std::move(msg)}; + } else if (ShouldRetry(flags, timeout, elapsed)) { + continue; + } else { + return {0, TransferCode::timeout, std::move(msg)}; + } + } else { + return HandleErrors2(std::move(msg)); + } + } + + return {0, TransferCode::error, std::move(msg)}; + } + + TransferResult Receive(int timeout = -1) override + { + int flags = 0; + if (timeout == 0) { + flags = ZMQ_DONTWAIT; + } + int elapsed = 0; + + ZMsg zmqMsg; + + while (true) { + size_t totalSize = 0; + int nbytes = zmq_msg_recv(zmqMsg.Msg(), fSocket, flags); + if (nbytes > 0) { + MetaHeader* hdrVec = static_cast(zmqMsg.Data()); + const auto hdrVecSize = zmqMsg.Size(); + + assert(hdrVecSize > 0); + if (hdrVecSize % sizeof(MetaHeader) != 0) { + throw SocketError( + tools::ToString("Received message is not a valid FairMQ shared memory message. ", + "Possibly due to a misconfigured transport on the sender side. ", + "Expected size of ", sizeof(MetaHeader), " bytes, received ", nbytes)); + } + + const auto numMessages = hdrVecSize / sizeof(MetaHeader); + Msg msg(numMessages); + + for (size_t m = 0; m < numMessages; m++) { + // create new message buffer + msg.fBuffers.emplace_back(tools::make_unique(fManager, hdrVec[m], GetTransport())); + Message& shmMsg = static_cast(*(msg.fBuffers.back())); + totalSize += shmMsg.GetSize(); + } + + // store statistics on how many messages have been received (handle all parts as a single message) + fMessagesRx++; + fBytesRx += totalSize; + + return {totalSize, TransferCode::success, std::move(msg)}; + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { + if (fManager.Interrupted()) { + return {0, TransferCode::interrupted, {}}; + } else if (ShouldRetry(flags, timeout, elapsed)) { + continue; + } else { + return {0, TransferCode::timeout, {}}; + } + } else { + return HandleErrors3(); + } + } + + return {0, TransferCode::error, {}}; } void* GetSocket() const { return fSocket; } diff --git a/fairmq/zeromq/Socket.h b/fairmq/zeromq/Socket.h index 1447a70f8..b949e6fb2 100644 --- a/fairmq/zeromq/Socket.h +++ b/fairmq/zeromq/Socket.h @@ -132,6 +132,28 @@ class Socket final : public fair::mq::Socket } } + TransferResult HandleErrors2(Msg msg) const + { + if (zmq_errno() == ETERM) { + LOG(debug) << "Terminating socket " << fId; + return {0, TransferCode::error, std::move(msg)}; + } else { + LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); + return {0, TransferCode::error, std::move(msg)}; + } + } + + TransferResult HandleErrors3() const + { + if (zmq_errno() == ETERM) { + LOG(debug) << "Terminating socket " << fId; + return {0, TransferCode::error, {}}; + } else { + LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno); + return {0, TransferCode::error, {}}; + } + } + int64_t Send(MessagePtr& msg, const int timeout = -1) override { int flags = 0; @@ -295,6 +317,106 @@ class Socket final : public fair::mq::Socket } } + TransferResult Send(Msg msg, int timeout = -1) override + { + const size_t vecSize = msg.Size(); + + if (vecSize == 0) { + LOG(warn) << "Will not send empty vector"; + return {0, TransferCode::error, std::move(msg)}; + } + + int flags = 0; + if (timeout == 0) { + flags = ZMQ_DONTWAIT; + } + int elapsed = 0; + + while (true) { + size_t totalSize = 0; + bool repeat = false; + + for (unsigned int i = 0; i < vecSize; ++i) { + static_cast(msg[i]).ApplyUsedSize(); + + int nbytes = zmq_msg_send(static_cast(msg[i]).GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags); + if (nbytes >= 0) { + totalSize += nbytes; + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { + if (fCtx.Interrupted()) { + return {0, TransferCode::interrupted, std::move(msg)}; + } else if (ShouldRetry(flags, timeout, elapsed)) { + repeat = true; + break; + } else { + return {0, TransferCode::timeout, std::move(msg)}; + } + } else { + return HandleErrors2(std::move(msg)); + } + } + + if (repeat) { + continue; + } + + // store statistics on how many messages have been sent (handle all parts as a single message) + ++fMessagesTx; + fBytesTx += totalSize; + return {totalSize, TransferCode::success, {}}; + } + } + + TransferResult Receive(int timeout = -1) override + { + Msg msg; + + int flags = 0; + if (timeout == 0) { + flags = ZMQ_DONTWAIT; + } + int elapsed = 0; + + while (true) { + size_t totalSize = 0; + int more = 0; + bool repeat = false; + + do { + FairMQMessagePtr buf = tools::make_unique(GetTransport()); + + int nbytes = zmq_msg_recv(static_cast(buf.get())->GetMessage(), fSocket, flags); + if (nbytes >= 0) { + msg.Add(move(buf)); + totalSize += nbytes; + } else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) { + if (fCtx.Interrupted()) { + return {0, TransferCode::interrupted, {}}; + } else if (ShouldRetry(flags, timeout, elapsed)) { + repeat = true; + break; + } else { + return {0, TransferCode::timeout, {}}; + } + } else { + return HandleErrors3(); + } + + size_t moreSize = sizeof(more); + zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize); + } while (more); + + if (repeat) { + continue; + } + + // store statistics on how many messages have been received (all buffers are a single message) + ++fMessagesRx; + fBytesRx += totalSize; + return {totalSize, TransferCode::success, std::move(msg)}; + } + } + void* GetSocket() const { return fSocket; } void Close() override