Skip to content

Commit

Permalink
IntrusivePtr with collapse and other improvements (#33)
Browse files Browse the repository at this point in the history
* relay: keep working on the SharedBuffer with support for thread local pool

* relay: small changes related to sharedbuffer

* relay: mprpc switch to SharedBuffer and changed SharedBuffer to return the requested capacity

* relay: fix resurrect test

* relay: add support for shared_ptr collapse

* relay: improved reactors and remove comment out collapse because of race condition

* relay: mprpc - preparing support for different MessagePointerT

* relay: introducing intrusiveptr and make it default alternative for mprpc::Message

* relay: test_collapse

* relay: fix build on macos

* relay: fix build on windows

* VERSION 11.0

* relay: fixing test_collapse
  • Loading branch information
vipalade authored Jan 6, 2024
1 parent 1408519 commit 1832c17
Show file tree
Hide file tree
Showing 128 changed files with 2,654 additions and 1,415 deletions.
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.13 FATAL_ERROR)
#-----------------------------------------------------------------
# The project
#-----------------------------------------------------------------
project (SolidFrame VERSION 10.0)
project (SolidFrame VERSION 11.0)

message("SolidFrame version: ${PROJECT_VERSION} - ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}")
#-----------------------------------------------------------------
Expand Down Expand Up @@ -126,6 +126,10 @@ set(SOLID_VERSION_PATCH ${PROJECT_VERSION_PATCH})

set(SYSTEM_BASIC_LIBRARIES "")


option(SOLID_FRAME_AIO_REACTOR_USE_SPINLOCK "Use SpinLock on AIO Reactor" ON)
option(SOLID_MPRPC_USE_SHARED_PTR_MESSAGE "Use std::shared_ptr with mprpc::Message" OFF)

#-----------------------------------------------------------------
# Per OS configuration
#-----------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Boost Software License - Version 1.0 - August 17th, 2003
* _InnerList_ - bidirectional list mapped over a vector/deque
* _Stack_ - alternative to std::stack
* _Queue_ - alternative to std:queue
* _WorkPool_ - generic thread pool
* _ThreadPool_ - generic thread pool
* [__solid_serialization_v2__](#solid_serialization_v2): binary serialization/marshalling
* _TypeMap_
* _binary::Serializer_
Expand Down
12 changes: 10 additions & 2 deletions RELEASES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# SolidFrame Releases

## Version 11.0

* utility: Improve SharedBuffer with local caching
* mprpc: switch to using SharedBuffer
* frame: Some improvements to Reactor and aio::Reactor
* utility: Introducing IntrusivePtr with support for collapse
* mprpc: Defaulting to InstrusivePtr for solid::frame::mprpc::Message. Use SOLID_MPRPC_USE_SHARED_PTR_MESSAGE build option to switch to std::shared_ptr.

## Version 10.0

* (DONE) New Pimpl implementation which avoids the extra memory allocation similarly to solid::Any<>
Expand Down Expand Up @@ -31,15 +39,15 @@
* (DONE) remove boost dependency
* (DONE) Object -> Actor
* (DONE) Overal fixes
* (DONE) Refactored solid::WorkPool<>. solid::CallPool<>
* (DONE) Refactored solid::ThreadPool<>. solid::CallPool<>

## Version 5.0

* (DONE) add support for endianess on seralization/v2
* (DONE) clang tidy support
* (DONE) fix compilation on g++ 8.1.1
* (DONE) system/debug.hpp -> system/log.hpp - redesign debug logging engine. No locking while handling log line parameters.
* (DONE) utility/workpool.hpp -> improved locking for a better performance on macOS
* (DONE) utility/ThreadPool.hpp -> improved locking for a better performance on macOS
* (DONE) mpipc: call connection pool close callback after calling connection close callback for every connection in the pool
* (DONE) mpipc: improve connection pool with support for events like ConnectionActivated, PoolDisconnect, ConnectionStop

Expand Down
2 changes: 1 addition & 1 deletion cmake/check.config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ file (READ "${CMAKE_CURRENT_SOURCE_DIR}/cmake/check/epoll2.cpp" source_code)
CHECK_CXX_SOURCE_RUNS("${source_code}" SOLID_USE_EPOLL2)

#TODO:
set(SOLID_FRAME_AIO_REACTOR_USE_SPINLOCK TRUE)
#set(SOLID_FRAME_AIO_REACTOR_USE_SPINLOCK TRUE)
3 changes: 3 additions & 0 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ print_usage(){
echo "7) create a debug Ninja build for C++17"
echo "./configure -b debug -e ../external -f debug -g Ninja -P \"-DCMAKE_CXX_STANDARD=17\""
echo
echo "8) create release build with using std::shared_ptr for solid::frame::mprpc::Message"
echo "./configure -f release_sp -e ../external/ -g Ninja -P \"-DSOLID_MPRPC_USE_SHARED_PTR_MESSAGE=on\""
echo
exit
}

Expand Down
14 changes: 7 additions & 7 deletions examples/frame/mprpc_echo/example_mprpc_echo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ mutex mtx;
condition_variable cnd;
Params params;

void broadcast_message(frame::mprpc::Service& _rsvc, std::shared_ptr<frame::mprpc::Message>& _rmsgptr);
void broadcast_message(frame::mprpc::Service& _rsvc, frame::mprpc::MessagePointerT<>& _rmsgptr);
} // namespace

struct FirstMessage : frame::mprpc::Message {
Expand Down Expand Up @@ -191,7 +191,7 @@ int main(int argc, char* argv[])
return 1;
}
} else {
std::shared_ptr<frame::mprpc::Message> msgptr(new FirstMessage(s));
frame::mprpc::MessagePointerT<> msgptr = frame::mprpc::make_message<FirstMessage>(s);
broadcast_message(ipcsvc, msgptr);
}
} while (s.size());
Expand All @@ -212,10 +212,10 @@ bool restart(
[&](auto& _rmap) {
_rmap.template registerMessage<FirstMessage>(1, "FirstMessage",
[](
frame::mprpc::ConnectionContext& _rctx,
std::shared_ptr<FirstMessage>& _rsend_msg,
std::shared_ptr<FirstMessage>& _rrecv_msg,
ErrorConditionT const& _rerr) {
frame::mprpc::ConnectionContext& _rctx,
frame::mprpc::MessagePointerT<FirstMessage>& _rsend_msg,
frame::mprpc::MessagePointerT<FirstMessage>& _rrecv_msg,
ErrorConditionT const& _rerr) {
if (_rrecv_msg) {
solid_log(generic_logger, Info, _rctx.recipientId() << " Message received: is_on_sender: " << _rrecv_msg->isOnSender() << ", is_on_peer: " << _rrecv_msg->isOnPeer() << ", is_back_on_sender: " << _rrecv_msg->isBackOnSender());
if (_rrecv_msg->isOnPeer()) {
Expand Down Expand Up @@ -341,7 +341,7 @@ std::string loadFile(const char* _path)

namespace {

void broadcast_message(frame::mprpc::Service& _rsvc, std::shared_ptr<frame::mprpc::Message>& _rmsgptr)
void broadcast_message(frame::mprpc::Service& _rsvc, frame::mprpc::MessagePointerT<frame::mprpc::Message>& _rmsgptr)
{

solid_log(generic_logger, Verbose, "done stop===============================");
Expand Down
2 changes: 1 addition & 1 deletion examples/utility/threadpool/example_threadpool.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// example_workpool.cpp
// example_ThreadPool.cpp
//
// Copyright (c) 2007, 2008, 2018 Valentin Palade (vipalade @ gmail . com)
//
Expand Down
16 changes: 8 additions & 8 deletions solid/frame/aio/aioreactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ class Reactor : public frame::ReactorBase {
#else
using MutexT = mutex;
#endif
const size_t wake_capacity_;
ReactorStatistic& rstatistic_;
size_t actor_count_{0};
size_t current_exec_size_{0};
alignas(hardware_destructive_interference_size) std::atomic_size_t pop_wake_index_{0};
alignas(hardware_destructive_interference_size) std::atomic_size_t pending_wake_count_{0};
alignas(hardware_destructive_interference_size) std::atomic_size_t push_wake_index_{0};
const size_t wake_capacity_;
ReactorStatistic& rstatistic_;
size_t actor_count_{0};
size_t current_exec_size_{0};
size_t pop_wake_index_{0};
std::atomic_size_t pending_wake_count_{0};
std::atomic_size_t push_wake_index_{0};

public:
using StatisticT = ReactorStatistic;
Expand Down Expand Up @@ -531,7 +531,7 @@ class Reactor : public impl::Reactor {
ReactorContext ctx(context(_rcrttime));

while (true) {
const size_t index = pop_wake_index_.load(std::memory_order_relaxed) % wake_capacity_;
const size_t index = pop_wake_index_ % wake_capacity_;
auto& rstub = wake_arr_[index];
if (rstub.isFilled()) {
if (rstub.actor_ptr_) [[unlikely]] {
Expand Down
2 changes: 1 addition & 1 deletion solid/frame/aio/src/aioreactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ struct impl::Reactor::Data {
}
}
#elif defined(SOLID_USE_EPOLL)
int computeWaitDuration(NanoTime const& _rcrt, const bool _can_wait) const
int computeWaitDuration(NanoTime const& _rcrt, const bool _can_wait) const
{

if (!_can_wait) {
Expand Down
4 changes: 2 additions & 2 deletions solid/frame/aio/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ if(OPENSSL_FOUND)
add_test(NAME TestPerfActorAio COMMAND test_perf test_perf_actor_aio)
add_test(NAME TestPerfTimeStore COMMAND test_perf test_perf_timestore)
add_test(NAME TestPerfActorFrame COMMAND test_perf test_perf_actor_frame)
add_test(NAME TestPerfWorkPoolLockFree COMMAND test_perf test_perf_threadpool_lockfree)
add_test(NAME TestPerfWorkPoolSynchCtx COMMAND test_perf test_perf_threadpool_synch_context)
add_test(NAME TestPerfThreadPoolLockFree COMMAND test_perf test_perf_threadpool_lockfree)
add_test(NAME TestPerfThreadPoolSynchCtx COMMAND test_perf test_perf_threadpool_synch_context)
endif(OPENSSL_FOUND)

2 changes: 1 addition & 1 deletion solid/frame/aio/test/test_echo_tcp_stress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ class Connection final : public frame::aio::Actor {

int test_echo_tcp_stress(int argc, char* argv[])
{
solid::log_start(std::cerr, {"solid::frame::aio.*:EWX", "\\*:VEWX", "solid::workpool:EWXS"});
solid::log_start(std::cerr, {"solid::frame::aio.*:EWX", "\\*:VEWX", "solid::ThreadPool:EWXS"});

size_t connection_count = 1;

Expand Down
4 changes: 2 additions & 2 deletions solid/frame/aio/test/test_event_stress_wp.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* This test is companion to test_event_stress.
* It tries to simulate the message passing from test_event_stress using Workpool instead of
* It tries to simulate the message passing from test_event_stress using ThreadPool instead of
* actors and schedulers.
*/

Expand Down Expand Up @@ -257,7 +257,7 @@ int test_event_stress_wp(int argc, char* argv[])
solid_log(logger, Warning, "sleep - wait for locked threads");
this_thread::sleep_for(chrono::seconds(100));
solid_log(logger, Warning, "wake - waited for locked threads");
// we must throw here otherwise it will crash because workpool(s) is/are used after destroy
// we must throw here otherwise it will crash because ThreadPool(s) is/are used after destroy
solid_throw(" Test is taking too long - waited " << wait_seconds << " secs");
}
fut.get();
Expand Down
1 change: 1 addition & 0 deletions solid/frame/mprpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ set(Headers
mprpcconfiguration.hpp
mprpccontext.hpp
mprpcerror.hpp
mprpcid.hpp
mprpcmessage.hpp
mprpcprotocol.hpp
mprpcprotocol_serialization_v3.hpp
Expand Down
32 changes: 17 additions & 15 deletions solid/frame/mprpc/mprpcconfiguration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "solid/system/socketaddress.hpp"
#include "solid/system/socketdevice.hpp"
#include "solid/utility/function.hpp"
#include "solid/utility/sharedbuffer.hpp"
#include <vector>

namespace solid {
Expand All @@ -50,7 +51,7 @@ class Configuration;

typedef void (*OnSecureConnectF)(frame::aio::ReactorContext&);
typedef void (*OnSecureAcceptF)(frame::aio::ReactorContext&);

#if false
struct BufferBase {
virtual ~BufferBase();

Expand Down Expand Up @@ -102,6 +103,7 @@ using SendBufferPointerT = std::unique_ptr<char[]>;
using RecvBufferPointerT = std::shared_ptr<BufferBase>;

RecvBufferPointerT make_recv_buffer(const size_t _cp);
#endif

enum struct RelayDataFlagsE : uint8_t {
First,
Expand All @@ -114,7 +116,7 @@ using RelayDataFlagsT = Flags<RelayDataFlagsE>;
std::ostream& operator<<(std::ostream& _ros, const RelayDataFlagsT& _flags);

struct RelayData {
RecvBufferPointerT bufptr_;
SharedBuffer buffer_;
const char* pdata_ = nullptr;
size_t data_size_ = 0;
RelayData* pnext_ = nullptr;
Expand All @@ -126,7 +128,7 @@ struct RelayData {

RelayData(
RelayData&& _rrelmsg) noexcept
: bufptr_(std::move(_rrelmsg.bufptr_))
: buffer_(std::move(_rrelmsg.buffer_))
, pdata_(_rrelmsg.pdata_)
, data_size_(_rrelmsg.data_size_)
, pnext_(nullptr)
Expand All @@ -138,7 +140,7 @@ struct RelayData {

RelayData& operator=(RelayData&& _rrelmsg) noexcept
{
bufptr_ = std::move(_rrelmsg.bufptr_);
buffer_ = std::move(_rrelmsg.buffer_);
pdata_ = _rrelmsg.pdata_;
data_size_ = _rrelmsg.data_size_;
pnext_ = _rrelmsg.pnext_;
Expand All @@ -156,7 +158,7 @@ struct RelayData {
pdata_ = nullptr;
data_size_ = 0;
// connection_id_.clear();
bufptr_.reset();
buffer_.reset();
pnext_ = nullptr;
flags_.reset();
message_flags_ = 0;
Expand Down Expand Up @@ -191,11 +193,11 @@ struct RelayData {
private:
friend class Connection;
RelayData(
RecvBufferPointerT& _bufptr,
const char* _pdata,
size_t _data_size,
const bool _is_last)
: bufptr_(_bufptr)
SharedBuffer& _buffer,
const char* _pdata,
size_t _data_size,
const bool _is_last)
: buffer_(_buffer)
, pdata_(_pdata)
, data_size_(_data_size)
{
Expand All @@ -214,7 +216,7 @@ class RelayEngine {

protected:
using PushFunctionT = solid_function_t(bool(RelayData*&, const MessageId&, MessageId&, bool&));
using DoneFunctionT = solid_function_t(void(RecvBufferPointerT&));
using DoneFunctionT = solid_function_t(void(SharedBuffer&));
using CancelFunctionT = solid_function_t(void(const MessageHeader&));

RelayEngine() {}
Expand Down Expand Up @@ -335,8 +337,8 @@ using ClientSetupSocketDeviceFunctionT = solid_function_t(bool(SocketDe
using ResolveCompleteFunctionT = solid_function_t(void(AddressVectorT&&));
using ConnectionStopFunctionT = solid_function_t(void(ConnectionContext&));
using ConnectionStartFunctionT = solid_function_t(void(ConnectionContext&));
using SendAllocateBufferFunctionT = solid_function_t(SendBufferPointerT(const uint32_t));
using RecvAllocateBufferFunctionT = solid_function_t(RecvBufferPointerT(const uint32_t));
using SendAllocateBufferFunctionT = solid_function_t(SharedBuffer(const uint32_t));
using RecvAllocateBufferFunctionT = solid_function_t(SharedBuffer(const uint32_t));
using CompressFunctionT = solid_function_t(size_t(char*, size_t, ErrorConditionT&));
using UncompressFunctionT = solid_function_t(size_t(char*, const char*, size_t, ErrorConditionT&));
using ExtractRecipientNameFunctionT = solid_function_t(const char*(const char*, std::string&, std::string&));
Expand Down Expand Up @@ -558,9 +560,9 @@ class Configuration {
return !isServer() && isClient();
}

RecvBufferPointerT allocateRecvBuffer(uint8_t& _rbuffer_capacity_kb) const;
SharedBuffer allocateRecvBuffer() const;

SendBufferPointerT allocateSendBuffer(uint8_t& _rbuffer_capacity_kb) const;
SharedBuffer allocateSendBuffer() const;

void check() const;

Expand Down
Loading

0 comments on commit 1832c17

Please sign in to comment.