Skip to content

Commit

Permalink
Add experimental fair::mq::Msg & Socket API
Browse files Browse the repository at this point in the history
  • Loading branch information
rbx committed Dec 4, 2020
1 parent cfa25d1 commit 19023b9
Show file tree
Hide file tree
Showing 19 changed files with 1,080 additions and 168 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,9 @@ message(STATUS " ")
message(STATUS " ${Cyan}RUN STATIC ANALYSIS ${static_ana_summary}")
message(STATUS " ")
if(FAIRMQ_DEBUG_MODE)
message(STATUS " ${Cyan}DEBUG_MODE${CR} ${BGreen}${FAIRMQ_DEBUG_MODE}${CR} (disable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=OFF${CR})")
message(STATUS " ${Cyan}DEBUG MODE${CR} ${BGreen}${FAIRMQ_DEBUG_MODE}${CR} (disable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=OFF${CR})")
else()
message(STATUS " ${Cyan}DEBUG_MODE${CR} ${BRed}${FAIRMQ_DEBUG_MODE}${CR} (enable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=ON${CR})")
message(STATUS " ${Cyan}DEBUG MODE${CR} ${BRed}${FAIRMQ_DEBUG_MODE}${CR} (enable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=ON${CR})")
endif()
message(STATUS " ")
################################################################################
58 changes: 58 additions & 0 deletions fairmq/Buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/********************************************************************************
* Copyright (C) 2014-2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/

#ifndef FAIR_MQ_BUFFER_H_
#define FAIR_MQ_BUFFER_H_

#include "FairMQMessage.h"

namespace fair
{
namespace mq
{

class Buffer
{
public:
Buffer() = delete;
Buffer(FairMQMessagePtr&& msgPtr)
: fBuffer(std::move(msgPtr))
{}

Buffer(const Buffer&) = delete;
Buffer(Buffer&) = delete;
Buffer& operator=(const Buffer&) = delete;
Buffer& operator=(Buffer&) = delete;

Buffer(Buffer&&) = default;
Buffer& operator=(Buffer&&) = default;

void Rebuild() { fBuffer->Rebuild(); }
void Rebuild(size_t size) { fBuffer->Rebuild(size); }
void Rebuild(void* data, size_t size, fairmq_free_fn* ffn, void* hint = nullptr) { fBuffer->Rebuild(data, size, ffn, hint); }

void* GetData() const { return fBuffer->GetData(); }
size_t GetSize() const { return fBuffer->GetSize(); }

bool SetUsedSize(size_t size) { return fBuffer->SetUsedSize(size); }

fair::mq::Transport GetType() const { return fBuffer->GetType(); }
FairMQTransportFactory* GetTransport() { return fBuffer->GetTransport(); }
void SetTransport(FairMQTransportFactory* transport) { fBuffer->SetTransport(transport); }

void Copy(const Buffer& buf) { fBuffer->Copy(*(buf.fBuffer)); }

~Buffer() {};

FairMQMessagePtr fBuffer;
};

} // namespace mq
} // namespace fair

#endif /* FAIR_MQ_BUFFER_H_ */
9 changes: 9 additions & 0 deletions fairmq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ if(BUILD_FAIRMQ)
# libFairMQ header files #
##########################
set(FAIRMQ_PUBLIC_HEADER_FILES
Buffer.h
DeviceRunner.h
EventManager.h
FairMQChannel.h
Expand All @@ -158,6 +159,7 @@ if(BUILD_FAIRMQ)
FairMQTransportFactory.h
MemoryResources.h
MemoryResourceTools.h
Msg.h
Transports.h
options/FairMQProgOptions.h
JSONParser.h
Expand Down Expand Up @@ -244,6 +246,7 @@ if(BUILD_FAIRMQ)
###################
set(FAIRMQ_BIN_DIR ${CMAKE_BINARY_DIR}/fairmq)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startMQBenchmark.sh)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run/startExperimentalMQBenchmark.sh.in ${CMAKE_CURRENT_BINARY_DIR}/startExperimentalMQBenchmark.sh)

#################################
# define libFairMQ build target #
Expand Down Expand Up @@ -350,6 +353,9 @@ if(BUILD_FAIRMQ)
add_executable(fairmq-bsampler run/runBenchmarkSampler.cxx)
target_link_libraries(fairmq-bsampler FairMQ)

add_executable(fairmq-exp-bsampler run/runExperimentalBenchmarkSampler.cxx)
target_link_libraries(fairmq-exp-bsampler FairMQ)

add_executable(fairmq-merger run/runMerger.cxx)
target_link_libraries(fairmq-merger FairMQ)

Expand All @@ -362,6 +368,9 @@ if(BUILD_FAIRMQ)
add_executable(fairmq-sink run/runSink.cxx)
target_link_libraries(fairmq-sink FairMQ)

add_executable(fairmq-exp-sink run/runExperimentalSink.cxx)
target_link_libraries(fairmq-exp-sink FairMQ)

add_executable(fairmq-splitter run/runSplitter.cxx)
target_link_libraries(fairmq-splitter FairMQ)

Expand Down
2 changes: 1 addition & 1 deletion fairmq/FairMQChannel.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, const strin
, fPortRangeMax(DefaultPortRangeMax)
, fAutoBind(DefaultAutoBind)
, fValid(false)
, fMultipart(false)
, fMultipart(0)
{
// LOG(warn) << "Constructing channel '" << fName << "'";
}
Expand Down
77 changes: 50 additions & 27 deletions fairmq/FairMQChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <FairMQTransportFactory.h>
#include <FairMQUnmanagedRegion.h>
#include <FairMQSocket.h>
#include <fairmq/Msg.h>
#include <fairmq/Transports.h>
#include <FairMQParts.h>
#include <fairmq/Properties.h>
Expand Down Expand Up @@ -256,7 +257,7 @@ class FairMQChannel
/// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
Expand All @@ -266,7 +267,7 @@ class FairMQChannel
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
Expand All @@ -276,7 +277,7 @@ class FairMQChannel
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
Expand All @@ -286,7 +287,7 @@ class FairMQChannel
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
Expand All @@ -296,7 +297,7 @@ class FairMQChannel
/// Send FairMQParts
/// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
Expand All @@ -305,12 +306,28 @@ class FairMQChannel
/// Receive FairMQParts
/// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received,TransferCode::timeout if timed out,TransferCode::error if there was an error,TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);
}

fair::mq::TransferResult Send(fair::mq::Buffer buf, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(buf.fBuffer);
return fSocket->Send(std::move(buf), sndTimeoutInMs);
}

fair::mq::TransferResult Send(fair::mq::Msg msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg.fBuffers);
return fSocket->Send(std::move(msg), sndTimeoutInMs);
}
fair::mq::TransferResult Receive(int rcvTimeoutInMs = -1)
{
return fSocket->Receive(rcvTimeoutInMs);
}

unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); }
unsigned long GetBytesRx() const { return fSocket->GetBytesRx(); }
unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); }
Expand All @@ -319,28 +336,20 @@ class FairMQChannel
auto Transport() -> FairMQTransportFactory* { return fTransportFactory.get(); };

template<typename... Args>
FairMQMessagePtr NewMessage(Args&&... args)
{
return Transport()->CreateMessage(std::forward<Args>(args)...);
}

FairMQMessagePtr NewMessage(Args&&... args) { return Transport()->CreateMessage(std::forward<Args>(args)...); }
template<typename... Args>
fair::mq::Buffer NewBuffer(Args&&... args) { return Transport()->NewBuffer(std::forward<Args>(args)...); }
template<typename T>
FairMQMessagePtr NewSimpleMessage(const T& data)
{
return Transport()->NewSimpleMessage(data);
}

FairMQMessagePtr NewSimpleMessage(const T& data) { return Transport()->NewSimpleMessage(data); }
template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data)
{
return Transport()->NewStaticMessage(data);
}
fair::mq::Buffer NewSimpleBuffer(const T& data) { return Transport()->NewSimpleBuffer(data); }
template<typename T>
FairMQMessagePtr NewStaticMessage(const T& data) { return Transport()->NewStaticMessage(data); }
template<typename T>
fair::mq::Buffer NewStaticBuffer(const T& data) { return Transport()->NewStaticBuffer(data); }

template<typename... Args>
FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args)
{
return Transport()->CreateUnmanagedRegion(std::forward<Args>(args)...);
}
FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args) { return Transport()->CreateUnmanagedRegion(std::forward<Args>(args)...); }

static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT;
static constexpr const char* DefaultTransportName = "default";
Expand Down Expand Up @@ -379,7 +388,7 @@ class FairMQChannel

bool fValid;

bool fMultipart;
short fMultipart;

void CheckSendCompatibility(FairMQMessagePtr& msg)
{
Expand All @@ -399,7 +408,6 @@ class FairMQChannel
{
for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) {

FairMQMessagePtr msgWrapper(NewMessage(
msg->GetData(),
msg->GetSize(),
Expand All @@ -412,6 +420,22 @@ class FairMQChannel
}
}

void CheckSendCompatibility(std::vector<fair::mq::Buffer>& bufVec)
{
for (auto& buf : bufVec) {
if (fTransportType != buf.GetType()) {
fair::mq::Buffer bufWrapper(NewBuffer(
buf.GetData(),
buf.GetSize(),
[](void* /*data*/, void* _buf) { delete static_cast<FairMQMessage*>(_buf); },
buf.fBuffer.get()
));
buf.fBuffer.release();
buf = std::move(bufWrapper);
}
}
}

void CheckReceiveCompatibility(FairMQMessagePtr& msg)
{
if (fTransportType != msg->GetType()) {
Expand All @@ -424,7 +448,6 @@ class FairMQChannel
{
for (auto& msg : msgVec) {
if (fTransportType != msg->GetType()) {

FairMQMessagePtr newMsg(NewMessage());
msg = move(newMsg);
}
Expand Down
Loading

0 comments on commit 19023b9

Please sign in to comment.