Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various changes #322

Merged
merged 9 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ get_git_version()
project(FairMQ VERSION ${PROJECT_VERSION} LANGUAGES CXX)
message(STATUS "${BWhite}${PROJECT_NAME}${CR} ${PROJECT_GIT_VERSION} from ${PROJECT_DATE}")

if(BUILD_OFI_TRANSPORT OR BUILD_SDK OR BUILD_PMIX_PLUGIN)
set(PROJECT_MIN_CXX_STANDARD 14)
else()
set(PROJECT_MIN_CXX_STANDARD 11)
endif()
set(PROJECT_MIN_CXX_STANDARD 17)

set_fairmq_defaults()

Expand Down Expand Up @@ -262,7 +258,7 @@ install_cmake_package()

# Summary ######################################################################
message(STATUS " ")
message(STATUS " ${Cyan}CXX STANDARD${CR} ${BGreen}C++${CMAKE_CXX_STANDARD}${CR} (>= C++${PROJECT_MIN_CXX_STANDARD}, change with ${BMagenta}-DCMAKE_CXX_STANDARD=17${CR})")
message(STATUS " ${Cyan}CXX STANDARD${CR} ${BGreen}C++${CMAKE_CXX_STANDARD}${CR} (>= C++${PROJECT_MIN_CXX_STANDARD}, change with ${BMagenta}-DCMAKE_CXX_STANDARD=20${CR})")
if(CMAKE_CXX_FLAGS)
message(STATUS " ")
message(STATUS " ${Cyan}GLOBAL CXX FLAGS${CR} ${BGreen}${CMAKE_CXX_FLAGS}${CR}")
Expand Down Expand Up @@ -365,9 +361,9 @@ else()
endif()
message(STATUS " ${BWhite}tests${CR} ${tests_summary}")
if(BUILD_OFI_TRANSPORT)
set(ofi_summary "${BGreen}YES${CR} EXPERIMENTAL (requires C++14) (disable with ${BMagenta}-DBUILD_OFI_TRANSPORT=OFF${CR})")
set(ofi_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_OFI_TRANSPORT=OFF${CR})")
else()
set(ofi_summary "${BRed} NO${CR} EXPERIMENTAL (requires C++14) (default, enable with ${BMagenta}-DBUILD_OFI_TRANSPORT=ON${CR})")
set(ofi_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_OFI_TRANSPORT=ON${CR})")
endif()
message(STATUS " ${BWhite}ofi_transport${CR} ${ofi_summary}")
if(BUILD_DDS_PLUGIN)
Expand All @@ -377,9 +373,9 @@ else()
endif()
message(STATUS " ${BWhite}dds_plugin${CR} ${dds_summary}")
if(BUILD_PMIX_PLUGIN)
set(pmix_summary "${BGreen}YES${CR} EXPERIMENTAL (requires C++14) (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})")
set(pmix_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_PMIX_PLUGIN=OFF${CR})")
else()
set(pmix_summary "${BRed} NO${CR} EXPERIMENTAL (requires C++14) (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})")
set(pmix_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_PMIX_PLUGIN=ON${CR})")
endif()
message(STATUS " ${BWhite}pmix_plugin${CR} ${pmix_summary}")
if(BUILD_EXAMPLES)
Expand All @@ -395,9 +391,9 @@ else()
endif()
message(STATUS " ${BWhite}docs${CR} ${docs_summary}")
if(BUILD_SDK)
set(sdk_summary "${BGreen}YES${CR} EXPERIMENTAL (required C++14) (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})")
set(sdk_summary "${BGreen}YES${CR} EXPERIMENTAL (disable with ${BMagenta}-DBUILD_SDK=OFF${CR})")
else()
set(sdk_summary "${BRed} NO${CR} EXPERIMENTAL (required C++14) (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})")
set(sdk_summary "${BRed} NO${CR} EXPERIMENTAL (default, enable with ${BMagenta}-DBUILD_SDK=ON${CR})")
endif()
message(STATUS " ${BWhite}sdk${CR} ${sdk_summary}")
if(BUILD_SDK_COMMANDS)
Expand Down Expand Up @@ -432,9 +428,9 @@ message(STATUS " ")
message(STATUS " ${Cyan}RUN STATIC ANALYSIS ${static_ana_summary}")
message(STATUS " ")
if(FAIRMQ_DEBUG_MODE)
message(STATUS " ${Cyan}DEBUG_MODE${CR} ${BGreen}${FAIRMQ_DEBUG_MODE}${CR} (disable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=OFF${CR})")
message(STATUS " ${Cyan}DEBUG MODE${CR} ${BGreen}${FAIRMQ_DEBUG_MODE}${CR} (disable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=OFF${CR})")
else()
message(STATUS " ${Cyan}DEBUG_MODE${CR} ${BRed}${FAIRMQ_DEBUG_MODE}${CR} (enable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=ON${CR})")
message(STATUS " ${Cyan}DEBUG MODE${CR} ${BRed}${FAIRMQ_DEBUG_MODE}${CR} (enable with ${BMagenta}-DFAIRMQ_DEBUG_MODE=ON${CR})")
endif()
message(STATUS " ")
################################################################################
4 changes: 4 additions & 0 deletions fairmq/DeviceRunner.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ bool DeviceRunner::HandleGeneralOptions(const fair::mq::ProgOptions& config, boo
fair::Logger::SetConsoleSeverity("nolog");
} else {
fair::Logger::SetConsoleColor(color);
auto envFairMQSeverity = getenv("FAIRMQ_SEVERITY");
if (envFairMQSeverity) {
severity = envFairMQSeverity;
}
if (severity != "") {
fair::Logger::SetConsoleSeverity(severity);
}
Expand Down
12 changes: 6 additions & 6 deletions fairmq/FairMQChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class FairMQChannel
/// Sends a message to the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msg);
Expand All @@ -266,7 +266,7 @@ class FairMQChannel
/// Receives a message from the socket queue.
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msg);
Expand All @@ -276,7 +276,7 @@ class FairMQChannel
/// Send a vector of messages
/// @param msgVec message vector reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(std::vector<FairMQMessagePtr>& msgVec, int sndTimeoutInMs = -1)
{
CheckSendCompatibility(msgVec);
Expand All @@ -286,7 +286,7 @@ class FairMQChannel
/// Receive a vector of messages
/// @param msgVec message vector reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(std::vector<FairMQMessagePtr>& msgVec, int rcvTimeoutInMs = -1)
{
CheckReceiveCompatibility(msgVec);
Expand All @@ -296,7 +296,7 @@ class FairMQChannel
/// Send FairMQParts
/// @param parts FairMQParts reference
/// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1)
{
return Send(parts.fParts, sndTimeoutInMs);
Expand All @@ -305,7 +305,7 @@ class FairMQChannel
/// Receive FairMQParts
/// @param parts FairMQParts reference
/// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1)
{
return Receive(parts.fParts, rcvTimeoutInMs);
Expand Down
38 changes: 0 additions & 38 deletions fairmq/FairMQDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,6 @@
using namespace std;
using namespace fair::mq;

static map<Transition, State> backwardsCompatibilityWaitForEndOfStateHelper =
{
{ Transition::InitDevice, State::InitializingDevice },
{ Transition::CompleteInit, State::Initialized },
{ Transition::Bind, State::Bound },
{ Transition::Connect, State::DeviceReady },
{ Transition::InitTask, State::Ready },
{ Transition::Run, State::Ready },
{ Transition::Stop, State::Ready },
{ Transition::ResetTask, State::DeviceReady },
{ Transition::ResetDevice, State::Idle }
};

static map<int, Transition> backwardsCompatibilityChangeStateHelper =
{
{ FairMQDevice::Event::INIT_DEVICE, Transition::InitDevice },
{ FairMQDevice::Event::internal_DEVICE_READY, Transition::Auto },
{ FairMQDevice::Event::INIT_TASK, Transition::InitTask },
{ FairMQDevice::Event::internal_READY, Transition::Auto },
{ FairMQDevice::Event::RUN, Transition::Run },
{ FairMQDevice::Event::STOP, Transition::Stop },
{ FairMQDevice::Event::RESET_TASK, Transition::ResetTask },
{ FairMQDevice::Event::RESET_DEVICE, Transition::ResetDevice },
{ FairMQDevice::Event::internal_IDLE, Transition::Auto },
{ FairMQDevice::Event::END, Transition::End },
{ FairMQDevice::Event::ERROR_FOUND, Transition::ErrorFound }
};

constexpr const char* FairMQDevice::DefaultId;
constexpr int FairMQDevice::DefaultIOThreads;
constexpr const char* FairMQDevice::DefaultTransportName;
Expand Down Expand Up @@ -244,16 +216,6 @@ void FairMQDevice::TransitionTo(const fair::mq::State s)
}
}

bool FairMQDevice::ChangeState(const int transition)
{
return ChangeState(backwardsCompatibilityChangeStateHelper.at(transition));
}

void FairMQDevice::WaitForEndOfState(Transition transition)
{
WaitForState(backwardsCompatibilityWaitForEndOfStateHelper.at(transition));
}

void FairMQDevice::InitWrapper()
{
// run initialization once CompleteInit transition is requested
Expand Down
50 changes: 4 additions & 46 deletions fairmq/FairMQDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,6 @@ class FairMQDevice
friend class FairMQChannel;

public:
// backwards-compatibility enum for old state machine interface, todo: delete this
enum Event
{
INIT_DEVICE,
internal_DEVICE_READY,
INIT_TASK,
internal_READY,
RUN,
STOP,
RESET_TASK,
RESET_DEVICE,
internal_IDLE,
END,
ERROR_FOUND
};

// backwards-compatibility enum for old state machine interface, todo: delete this
enum State
{
OK,
Error,
IDLE,
INITIALIZING_DEVICE,
DEVICE_READY,
INITIALIZING_TASK,
READY,
RUNNING,
RESETTING_TASK,
RESETTING_DEVICE,
EXITING
};

/// Default constructor
FairMQDevice();
/// Constructor with external fair::mq::ProgOptions
Expand Down Expand Up @@ -128,7 +96,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(msg, sndTimeoutInMs);
Expand All @@ -139,7 +107,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs);
Expand All @@ -150,7 +118,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send)
/// @return Number of bytes that have been queued, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1)
{
return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs);
Expand All @@ -161,7 +129,7 @@ class FairMQDevice
/// @param chan channel name
/// @param i channel index
/// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive)
/// @return Number of bytes that have been received, TransferResult::timeout if timed out, TransferResult::error if there was an error, TransferResult::interrupted if interrupted (e.g. by requested state change)
/// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change)
int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1)
{
return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs);
Expand Down Expand Up @@ -449,8 +417,6 @@ class FairMQDevice
/// Called in the RUNNING state once after executing the Run()/ConditionalRun() method
virtual void PostRun() {}

virtual void Pause() __attribute__((deprecated("PAUSE state is removed. This method is never called. To pause Run, go to READY with STOP transition and back to RUNNING with RUN to resume."))) {}

/// Resets the user task (to be overloaded in child classes)
virtual void ResetTask() {}

Expand All @@ -461,11 +427,6 @@ class FairMQDevice
bool ChangeState(const fair::mq::Transition transition) { return fStateMachine.ChangeState(transition); }
bool ChangeState(const std::string& transition) { return fStateMachine.ChangeState(fair::mq::GetTransition(transition)); }

bool ChangeState(const int transition) __attribute__((deprecated("Use ChangeState(const fair::mq::Transition transition).")));

void WaitForEndOfState(const fair::mq::Transition transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState).")));
void WaitForEndOfState(const std::string& transition) __attribute__((deprecated("Use WaitForState(fair::mq::State expectedState)."))) { WaitForState(transition); }

fair::mq::State WaitForNextState() { return fStateQueue.WaitForNext(); }
void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); }
void WaitForState(const std::string& state) { WaitForState(fair::mq::GetState(state)); }
Expand All @@ -478,9 +439,6 @@ class FairMQDevice
void SubscribeToNewTransition(const std::string& key, std::function<void(const fair::mq::Transition)> callback) { fStateMachine.SubscribeToNewTransition(key, callback); }
void UnsubscribeFromNewTransition(const std::string& key) { fStateMachine.UnsubscribeFromNewTransition(key); }

bool CheckCurrentState(const int /* state */) const __attribute__((deprecated("Use NewStatePending()."))) { return !fStateMachine.NewStatePending(); }
bool CheckCurrentState(const std::string& /* state */) const __attribute__((deprecated("Use NewStatePending()."))) { return !fStateMachine.NewStatePending(); }

/// Returns true is a new state has been requested, signaling the current handler to stop.
bool NewStatePending() const { return fStateMachine.NewStatePending(); }

Expand Down
2 changes: 2 additions & 0 deletions fairmq/FairMQMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class FairMQMessage
FairMQMessage(FairMQTransportFactory* factory) : fTransport(factory) {}

virtual void Rebuild() = 0;
virtual void Rebuild(fair::mq::Alignment alignment) = 0;
virtual void Rebuild(const size_t size) = 0;
virtual void Rebuild(const size_t size, fair::mq::Alignment alignment) = 0;
virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0;

virtual void* GetData() const = 0;
Expand Down
3 changes: 2 additions & 1 deletion fairmq/FairMQSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ namespace fair
namespace mq
{

enum class TransferResult : int
enum class TransferCode : int
{
success = 0,
error = -1,
timeout = -2,
interrupted = -3
Expand Down
4 changes: 0 additions & 4 deletions fairmq/devices/FairMQBenchmarkSampler.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "FairMQDevice.h"
#include "tools/RateLimit.h"

#include <atomic>
#include <chrono>
#include <cstddef> // size_t
#include <cstdint> // uint64_t
Expand Down Expand Up @@ -106,15 +105,12 @@ class FairMQBenchmarkSampler : public FairMQDevice
LOG(info) << "Done " << fNumIterations << " iterations in " << std::chrono::duration<double, std::milli>(tEnd - tStart).count() << "ms.";
}

virtual ~FairMQBenchmarkSampler() {}

protected:
bool fMultipart;
bool fMemSet;
size_t fNumParts;
size_t fMsgSize;
size_t fMsgAlignment;
std::atomic<int> fMsgCounter;
float fMsgRate;
uint64_t fNumIterations;
uint64_t fMaxIterations;
Expand Down
Loading