Skip to content

Commit

Permalink
Merge pull request #36 from solidoss/relay_multicast
Browse files Browse the repository at this point in the history
Relay multicast - preparing the stage + threadpool improvements
  • Loading branch information
vipalade authored Jun 29, 2024
2 parents ac26b55 + 0b9d7b8 commit fc163bc
Show file tree
Hide file tree
Showing 106 changed files with 1,673 additions and 1,093 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.13 FATAL_ERROR)
#-----------------------------------------------------------------
# The project
#-----------------------------------------------------------------
project (SolidFrame VERSION 11.1)
project (SolidFrame VERSION 12.0)

message("SolidFrame version: ${PROJECT_VERSION} - ${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}")
#-----------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# SolidFrame Releases

## Version 12.0
* utility: support pthread_spin_lock when available
* utility: ThreadPool improvements and fixes
* frame: Reactor and aio::Reactor using the same new technique from ThreadPool for event passing
* mprpc: preparing the stage for relay multicast support

## Version 11.1
* mprpc: Split Connection in ClientConnection, ServerConnection, RelayConnection
* mprpc: Some cleanup and some small improvements on the MessageReader and MessageWriter
Expand Down
4 changes: 4 additions & 0 deletions cmake/check.config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,9 @@ file (READ "${CMAKE_CURRENT_SOURCE_DIR}/cmake/check/epoll2.cpp" source_code)

CHECK_CXX_SOURCE_RUNS("${source_code}" SOLID_USE_EPOLL2)

file (READ "${CMAKE_CURRENT_SOURCE_DIR}/cmake/check/pthread_spinlock.cpp" source_code)

CHECK_CXX_SOURCE_COMPILES("${source_code}" SOLID_USE_PTHREAD_SPINLOCK)

#TODO:
#set(SOLID_FRAME_AIO_REACTOR_USE_SPINLOCK TRUE)
8 changes: 8 additions & 0 deletions cmake/check/pthread_spinlock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#include <pthread.h>

int main(){
pthread_spinlock_t spl;
auto rv = pthread_spin_init(&spl, PTHREAD_PROCESS_PRIVATE);
if(rv != 0) return -1;
return 0;
}
2 changes: 1 addition & 1 deletion examples/frame/aio_echo/example_echo_auto_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ int main(int argc, char* argv[])
1024 * 1024 * 64);
}

CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}};
CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); });

async_resolver(&resolver);
Expand Down
2 changes: 1 addition & 1 deletion examples/frame/aio_echo/example_secure_echo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ int main(int argc, char* argv[])
frame::ServiceT service(manager);
frame::ActorIdT actuid;

CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}};
CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); });
ErrorConditionT err;

Expand Down
6 changes: 3 additions & 3 deletions examples/frame/mprpc_echo/example_mprpc_echo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ int main(int argc, char* argv[])
frame::Manager m;
frame::mprpc::ServiceT ipcsvc(m);
ErrorConditionT err;
CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}};
CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); });

if (!restart(ipcsvc, resolver, sch)) {
Expand Down Expand Up @@ -211,7 +211,7 @@ bool restart(
reflection::v1::metadata::factory,
[&](auto& _rmap) {
_rmap.template registerMessage<FirstMessage>(1, "FirstMessage",
[](
[](
frame::mprpc::ConnectionContext& _rctx,
frame::mprpc::MessagePointerT<FirstMessage>& _rsend_msg,
frame::mprpc::MessagePointerT<FirstMessage>& _rrecv_msg,
Expand Down Expand Up @@ -347,7 +347,7 @@ void broadcast_message(frame::mprpc::Service& _rsvc, frame::mprpc::MessagePointe
solid_log(generic_logger, Verbose, "done stop===============================");

for (Params::StringVectorT::const_iterator it(params.connectstringvec.begin()); it != params.connectstringvec.end(); ++it) {
_rsvc.sendMessage(it->c_str(), _rmsgptr, {frame::mprpc::MessageFlagsE::AwaitResponse});
_rsvc.sendMessage({*it}, _rmsgptr, {frame::mprpc::MessageFlagsE::AwaitResponse});
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/frame/relay_server/example_relay_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ int main(int argc, char* argv[])
3,
1024 * 1024 * 64);
}
CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}};
CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); });

async_resolver(&resolver);
Expand Down
2 changes: 1 addition & 1 deletion examples/frame/relay_server/example_relay_server_bi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ int main(int argc, char* argv[])
1024 * 1024 * 64);
}

CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}};
CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); });

async_resolver(&resolver);
Expand Down
2 changes: 1 addition & 1 deletion examples/frame/relay_server/example_relay_server_bi_cp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ int main(int argc, char* argv[])
3,
1024 * 1024 * 64);
}
CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}};
CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); });

async_resolver(&resolver);
Expand Down
2 changes: 1 addition & 1 deletion examples/frame/relay_server/example_relay_server_bi_ex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ int main(int argc, char* argv[])

cout << "sizeof(Connection) = " << sizeof(Connection) << endl;

CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}};
CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); });

async_resolver(&resolver);
Expand Down
2 changes: 1 addition & 1 deletion examples/frame/relay_server/example_relay_server_bi_sh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ int main(int argc, char* argv[])

cout << "sizeof(Connection) = " << sizeof(Connection) << endl;

CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}};
CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); });

async_resolver(&resolver);
Expand Down
2 changes: 1 addition & 1 deletion examples/utility/threadpool/example_file_open_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ int main(int argc, char* argv[])
using ThreadPoolT = ThreadPool<FileDevice*, size_t>;
Context context;
ThreadPoolT wp{
1, 100, 0, [](const size_t, Context&) {}, [](const size_t, Context&) {},
{1, 100, 0}, [](const size_t, Context&) {}, [](const size_t, Context&) {},
[](FileDevice* _pfile, Context& _rctx) {
int64_t sz = _pfile->size();
int toread;
Expand Down
2 changes: 1 addition & 1 deletion examples/utility/threadpool/example_threadpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ int main(int argc, char* argv[])
solid::log_start(std::cerr, {".*:VIEW"});

ThreadPool<int, size_t> wp{
1, 100, 0, [](const size_t) {}, [](const size_t) {},
{1, 100, 0}, [](const size_t) {}, [](const size_t) {},
[](int _v) {
solid_log(generic_logger, Info, "v = " << _v);
std::this_thread::sleep_for(std::chrono::milliseconds(_v * 10));
Expand Down
35 changes: 19 additions & 16 deletions solid/frame/aio/aioreactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#pragma once

#include <tuple>

#include "solid/frame/aio/aiocommon.hpp"
#include "solid/frame/aio/aioreactorcontext.hpp"
#include "solid/frame/common.hpp"
Expand Down Expand Up @@ -140,9 +142,10 @@ class Reactor : public frame::ReactorBase {
Reactor(SchedulerBase& _rsched, StatisticT& _rstatistic, const size_t _schedidx, const size_t _wake_capacity);
~Reactor();

size_t pushWakeIndex() noexcept
std::tuple<frame::impl::AtomicIndexValueT, frame::impl::AtomicCounterValueT> pushWakeIndex() noexcept
{
return push_wake_index_.fetch_add(1) % wake_capacity_;
const auto index = push_wake_index_.fetch_add(1);
return {index % wake_capacity_, frame::impl::computeCounter(index, wake_capacity_)};
}

template <typename Function>
Expand Down Expand Up @@ -374,7 +377,7 @@ class Reactor : public impl::Reactor {

Reactor(SchedulerBase& _rsched, StatisticT& _rstatistic, const size_t _sched_idx, const size_t _wake_capacity)
: impl::Reactor(_rsched, _rstatistic, _sched_idx, _wake_capacity)
, wake_arr_(new WakeStubT[_wake_capacity])
, wake_arr_(new WakeStubT[wake_capacity_])
{
}

Expand All @@ -385,10 +388,10 @@ class Reactor : public impl::Reactor {
mutex().lock();
const UniqueId uid = this->popUid(*_ract);
mutex().unlock();
const auto index = pushWakeIndex();
auto& rstub = wake_arr_[index];
const auto [index, count] = pushWakeIndex();
auto& rstub = wake_arr_[index];

rstub.waitWhilePush(rstatistic_);
rstub.waitWhilePush(rstatistic_, count);

rstub.reset(uid, _revent, std::move(_ract), &_rsvc);

Expand All @@ -414,10 +417,10 @@ class Reactor : public impl::Reactor {
mutex().lock();
const UniqueId uid = this->popUid(*_ract);
mutex().unlock();
const auto index = pushWakeIndex();
auto& rstub = wake_arr_[index];
const auto [index, count] = pushWakeIndex();
auto& rstub = wake_arr_[index];

rstub.waitWhilePush(rstatistic_);
rstub.waitWhilePush(rstatistic_, count);

rstub.reset(uid, std::move(_revent), std::move(_ract), &_rsvc);

Expand All @@ -442,10 +445,10 @@ class Reactor : public impl::Reactor {
{
bool notify = false;
{
const auto index = pushWakeIndex();
auto& rstub = wake_arr_[index];
const auto [index, count] = pushWakeIndex();
auto& rstub = wake_arr_[index];

rstub.waitWhilePush(rstatistic_);
rstub.waitWhilePush(rstatistic_, count);

rstub.reset(_ractuid, _revent);

Expand All @@ -468,10 +471,10 @@ class Reactor : public impl::Reactor {
{
bool notify = false;
{
const auto index = pushWakeIndex();
auto& rstub = wake_arr_[index];
const auto [index, count] = pushWakeIndex();
auto& rstub = wake_arr_[index];

rstub.waitWhilePush(rstatistic_);
rstub.waitWhilePush(rstatistic_, count);

rstub.reset(_ractuid, std::move(_revent));

Expand Down Expand Up @@ -533,7 +536,7 @@ class Reactor : public impl::Reactor {
while (true) {
const size_t index = pop_wake_index_ % wake_capacity_;
auto& rstub = wake_arr_[index];
if (rstub.isFilled()) {
if (rstub.isFilled(pop_wake_index_, wake_capacity_)) {
if (rstub.actor_ptr_) [[unlikely]] {
++actor_count_;
rstatistic_.actorCount(actor_count_);
Expand Down
2 changes: 1 addition & 1 deletion solid/frame/aio/src/aioreactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ Reactor::Reactor(
SchedulerBase& _rsched, StatisticT& _rstatistic,
const size_t _idx, const size_t _wake_capacity)
: ReactorBase(_rsched, _idx)
, wake_capacity_(_wake_capacity)
, wake_capacity_(std::bit_ceil(_wake_capacity))
, rstatistic_(_rstatistic)
{
solid_log(logger, Verbose, "");
Expand Down
8 changes: 3 additions & 5 deletions solid/frame/aio/test/test_echo_tcp_stress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ int test_echo_tcp_stress(int argc, char* argv[])
frame::Manager srv_mgr;
SecureContextT srv_secure_ctx{SecureContextT::create()};
frame::ServiceT srv_svc{srv_mgr};
CallPoolT cwp{1, 100, 0, [](const size_t) {}, [](const size_t) {}};
CallPoolT cwp{{1, 100, 0}, [](const size_t) {}, [](const size_t) {}};
frame::aio::Resolver resolver([&cwp](std::function<void()>&& _fnc) { cwp.pushOne(std::move(_fnc)); });

async_resolver(&resolver);
Expand Down Expand Up @@ -695,8 +695,7 @@ void Listener::onAccept(frame::aio::ReactorContext& _rctx, SocketDevice& _rsd)
break;
}
--repeatcnt;
} while (repeatcnt != 0u && sock.accept(
_rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd));
} while (repeatcnt != 0u && sock.accept(_rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd));

if (repeatcnt == 0u) {
sock.postAccept(
Expand Down Expand Up @@ -1004,8 +1003,7 @@ void Listener::onAccept(frame::aio::ReactorContext& _rctx, SocketDevice& _rsd)
break;
}
--repeatcnt;
} while (repeatcnt != 0u && sock.accept(
_rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd));
} while (repeatcnt != 0u && sock.accept(_rctx, [this](frame::aio::ReactorContext& _rctx, SocketDevice& _rsd) { onAccept(_rctx, _rsd); }, _rsd));

if (repeatcnt == 0u) {
sock.postAccept(
Expand Down
6 changes: 3 additions & 3 deletions solid/frame/aio/test/test_event_stress_wp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,11 @@ int test_event_stress_wp(int argc, char* argv[])
gctx.stopping_ = false;

account_cp.start(
thread_count, account_count, 0, [](const size_t, AccountContext&) {}, [](const size_t, AccountContext&) {}, std::ref(acc_ctx));
{thread_count, account_count, 0}, [](const size_t, AccountContext&) {}, [](const size_t, AccountContext&) {}, std::ref(acc_ctx));
connection_cp.start(
thread_count, account_count * account_connection_count, 0, [](const size_t, ConnectionContext&) {}, [](const size_t, ConnectionContext&) {}, std::ref(conn_ctx));
{thread_count, account_count * account_connection_count, 0}, [](const size_t, ConnectionContext&) {}, [](const size_t, ConnectionContext&) {}, std::ref(conn_ctx));
device_cp.start(
thread_count, account_count * account_connection_count, 0, [](const size_t, DeviceContext&) {}, [](const size_t, DeviceContext&) {}, std::ref(dev_ctx));
{thread_count, account_count * account_connection_count, 0}, [](const size_t, DeviceContext&) {}, [](const size_t, DeviceContext&) {}, std::ref(dev_ctx));

conn_ctx.conn_cnt_ = (account_connection_count * account_count);
auto produce_lambda = [&]() {
Expand Down
2 changes: 1 addition & 1 deletion solid/frame/aio/test/test_perf_threadpool_lockfree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ int test_perf_threadpool_lockfree(int argc, char* argv[])
(void)context_count;
auto lambda = [&]() {
ThreadPoolT wp{
thread_count, 10000, 0, [](const size_t) {}, [](const size_t) {},
{thread_count, 10000, 0}, [](const size_t) {}, [](const size_t) {},
[&](EventBase& _event) {
if (_event == generic_event<GenericEventE::Wake>) {
++received_events;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ int test_perf_threadpool_synch_context(int argc, char* argv[])
auto lambda = [&]() {
auto start = std::chrono::steady_clock::now();
ThreadPoolT wp{
thread_count, 10000, 0, [](const size_t) {}, [](const size_t) {},
{thread_count, 10000, 0}, [](const size_t) {}, [](const size_t) {},
[&](EventBase& _event) {
if (_event == generic_event<GenericEventE::Wake>) {
++received_events;
Expand Down
Loading

0 comments on commit fc163bc

Please sign in to comment.