Skip to content

Commit

Permalink
feat: refine rpc config
Browse files Browse the repository at this point in the history
  • Loading branch information
shuai132 committed Sep 18, 2023
1 parent 1aac548 commit 0331722
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 65 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ jobs:
cd build
./asio_net_test_rpc
- name: Test RPC 2
run: |
cd build
./asio_net_test_rpc_2
- name: Test RPC (reconnect)
run: |
cd build
Expand Down
38 changes: 16 additions & 22 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ cmake_minimum_required(VERSION 3.5)

project(asio_net CXX)

option(ASIO_NET_ENABLE_RPC "" ON)
option(ASIO_NET_ENABLE_SSL "" OFF)

option(ASIO_NET_BUILD_TEST "" OFF)

option(ASIO_NET_DISABLE_ON_DATA_PRINT "" OFF)

if (CMAKE_CURRENT_SOURCE_DIR STREQUAL CMAKE_SOURCE_DIR)
Expand All @@ -17,17 +14,15 @@ set(CMAKE_CXX_STANDARD 14)
add_compile_options(-Wall)

add_library(${PROJECT_NAME} INTERFACE)
target_include_directories(${PROJECT_NAME} INTERFACE include)
target_include_directories(${PROJECT_NAME} INTERFACE
include
include/asio_net/rpc_core/include
)

if (ASIO_NET_ENABLE_SSL)
target_compile_definitions(${PROJECT_NAME} INTERFACE -DASIO_NET_ENABLE_SSL)
endif ()

if (ASIO_NET_ENABLE_RPC)
target_include_directories(${PROJECT_NAME} INTERFACE
include/asio_net/rpc_core/include
)
endif ()

if (ASIO_NET_ENABLE_SSL)
find_package(OpenSSL 1.1.0 REQUIRED)
target_link_libraries(${PROJECT_NAME} INTERFACE OpenSSL::SSL)
Expand Down Expand Up @@ -65,17 +60,16 @@ if (ASIO_NET_BUILD_TEST)
add_executable(${PROJECT_NAME}_test_tcp_ssl_s test/tcp_ssl_s.cpp)
endif ()

if (ASIO_NET_ENABLE_RPC)
add_compile_definitions(RPC_CORE_LOG_SHOW_DEBUG)
add_compile_definitions(ASIO_NET_LOG_SHOW_DEBUG)
add_executable(${PROJECT_NAME}_test_rpc test/rpc.cpp)
add_executable(${PROJECT_NAME}_test_rpc_reconnect test/rpc_reconnect.cpp)
add_executable(${PROJECT_NAME}_test_rpc_c_open_close test/rpc_c_open_close.cpp)
add_executable(${PROJECT_NAME}_test_rpc_s test/rpc_s.cpp)
add_executable(${PROJECT_NAME}_test_rpc_c test/rpc_c.cpp)
add_executable(${PROJECT_NAME}_test_domain_rpc test/domain_rpc.cpp)
if (ASIO_NET_ENABLE_SSL)
add_executable(${PROJECT_NAME}_test_rpc_ssl test/rpc_ssl.cpp)
endif ()
add_compile_definitions(RPC_CORE_LOG_SHOW_DEBUG)
add_compile_definitions(ASIO_NET_LOG_SHOW_DEBUG)
add_executable(${PROJECT_NAME}_test_rpc test/rpc.cpp)
add_executable(${PROJECT_NAME}_test_rpc_2 test/rpc_2.cpp)
add_executable(${PROJECT_NAME}_test_rpc_reconnect test/rpc_reconnect.cpp)
add_executable(${PROJECT_NAME}_test_rpc_c_open_close test/rpc_c_open_close.cpp)
add_executable(${PROJECT_NAME}_test_rpc_s test/rpc_s.cpp)
add_executable(${PROJECT_NAME}_test_rpc_c test/rpc_c.cpp)
add_executable(${PROJECT_NAME}_test_domain_rpc test/domain_rpc.cpp)
if (ASIO_NET_ENABLE_SSL)
add_executable(${PROJECT_NAME}_test_rpc_ssl test/rpc_ssl.cpp)
endif ()
endif ()
27 changes: 26 additions & 1 deletion include/asio_net/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,25 @@

#include <cstdint>

#include "rpc_core/rpc.hpp"

namespace asio_net {

struct config {
struct rpc_config {
// rpc config
std::shared_ptr<rpc_core::rpc> rpc;

// socket config
bool enable_ipv6 = false;
uint32_t max_body_size = UINT32_MAX;
uint32_t max_send_buffer_size = UINT32_MAX;

// socket option
uint32_t socket_send_buffer_size = UINT32_MAX;
uint32_t socket_recv_buffer_size = UINT32_MAX;
};

struct tcp_config {
bool auto_pack = false;
bool enable_ipv6 = false;
uint32_t max_body_size = UINT32_MAX;
Expand All @@ -14,6 +30,15 @@ struct config {
uint32_t socket_send_buffer_size = UINT32_MAX;
uint32_t socket_recv_buffer_size = UINT32_MAX;

tcp_config() = default;
explicit tcp_config(const rpc_config& c)
: auto_pack(true),
enable_ipv6(c.enable_ipv6),
max_body_size(c.max_body_size),
max_send_buffer_size(c.max_send_buffer_size),
socket_send_buffer_size(c.socket_send_buffer_size),
socket_recv_buffer_size(c.socket_recv_buffer_size) {}

void init() {
// when auto_pack disable, max_body_size means buffer size, default is 1024 bytes
if ((!auto_pack) && (max_body_size == UINT32_MAX)) {
Expand Down
17 changes: 7 additions & 10 deletions include/asio_net/detail/rpc_client_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ namespace detail {
template <socket_type T>
class rpc_client_t : noncopyable {
public:
explicit rpc_client_t(asio::io_context& io_context, uint32_t max_body_size = UINT32_MAX)
: io_context_(io_context),
client_(
std::make_shared<detail::tcp_client_t<T>>(io_context, config{.auto_pack = true, .enable_ipv6 = true, .max_body_size = max_body_size})) {
explicit rpc_client_t(asio::io_context& io_context, rpc_config rpc_config = {})
: io_context_(io_context), rpc_config_(rpc_config), client_(std::make_shared<detail::tcp_client_t<T>>(io_context, tcp_config(rpc_config))) {
client_->on_open = [this]() {
auto session = std::make_shared<rpc_session_t<T>>(io_context_);
auto session = std::make_shared<rpc_session_t<T>>(io_context_, rpc_config_);
session->init(client_);

session->on_close = [this] {
Expand All @@ -36,12 +34,10 @@ class rpc_client_t : noncopyable {
}

#ifdef ASIO_NET_ENABLE_SSL
explicit rpc_client_t(asio::io_context& io_context, asio::ssl::context& ssl_context, uint32_t max_body_size = UINT32_MAX)
: io_context_(io_context),
client_(std::make_shared<detail::tcp_client_t<T>>(io_context, ssl_context,
config{.auto_pack = true, .enable_ipv6 = true, .max_body_size = max_body_size})) {
explicit rpc_client_t(asio::io_context& io_context, asio::ssl::context& ssl_context, rpc_config rpc_config = {})
: io_context_(io_context), client_(std::make_shared<detail::tcp_client_t<T>>(io_context, ssl_context, tcp_config(rpc_config))) {
client_->on_open = [this]() {
auto session = std::make_shared<rpc_session_t<T>>(io_context_);
auto session = std::make_shared<rpc_session_t<T>>(io_context_, rpc_config_);
session->init(client_);

session->on_close = [this] {
Expand Down Expand Up @@ -96,6 +92,7 @@ class rpc_client_t : noncopyable {

private:
asio::io_context& io_context_;
rpc_config rpc_config_;
std::shared_ptr<detail::tcp_client_t<T>> client_;
};

Expand Down
22 changes: 10 additions & 12 deletions include/asio_net/detail/rpc_server_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ namespace detail {
template <socket_type T>
class rpc_server_t : noncopyable {
public:
rpc_server_t(asio::io_context& io_context, uint16_t port, bool enable_ipv6 = false, uint32_t max_body_size = UINT32_MAX)
: io_context_(io_context), server_(io_context, port, config{.auto_pack = true, .enable_ipv6 = enable_ipv6, .max_body_size = max_body_size}) {
rpc_server_t(asio::io_context& io_context, uint16_t port, rpc_config rpc_config = {})
: io_context_(io_context), rpc_config_(rpc_config), server_(io_context, port, tcp_config(rpc_config)) {
static_assert(T == detail::socket_type::normal, "");
server_.on_session = [this](std::weak_ptr<detail::tcp_session_t<T>> ws) {
auto session = std::make_shared<rpc_session_t<T>>(io_context_);
auto session = std::make_shared<rpc_session_t<T>>(io_context_, rpc_config_);
session->init(std::move(ws));
if (on_session) {
on_session(session);
Expand All @@ -25,13 +25,11 @@ class rpc_server_t : noncopyable {
}

#ifdef ASIO_NET_ENABLE_SSL
rpc_server_t(asio::io_context& io_context, uint16_t port, asio::ssl::context& ssl_context, bool enable_ipv6 = false,
uint32_t max_body_size = UINT32_MAX)
: io_context_(io_context),
server_(io_context, port, ssl_context, config{.auto_pack = true, .enable_ipv6 = enable_ipv6, .max_body_size = max_body_size}) {
rpc_server_t(asio::io_context& io_context, uint16_t port, asio::ssl::context& ssl_context, rpc_config rpc_config = {})
: io_context_(io_context), rpc_config_(rpc_config), server_(io_context, port, ssl_context, tcp_config(rpc_config)) {
static_assert(T == detail::socket_type::ssl, "");
server_.on_session = [this](std::weak_ptr<detail::tcp_session_t<T>> ws) {
auto session = std::make_shared<rpc_session_t<T>>(io_context_);
auto session = std::make_shared<rpc_session_t<T>>(io_context_, rpc_config_);
session->init(std::move(ws));
if (on_session) {
on_session(session);
Expand All @@ -43,12 +41,11 @@ class rpc_server_t : noncopyable {
}
#endif

rpc_server_t(asio::io_context& io_context, const std::string& endpoint, bool enable_ipv6 = false, uint32_t max_body_size = UINT32_MAX)
: io_context_(io_context),
server_(io_context, endpoint, config{.auto_pack = true, .enable_ipv6 = enable_ipv6, .max_body_size = max_body_size}) {
rpc_server_t(asio::io_context& io_context, const std::string& endpoint, rpc_config rpc_config = {})
: io_context_(io_context), rpc_config_(rpc_config), server_(io_context, endpoint, tcp_config(rpc_config)) {
static_assert(T == detail::socket_type::domain, "");
server_.on_session = [this](std::weak_ptr<detail::tcp_session_t<T>> ws) {
auto session = std::make_shared<rpc_session_t<T>>(io_context_);
auto session = std::make_shared<rpc_session_t<T>>(io_context_, rpc_config_);
session->init(std::move(ws));
if (on_session) {
on_session(session);
Expand All @@ -67,6 +64,7 @@ class rpc_server_t : noncopyable {

private:
asio::io_context& io_context_;
rpc_config rpc_config_;
detail::tcp_server_t<T> server_;
};

Expand Down
9 changes: 7 additions & 2 deletions include/asio_net/detail/rpc_session_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace detail {
template <socket_type T>
class rpc_session_t : noncopyable, public std::enable_shared_from_this<rpc_session_t<T>> {
public:
explicit rpc_session_t(asio::io_context& io_context) : io_context_(io_context) {
explicit rpc_session_t(asio::io_context& io_context, rpc_config& rpc_config) : io_context_(io_context), rpc_config_(rpc_config) {
ASIO_NET_LOGD("rpc_session: %p", this);
}

Expand All @@ -25,7 +25,11 @@ class rpc_session_t : noncopyable, public std::enable_shared_from_this<rpc_sessi
tcp_session_ = std::move(ws);
auto tcp_session = tcp_session_.lock();

rpc = rpc_core::rpc::create();
if (rpc_config_.rpc) {
rpc = rpc_config_.rpc;
} else {
rpc = rpc_core::rpc::create();
}

rpc->set_timer([this](uint32_t ms, rpc_core::rpc::timeout_cb cb) {
auto timer = std::make_shared<asio::steady_timer>(io_context_);
Expand Down Expand Up @@ -76,6 +80,7 @@ class rpc_session_t : noncopyable, public std::enable_shared_from_this<rpc_sessi

private:
asio::io_context& io_context_;
rpc_config& rpc_config_;
std::weak_ptr<detail::tcp_channel_t<T>> tcp_session_;
};

Expand Down
4 changes: 2 additions & 2 deletions include/asio_net/detail/tcp_channel_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace detail {
template <socket_type T>
class tcp_channel_t : private noncopyable {
public:
tcp_channel_t(typename socket_impl<T>::socket& socket, const config& config) : socket_(socket), config_(config) {
tcp_channel_t(typename socket_impl<T>::socket& socket, const tcp_config& config) : socket_(socket), config_(config) {
ASIO_NET_LOGD("tcp_channel: %p", this);
}

Expand Down Expand Up @@ -205,7 +205,7 @@ class tcp_channel_t : private noncopyable {

private:
typename socket_impl<T>::socket& socket_;
const config& config_;
const tcp_config& config_;
detail::message read_msg_;
uint32_t send_buffer_now_ = 0;
std::deque<std::string> write_msg_queue_;
Expand Down
6 changes: 3 additions & 3 deletions include/asio_net/detail/tcp_client_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ namespace detail {
template <socket_type T>
class tcp_client_t : public tcp_channel_t<T> {
public:
explicit tcp_client_t(asio::io_context& io_context, config config = {})
explicit tcp_client_t(asio::io_context& io_context, tcp_config config = {})
: tcp_channel_t<T>(socket_, config_), io_context_(io_context), socket_(io_context), config_(config) {
config_.init();
}

#ifdef ASIO_NET_ENABLE_SSL
explicit tcp_client_t(asio::io_context& io_context, asio::ssl::context& ssl_context, config config = {})
explicit tcp_client_t(asio::io_context& io_context, asio::ssl::context& ssl_context, tcp_config config = {})
: tcp_channel_t<T>(socket_, config_), io_context_(io_context), socket_(io_context, ssl_context), config_(config) {
config_.init();
}
Expand Down Expand Up @@ -126,7 +126,7 @@ class tcp_client_t : public tcp_channel_t<T> {
private:
asio::io_context& io_context_;
typename socket_impl<T>::socket socket_;
config config_;
tcp_config config_;
std::unique_ptr<asio::steady_timer> reconnect_timer_;
uint32_t reconnect_ms_ = 0;
std::function<void()> open_;
Expand Down
10 changes: 5 additions & 5 deletions include/asio_net/detail/tcp_server_t.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class tcp_session_t : public tcp_channel_t<T>, public std::enable_shared_from_th
using socket = typename socket_impl<T>::socket;

public:
explicit tcp_session_t(socket socket, const config& config) : tcp_channel_t<T>(socket_, config), socket_(std::move(socket)) {
explicit tcp_session_t(socket socket, const tcp_config& config) : tcp_channel_t<T>(socket_, config), socket_(std::move(socket)) {
this->init_socket();
}

Expand Down Expand Up @@ -43,15 +43,15 @@ class tcp_server_t {
using endpoint = typename socket_impl<T>::endpoint;

public:
tcp_server_t(asio::io_context& io_context, uint16_t port, config config = {})
tcp_server_t(asio::io_context& io_context, uint16_t port, tcp_config config = {})
: io_context_(io_context),
acceptor_(io_context, endpoint(config.enable_ipv6 ? asio::ip::tcp::v6() : asio::ip::tcp::v4(), port)),
config_(config) {
config_.init();
}

#ifdef ASIO_NET_ENABLE_SSL
tcp_server_t(asio::io_context& io_context, uint16_t port, asio::ssl::context& ssl_context, config config = {})
tcp_server_t(asio::io_context& io_context, uint16_t port, asio::ssl::context& ssl_context, tcp_config config = {})
: io_context_(io_context), ssl_context_(ssl_context), acceptor_(io_context, endpoint(asio::ip::tcp::v4(), port)), config_(config) {
config_.init();
}
Expand All @@ -64,7 +64,7 @@ class tcp_server_t {
* @param endpoint e.g. /tmp/foobar
* @param config
*/
tcp_server_t(asio::io_context& io_context, const std::string& endpoint, config config = {})
tcp_server_t(asio::io_context& io_context, const std::string& endpoint, tcp_config config = {})
: io_context_(io_context), acceptor_(io_context, typename socket_impl<T>::endpoint(endpoint)), config_(config) {
config_.init();
}
Expand All @@ -91,7 +91,7 @@ class tcp_server_t {
typename std::conditional<T == socket_type::ssl, asio::ssl::context&, uint8_t>::type ssl_context_;
#endif
typename socket_impl<T>::acceptor acceptor_;
config config_;
tcp_config config_;
};

template <>
Expand Down
4 changes: 2 additions & 2 deletions test/domain_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ int main(int argc, char** argv) {
static std::atomic_bool pass_flag_client_close{false};
std::thread([] {
asio::io_context context;
domain_tcp_server server(context, ENDPOINT, config{.auto_pack = true});
domain_tcp_server server(context, ENDPOINT, tcp_config{.auto_pack = true});
server.on_session = [](const std::weak_ptr<domain_tcp_session>& ws) {
LOG("on_session:");
auto session = ws.lock();
Expand All @@ -49,7 +49,7 @@ int main(int argc, char** argv) {

std::thread([] {
asio::io_context context;
domain_tcp_client client(context, config{.auto_pack = true});
domain_tcp_client client(context, tcp_config{.auto_pack = true});
client.on_open = [&] {
LOG("client on_open:");
for (uint32_t i = 0; i < test_count_max; ++i) {
Expand Down
Loading

0 comments on commit 0331722

Please sign in to comment.