From 92ab913226ea8fdb3686e1c7001f82dbae65517b Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 25 Nov 2024 17:58:58 +0100 Subject: [PATCH 01/10] Timeout#Timeout(): don't pass yield_context to callback It's not used. Also, the callback shall run completely at once. This ensures that it won't (continue to) run once another coroutine on the strand calls Timeout#Cancel(). --- lib/base/io-engine.hpp | 2 +- lib/base/tlsstream.cpp | 2 +- lib/icingadb/redisconnection.hpp | 2 +- lib/methods/ifwapichecktask.cpp | 2 +- lib/remote/apilistener.cpp | 6 +++--- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 326f04fdc47..83ac9aaabad 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -199,7 +199,7 @@ class Timeout : public SharedObject } auto f (onTimeout); - f(std::move(yc)); + f(); }); } diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index ed80058372a..c7f2884f433 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -141,7 +141,7 @@ void AsioTlsStream::GracefulDisconnect(boost::asio::io_context::strand& strand, { Timeout::Ptr shutdownTimeout(new Timeout(strand.context(), strand, boost::posix_time::seconds(10), - [this](boost::asio::yield_context yc) { + [this] { // Forcefully terminate the connection if async_shutdown() blocked more than 10 seconds. ForceDisconnect(); } diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index fecd236f9b3..04c6635853d 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -520,7 +520,7 @@ Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream) m_Strand.context(), m_Strand, boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)), - [keepAlive, stream](boost::asio::yield_context yc) { + [keepAlive, stream] { boost::system::error_code ec; stream->lowest_layer().cancel(ec); } diff --git a/lib/methods/ifwapichecktask.cpp b/lib/methods/ifwapichecktask.cpp index 9a62444b6c6..d5148623a0c 100644 --- a/lib/methods/ifwapichecktask.cpp +++ b/lib/methods/ifwapichecktask.cpp @@ -457,7 +457,7 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes *strand, [strand, checkable, cr, psCommand, psHost, expectedSan, psPort, conn, req, checkTimeout, reportResult = std::move(reportResult)](asio::yield_context yc) { Timeout::Ptr timeout = new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)), - [&conn, &checkable](boost::asio::yield_context yc) { + [&conn, &checkable] { Log(LogNotice, "IfwApiCheckTask") << "Timeout while checking " << checkable->GetReflectionType()->GetName() << " '" << checkable->GetName() << "', cancelling attempt"; diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 9c2a489da6e..601367e4991 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -535,7 +535,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn, remoteEndpoint](asio::yield_context yc) { Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), - [sslConn, remoteEndpoint](asio::yield_context yc) { + [sslConn, remoteEndpoint] { Log(LogWarning, "ApiListener") << "Timeout while processing incoming connection from " << remoteEndpoint; @@ -586,7 +586,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint) lock.unlock(); Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), - [sslConn, endpoint, host, port](asio::yield_context yc) { + [sslConn, endpoint, host, port] { Log(LogCritical, "ApiListener") << "Timeout while reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "', cancelling attempt"; @@ -687,7 +687,7 @@ void ApiListener::NewClientHandlerInternal( strand->context(), *strand, boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)), - [strand, client](asio::yield_context yc) { + [strand, client] { boost::system::error_code ec; client->lowest_layer().cancel(ec); } From faaeb4eb2e8e4514a7476307419a3d51413593e9 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 26 Nov 2024 15:04:27 +0100 Subject: [PATCH 02/10] Timeout: use a plain callback, not an unnecessary coroutine --- lib/base/io-engine.cpp | 2 -- lib/base/io-engine.hpp | 33 +++++---------------------------- 2 files changed, 5 insertions(+), 30 deletions(-) diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 26125feb310..2727236ad63 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -148,8 +148,6 @@ void AsioConditionVariable::Wait(boost::asio::yield_context yc) void Timeout::Cancel() { - m_Cancelled.store(true); - boost::system::error_code ec; m_Timer.cancel(ec); } diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 83ac9aaabad..81efc1c9610 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -172,42 +172,19 @@ class Timeout : public SharedObject template Timeout(boost::asio::io_context& io, Executor& executor, TimeoutFromNow timeoutFromNow, OnTimeout onTimeout) - : m_Timer(io) + : m_Timer(io, timeoutFromNow) { - Ptr keepAlive (this); - - m_Cancelled.store(false); - m_Timer.expires_from_now(std::move(timeoutFromNow)); - - IoEngine::SpawnCoroutine(executor, [this, keepAlive, onTimeout](boost::asio::yield_context yc) { - if (m_Cancelled.load()) { - return; - } - - { - boost::system::error_code ec; - - m_Timer.async_wait(yc[ec]); - - if (ec) { - return; - } + m_Timer.async_wait(boost::asio::bind_executor(executor, [onTimeout = std::move(onTimeout)](boost::system::error_code ec) { + if (!ec) { + onTimeout(); } - - if (m_Cancelled.load()) { - return; - } - - auto f (onTimeout); - f(); - }); + })); } void Cancel(); private: boost::asio::deadline_timer m_Timer; - std::atomic m_Cancelled; }; } From 8cdbea303baec4f0c635efd6e440e4230f3755f4 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 26 Nov 2024 15:30:06 +0100 Subject: [PATCH 03/10] Test Timeout --- test/CMakeLists.txt | 6 ++ test/base-io-engine.cpp | 143 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 test/base-io-engine.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index dd9724f0bf0..a255178da2c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -62,6 +62,7 @@ set(base_test_SOURCES base-convert.cpp base-dictionary.cpp base-fifo.cpp + base-io-engine.cpp base-json.cpp base-match.cpp base-netstring.cpp @@ -128,6 +129,11 @@ add_boost_test(base base_dictionary/keys_ordered base_fifo/construct base_fifo/io + base_io_engine/timeout_run + base_io_engine/timeout_cancelled + base_io_engine/timeout_scope + base_io_engine/timeout_due_cancelled + base_io_engine/timeout_due_scope base_json/encode base_json/decode base_json/invalid1 diff --git a/test/base-io-engine.cpp b/test/base-io-engine.cpp new file mode 100644 index 00000000000..2261bab91e9 --- /dev/null +++ b/test/base-io-engine.cpp @@ -0,0 +1,143 @@ +/* Icinga 2 | (c) 2024 Icinga GmbH | GPLv2+ */ + +#include "base/io-engine.hpp" +#include "base/utility.hpp" +#include +#include +#include + +using namespace icinga; + +BOOST_AUTO_TEST_SUITE(base_io_engine) + +BOOST_AUTO_TEST_CASE(timeout_run) +{ + boost::asio::io_context io; + boost::asio::io_context::strand strand (io); + int called = 0; + + boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { + boost::asio::deadline_timer timer (io); + + Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + BOOST_CHECK_EQUAL(called, 0); + + timer.expires_from_now(boost::posix_time::millisec(200)); + timer.async_wait(yc); + BOOST_CHECK_EQUAL(called, 0); + + timer.expires_from_now(boost::posix_time::millisec(200)); + timer.async_wait(yc); + }); + + io.run(); + BOOST_CHECK_EQUAL(called, 1); +} + +BOOST_AUTO_TEST_CASE(timeout_cancelled) +{ + boost::asio::io_context io; + boost::asio::io_context::strand strand (io); + int called = 0; + + boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { + boost::asio::deadline_timer timer (io); + Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + + timer.expires_from_now(boost::posix_time::millisec(200)); + timer.async_wait(yc); + + timeout->Cancel(); + BOOST_CHECK_EQUAL(called, 0); + + timer.expires_from_now(boost::posix_time::millisec(200)); + timer.async_wait(yc); + }); + + io.run(); + BOOST_CHECK_EQUAL(called, 0); +} + +BOOST_AUTO_TEST_CASE(timeout_scope) +{ + boost::asio::io_context io; + boost::asio::io_context::strand strand (io); + int called = 0; + + boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { + boost::asio::deadline_timer timer (io); + + { + Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + + timer.expires_from_now(boost::posix_time::millisec(200)); + timer.async_wait(yc); + } + + BOOST_CHECK_EQUAL(called, 0); + + timer.expires_from_now(boost::posix_time::millisec(200)); + timer.async_wait(yc); + }); + + io.run(); + BOOST_CHECK_EQUAL(called, 0); +} + +BOOST_AUTO_TEST_CASE(timeout_due_cancelled) +{ + boost::asio::io_context io; + boost::asio::io_context::strand strand (io); + int called = 0; + + boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { + boost::asio::deadline_timer timer (io); + Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + + // Give the timeout enough time to become due while blocking its strand to prevent it from actually running... + Utility::Sleep(0.4); + + BOOST_CHECK_EQUAL(called, 0); + + // ... so that this shall still work: + timeout->Cancel(); + + BOOST_CHECK_EQUAL(called, 0); + + timer.expires_from_now(boost::posix_time::millisec(100)); + timer.async_wait(yc); + }); + + io.run(); + BOOST_CHECK_EQUAL(called, 0); +} + +BOOST_AUTO_TEST_CASE(timeout_due_scope) +{ + boost::asio::io_context io; + boost::asio::io_context::strand strand (io); + int called = 0; + + boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { + boost::asio::deadline_timer timer (io); + + { + Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + + // Give the timeout enough time to become due while blocking its strand to prevent it from actually running... + Utility::Sleep(0.4); + + BOOST_CHECK_EQUAL(called, 0); + } // ... so that Timeout#~Timeout() shall still work here. + + BOOST_CHECK_EQUAL(called, 0); + + timer.expires_from_now(boost::posix_time::millisec(100)); + timer.async_wait(yc); + }); + + io.run(); + BOOST_CHECK_EQUAL(called, 0); +} + +BOOST_AUTO_TEST_SUITE_END() From d2285bcf0e4648233a93d76777fbe26fcbf20d76 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 28 Nov 2024 16:31:18 +0100 Subject: [PATCH 04/10] While using Timeout, don't unnecessarily keep the strand alive via smart pointer --- lib/icingadb/redisconnection.hpp | 4 +--- lib/remote/apilistener.cpp | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index 04c6635853d..84be22dba09 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -514,13 +514,11 @@ void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc) template Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream) { - Ptr keepAlive (this); - return new Timeout( m_Strand.context(), m_Strand, boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)), - [keepAlive, stream] { + [stream] { boost::system::error_code ec; stream->lowest_layer().cancel(ec); } diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 601367e4991..913470bd449 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -687,7 +687,7 @@ void ApiListener::NewClientHandlerInternal( strand->context(), *strand, boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)), - [strand, client] { + [client] { boost::system::error_code ec; client->lowest_layer().cancel(ec); } From cb516493638ec9713b8f49a196f7571fe74baca6 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Thu, 28 Nov 2024 16:03:27 +0100 Subject: [PATCH 05/10] Timeout#Timeout(): drop unnecessary template parameters --- lib/base/io-engine.hpp | 14 +++++++++----- lib/base/tlsstream.cpp | 2 +- lib/icingadb/redisconnection.hpp | 1 - lib/methods/ifwapichecktask.cpp | 2 +- lib/remote/apilistener.cpp | 5 ++--- test/base-io-engine.cpp | 10 +++++----- 6 files changed, 18 insertions(+), 16 deletions(-) diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 81efc1c9610..a030d204495 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -3,6 +3,7 @@ #ifndef IO_ENGINE_H #define IO_ENGINE_H +#include "base/debug.hpp" #include "base/exception.hpp" #include "base/lazy-init.hpp" #include "base/logger.hpp" @@ -169,12 +170,15 @@ class Timeout : public SharedObject { public: DECLARE_PTR_TYPEDEFS(Timeout); + using Timer = boost::asio::deadline_timer; - template - Timeout(boost::asio::io_context& io, Executor& executor, TimeoutFromNow timeoutFromNow, OnTimeout onTimeout) - : m_Timer(io, timeoutFromNow) + template + Timeout(boost::asio::io_context::strand& strand, const Timer::duration_type& timeoutFromNow, OnTimeout onTimeout) + : m_Timer(strand.context(), timeoutFromNow) { - m_Timer.async_wait(boost::asio::bind_executor(executor, [onTimeout = std::move(onTimeout)](boost::system::error_code ec) { + VERIFY(strand.running_in_this_thread()); + + m_Timer.async_wait(boost::asio::bind_executor(strand, [onTimeout = std::move(onTimeout)](boost::system::error_code ec) { if (!ec) { onTimeout(); } @@ -184,7 +188,7 @@ class Timeout : public SharedObject void Cancel(); private: - boost::asio::deadline_timer m_Timer; + Timer m_Timer; }; } diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index c7f2884f433..d1153f2d420 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -140,7 +140,7 @@ void AsioTlsStream::GracefulDisconnect(boost::asio::io_context::strand& strand, } { - Timeout::Ptr shutdownTimeout(new Timeout(strand.context(), strand, boost::posix_time::seconds(10), + Timeout::Ptr shutdownTimeout(new Timeout(strand, boost::posix_time::seconds(10), [this] { // Forcefully terminate the connection if async_shutdown() blocked more than 10 seconds. ForceDisconnect(); diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index 84be22dba09..f73c1fbdf44 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -515,7 +515,6 @@ template Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream) { return new Timeout( - m_Strand.context(), m_Strand, boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)), [stream] { diff --git a/lib/methods/ifwapichecktask.cpp b/lib/methods/ifwapichecktask.cpp index d5148623a0c..ad19507e7cb 100644 --- a/lib/methods/ifwapichecktask.cpp +++ b/lib/methods/ifwapichecktask.cpp @@ -456,7 +456,7 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes IoEngine::SpawnCoroutine( *strand, [strand, checkable, cr, psCommand, psHost, expectedSan, psPort, conn, req, checkTimeout, reportResult = std::move(reportResult)](asio::yield_context yc) { - Timeout::Ptr timeout = new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)), + Timeout::Ptr timeout = new Timeout(*strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)), [&conn, &checkable] { Log(LogNotice, "IfwApiCheckTask") << "Timeout while checking " << checkable->GetReflectionType()->GetName() diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 913470bd449..8a18caf7ecb 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -534,7 +534,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha auto strand (Shared::Make(io)); IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn, remoteEndpoint](asio::yield_context yc) { - Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), + Timeout::Ptr timeout (new Timeout(*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), [sslConn, remoteEndpoint] { Log(LogWarning, "ApiListener") << "Timeout while processing incoming connection from " << remoteEndpoint; @@ -585,7 +585,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint) lock.unlock(); - Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), + Timeout::Ptr timeout (new Timeout(*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), [sslConn, endpoint, host, port] { Log(LogCritical, "ApiListener") << "Timeout while reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host @@ -684,7 +684,6 @@ void ApiListener::NewClientHandlerInternal( { Timeout::Ptr handshakeTimeout (new Timeout( - strand->context(), *strand, boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)), [client] { diff --git a/test/base-io-engine.cpp b/test/base-io-engine.cpp index 2261bab91e9..74cb67319b4 100644 --- a/test/base-io-engine.cpp +++ b/test/base-io-engine.cpp @@ -19,7 +19,7 @@ BOOST_AUTO_TEST_CASE(timeout_run) boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { boost::asio::deadline_timer timer (io); - Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); BOOST_CHECK_EQUAL(called, 0); timer.expires_from_now(boost::posix_time::millisec(200)); @@ -42,7 +42,7 @@ BOOST_AUTO_TEST_CASE(timeout_cancelled) boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { boost::asio::deadline_timer timer (io); - Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); timer.expires_from_now(boost::posix_time::millisec(200)); timer.async_wait(yc); @@ -68,7 +68,7 @@ BOOST_AUTO_TEST_CASE(timeout_scope) boost::asio::deadline_timer timer (io); { - Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); timer.expires_from_now(boost::posix_time::millisec(200)); timer.async_wait(yc); @@ -92,7 +92,7 @@ BOOST_AUTO_TEST_CASE(timeout_due_cancelled) boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { boost::asio::deadline_timer timer (io); - Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); // Give the timeout enough time to become due while blocking its strand to prevent it from actually running... Utility::Sleep(0.4); @@ -122,7 +122,7 @@ BOOST_AUTO_TEST_CASE(timeout_due_scope) boost::asio::deadline_timer timer (io); { - Timeout::Ptr timeout = new Timeout(io, strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); // Give the timeout enough time to become due while blocking its strand to prevent it from actually running... Utility::Sleep(0.4); From 959b162913e38699c4376fa839a857fae82e6c95 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 29 Nov 2024 14:16:08 +0100 Subject: [PATCH 06/10] Timeout#~Timeout(), #Cancel(): support boost::asio::io_context running on multiple threads --- lib/base/io-engine.cpp | 2 ++ lib/base/io-engine.hpp | 20 +++++++++++++++----- test/base-io-engine.cpp | 16 ++++++++++++++++ 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 2727236ad63..246a448f29e 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -148,6 +148,8 @@ void AsioConditionVariable::Wait(boost::asio::yield_context yc) void Timeout::Cancel() { + m_Cancelled->store(true); + boost::system::error_code ec; m_Timer.cancel(ec); } diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index a030d204495..919f773bc65 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -3,10 +3,12 @@ #ifndef IO_ENGINE_H #define IO_ENGINE_H +#include "base/atomic.hpp" #include "base/debug.hpp" #include "base/exception.hpp" #include "base/lazy-init.hpp" #include "base/logger.hpp" +#include "base/shared.hpp" #include "base/shared-object.hpp" #include #include @@ -174,21 +176,29 @@ class Timeout : public SharedObject template Timeout(boost::asio::io_context::strand& strand, const Timer::duration_type& timeoutFromNow, OnTimeout onTimeout) - : m_Timer(strand.context(), timeoutFromNow) + : m_Timer(strand.context(), timeoutFromNow), m_Cancelled(Shared>::Make(false)) { VERIFY(strand.running_in_this_thread()); - m_Timer.async_wait(boost::asio::bind_executor(strand, [onTimeout = std::move(onTimeout)](boost::system::error_code ec) { - if (!ec) { - onTimeout(); + m_Timer.async_wait(boost::asio::bind_executor( + strand, [cancelled = m_Cancelled, onTimeout = std::move(onTimeout)](boost::system::error_code ec) { + if (!ec && !cancelled->load()) { + onTimeout(); + } } - })); + )); + } + + ~Timeout() override + { + Cancel(); } void Cancel(); private: Timer m_Timer; + Shared>::Ptr m_Cancelled; }; } diff --git a/test/base-io-engine.cpp b/test/base-io-engine.cpp index 74cb67319b4..d75ce706d36 100644 --- a/test/base-io-engine.cpp +++ b/test/base-io-engine.cpp @@ -5,6 +5,7 @@ #include #include #include +#include using namespace icinga; @@ -30,7 +31,10 @@ BOOST_AUTO_TEST_CASE(timeout_run) timer.async_wait(yc); }); + std::thread eventLoop ([&io] { io.run(); }); io.run(); + eventLoop.join(); + BOOST_CHECK_EQUAL(called, 1); } @@ -54,7 +58,10 @@ BOOST_AUTO_TEST_CASE(timeout_cancelled) timer.async_wait(yc); }); + std::thread eventLoop ([&io] { io.run(); }); io.run(); + eventLoop.join(); + BOOST_CHECK_EQUAL(called, 0); } @@ -80,7 +87,10 @@ BOOST_AUTO_TEST_CASE(timeout_scope) timer.async_wait(yc); }); + std::thread eventLoop ([&io] { io.run(); }); io.run(); + eventLoop.join(); + BOOST_CHECK_EQUAL(called, 0); } @@ -108,7 +118,10 @@ BOOST_AUTO_TEST_CASE(timeout_due_cancelled) timer.async_wait(yc); }); + std::thread eventLoop ([&io] { io.run(); }); io.run(); + eventLoop.join(); + BOOST_CHECK_EQUAL(called, 0); } @@ -136,7 +149,10 @@ BOOST_AUTO_TEST_CASE(timeout_due_scope) timer.async_wait(yc); }); + std::thread eventLoop ([&io] { io.run(); }); io.run(); + eventLoop.join(); + BOOST_CHECK_EQUAL(called, 0); } From d77d7506f1379ffade375eb1d66afb414d9c36a1 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 2 Dec 2024 16:06:58 +0100 Subject: [PATCH 07/10] Don't call Timeout#Cancel() where Timeout#~Timeout() is called --- lib/base/tlsstream.cpp | 3 --- lib/icingadb/redisconnection.cpp | 3 --- lib/methods/ifwapichecktask.cpp | 2 -- lib/remote/apilistener.cpp | 4 ---- 4 files changed, 12 deletions(-) diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index d1153f2d420..b7280be094e 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -146,9 +146,6 @@ void AsioTlsStream::GracefulDisconnect(boost::asio::io_context::strand& strand, ForceDisconnect(); } )); - Defer cancelTimeout ([&shutdownTimeout]() { - shutdownTimeout->Cancel(); - }); // Close the TLS connection, effectively uses SSL_shutdown() to send a close_notify shutdown alert to the peer. boost::system::error_code ec; diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp index c187d7f1ed0..a6b82187dd0 100644 --- a/lib/icingadb/redisconnection.cpp +++ b/lib/icingadb/redisconnection.cpp @@ -318,7 +318,6 @@ void RedisConnection::Connect(asio::yield_context& yc) auto conn (Shared::Make(m_Strand.context(), *m_TLSContext, m_Host)); auto& tlsConn (conn->next_layer()); auto connectTimeout (MakeTimeout(conn)); - Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); }); icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc); tlsConn.async_handshake(tlsConn.client, yc); @@ -348,7 +347,6 @@ void RedisConnection::Connect(asio::yield_context& yc) auto conn (Shared::Make(m_Strand.context())); auto connectTimeout (MakeTimeout(conn)); - Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); }); icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc); Handshake(conn, yc); @@ -361,7 +359,6 @@ void RedisConnection::Connect(asio::yield_context& yc) auto conn (Shared::Make(m_Strand.context())); auto connectTimeout (MakeTimeout(conn)); - Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); }); conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); Handshake(conn, yc); diff --git a/lib/methods/ifwapichecktask.cpp b/lib/methods/ifwapichecktask.cpp index ad19507e7cb..43221d15489 100644 --- a/lib/methods/ifwapichecktask.cpp +++ b/lib/methods/ifwapichecktask.cpp @@ -467,8 +467,6 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes } ); - Defer cancelTimeout ([&timeout]() { timeout->Cancel(); }); - DoIfwNetIo(yc, cr, psCommand, psHost, expectedSan, psPort, *conn, *req); cr->SetExecutionEnd(Utility::GetTime()); diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 8a18caf7ecb..a17fbbad670 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -543,7 +543,6 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha sslConn->lowest_layer().cancel(ec); } )); - Defer cancelTimeout([timeout]() { timeout->Cancel(); }); NewClientHandler(yc, strand, sslConn, String(), RoleServer); }); @@ -595,7 +594,6 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint) sslConn->lowest_layer().cancel(ec); } )); - Defer cancelTimeout([&timeout]() { timeout->Cancel(); }); Connect(sslConn->lowest_layer(), host, port, yc); @@ -693,8 +691,6 @@ void ApiListener::NewClientHandlerInternal( )); sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]); - - handshakeTimeout->Cancel(); } if (ec) { From 27e0e236cbabe0d8619ca921a5a82a5cb548e02b Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 2 Dec 2024 16:18:13 +0100 Subject: [PATCH 08/10] Move Timeout instances from heap to stack --- lib/base/io-engine.hpp | 6 ++---- lib/base/tlsstream.cpp | 4 ++-- lib/icingadb/redisconnection.hpp | 6 +++--- lib/methods/ifwapichecktask.cpp | 2 +- lib/remote/apilistener.cpp | 12 ++++++------ test/base-io-engine.cpp | 14 +++++++------- 6 files changed, 21 insertions(+), 23 deletions(-) diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 919f773bc65..466927d1022 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -9,7 +9,6 @@ #include "base/lazy-init.hpp" #include "base/logger.hpp" #include "base/shared.hpp" -#include "base/shared-object.hpp" #include #include #include @@ -168,10 +167,9 @@ class AsioConditionVariable * * @ingroup base */ -class Timeout : public SharedObject +class Timeout { public: - DECLARE_PTR_TYPEDEFS(Timeout); using Timer = boost::asio::deadline_timer; template @@ -189,7 +187,7 @@ class Timeout : public SharedObject )); } - ~Timeout() override + ~Timeout() { Cancel(); } diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp index b7280be094e..66514e0cf10 100644 --- a/lib/base/tlsstream.cpp +++ b/lib/base/tlsstream.cpp @@ -140,12 +140,12 @@ void AsioTlsStream::GracefulDisconnect(boost::asio::io_context::strand& strand, } { - Timeout::Ptr shutdownTimeout(new Timeout(strand, boost::posix_time::seconds(10), + Timeout shutdownTimeout (strand, boost::posix_time::seconds(10), [this] { // Forcefully terminate the connection if async_shutdown() blocked more than 10 seconds. ForceDisconnect(); } - )); + ); // Close the TLS connection, effectively uses SSL_shutdown() to send a close_notify shutdown alert to the peer. boost::system::error_code ec; diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index f73c1fbdf44..3f963f3d37d 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -222,7 +222,7 @@ namespace icinga void Handshake(StreamPtr& stream, boost::asio::yield_context& yc); template - Timeout::Ptr MakeTimeout(StreamPtr& stream); + Timeout MakeTimeout(StreamPtr& stream); String m_Path; String m_Host; @@ -512,9 +512,9 @@ void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc) * @param stream Redis server connection */ template -Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream) +Timeout RedisConnection::MakeTimeout(StreamPtr& stream) { - return new Timeout( + return Timeout( m_Strand, boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)), [stream] { diff --git a/lib/methods/ifwapichecktask.cpp b/lib/methods/ifwapichecktask.cpp index 43221d15489..ce48deefc3a 100644 --- a/lib/methods/ifwapichecktask.cpp +++ b/lib/methods/ifwapichecktask.cpp @@ -456,7 +456,7 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes IoEngine::SpawnCoroutine( *strand, [strand, checkable, cr, psCommand, psHost, expectedSan, psPort, conn, req, checkTimeout, reportResult = std::move(reportResult)](asio::yield_context yc) { - Timeout::Ptr timeout = new Timeout(*strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)), + Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)), [&conn, &checkable] { Log(LogNotice, "IfwApiCheckTask") << "Timeout while checking " << checkable->GetReflectionType()->GetName() diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index a17fbbad670..519469aafa4 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -534,7 +534,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha auto strand (Shared::Make(io)); IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn, remoteEndpoint](asio::yield_context yc) { - Timeout::Ptr timeout (new Timeout(*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), + Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), [sslConn, remoteEndpoint] { Log(LogWarning, "ApiListener") << "Timeout while processing incoming connection from " << remoteEndpoint; @@ -542,7 +542,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha boost::system::error_code ec; sslConn->lowest_layer().cancel(ec); } - )); + ); NewClientHandler(yc, strand, sslConn, String(), RoleServer); }); @@ -584,7 +584,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint) lock.unlock(); - Timeout::Ptr timeout (new Timeout(*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), + Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)), [sslConn, endpoint, host, port] { Log(LogCritical, "ApiListener") << "Timeout while reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host @@ -593,7 +593,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint) boost::system::error_code ec; sslConn->lowest_layer().cancel(ec); } - )); + ); Connect(sslConn->lowest_layer(), host, port, yc); @@ -681,14 +681,14 @@ void ApiListener::NewClientHandlerInternal( boost::system::error_code ec; { - Timeout::Ptr handshakeTimeout (new Timeout( + Timeout handshakeTimeout ( *strand, boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)), [client] { boost::system::error_code ec; client->lowest_layer().cancel(ec); } - )); + ); sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]); } diff --git a/test/base-io-engine.cpp b/test/base-io-engine.cpp index d75ce706d36..869688b1a67 100644 --- a/test/base-io-engine.cpp +++ b/test/base-io-engine.cpp @@ -20,7 +20,7 @@ BOOST_AUTO_TEST_CASE(timeout_run) boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { boost::asio::deadline_timer timer (io); - Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; }); BOOST_CHECK_EQUAL(called, 0); timer.expires_from_now(boost::posix_time::millisec(200)); @@ -46,12 +46,12 @@ BOOST_AUTO_TEST_CASE(timeout_cancelled) boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { boost::asio::deadline_timer timer (io); - Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; }); timer.expires_from_now(boost::posix_time::millisec(200)); timer.async_wait(yc); - timeout->Cancel(); + timeout.Cancel(); BOOST_CHECK_EQUAL(called, 0); timer.expires_from_now(boost::posix_time::millisec(200)); @@ -75,7 +75,7 @@ BOOST_AUTO_TEST_CASE(timeout_scope) boost::asio::deadline_timer timer (io); { - Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; }); timer.expires_from_now(boost::posix_time::millisec(200)); timer.async_wait(yc); @@ -102,7 +102,7 @@ BOOST_AUTO_TEST_CASE(timeout_due_cancelled) boost::asio::spawn(strand, [&](boost::asio::yield_context yc) { boost::asio::deadline_timer timer (io); - Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; }); // Give the timeout enough time to become due while blocking its strand to prevent it from actually running... Utility::Sleep(0.4); @@ -110,7 +110,7 @@ BOOST_AUTO_TEST_CASE(timeout_due_cancelled) BOOST_CHECK_EQUAL(called, 0); // ... so that this shall still work: - timeout->Cancel(); + timeout.Cancel(); BOOST_CHECK_EQUAL(called, 0); @@ -135,7 +135,7 @@ BOOST_AUTO_TEST_CASE(timeout_due_scope) boost::asio::deadline_timer timer (io); { - Timeout::Ptr timeout = new Timeout(strand, boost::posix_time::millisec(300), [&called] { ++called; }); + Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; }); // Give the timeout enough time to become due while blocking its strand to prevent it from actually running... Utility::Sleep(0.4); From 3ca7ff7bf4149b1e059d299ea33c6b97fd2c6f30 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 2 Dec 2024 17:00:02 +0100 Subject: [PATCH 09/10] Timeout: explicitly delete #Timeout(const Timeout&), #Timeout(Timeout&&), #operator=(const Timeout&), #operator=(Timeout&&) --- lib/base/io-engine.hpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index 466927d1022..e7cd630965a 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -187,6 +187,11 @@ class Timeout )); } + Timeout(const Timeout&) = delete; + Timeout(Timeout&&) = delete; + Timeout& operator=(const Timeout&) = delete; + Timeout& operator=(Timeout&&) = delete; + ~Timeout() { Cancel(); From 8f7289122888634574f090308806db4ecf58e4e8 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 29 Nov 2024 12:18:13 +0100 Subject: [PATCH 10/10] Document Timeout --- lib/base/io-engine.cpp | 5 +++++ lib/base/io-engine.hpp | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp index 246a448f29e..3190ed03d82 100644 --- a/lib/base/io-engine.cpp +++ b/lib/base/io-engine.cpp @@ -146,6 +146,11 @@ void AsioConditionVariable::Wait(boost::asio::yield_context yc) m_Timer.async_wait(yc[ec]); } +/** + * Cancels any pending timeout callback. + * + * Must be called in the strand in which the callback was scheduled! + */ void Timeout::Cancel() { m_Cancelled->store(true); diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp index e7cd630965a..0350d45b83d 100644 --- a/lib/base/io-engine.hpp +++ b/lib/base/io-engine.hpp @@ -165,6 +165,22 @@ class AsioConditionVariable /** * I/O timeout emulator * + * This class provides a workaround for Boost.ASIO's lack of built-in timeout support. + * While Boost.ASIO handles asynchronous operations, it does not natively support timeouts for these operations. + * This class uses a boost::asio::deadline_timer to emulate a timeout by scheduling a callback to be triggered + * after a specified duration, effectively adding timeout behavior where none exists. + * The callback is executed within the provided strand, ensuring thread-safety. + * + * The constructor returns immediately after scheduling the timeout callback. + * The callback itself is invoked asynchronously when the timeout occurs. + * This allows the caller to continue execution while the timeout is running in the background. + * + * The class provides a Cancel() method to unschedule any pending callback. If the callback has already been run, + * calling Cancel() has no effect. This method can be used to abort the timeout early if the monitored operation + * completes before the callback has been run. The Timeout destructor also automatically cancels any pending callback. + * A callback is considered pending even if the timeout has already expired, + * but the callback has not been executed yet due to a busy strand. + * * @ingroup base */ class Timeout @@ -172,6 +188,14 @@ class Timeout public: using Timer = boost::asio::deadline_timer; + /** + * Schedules onTimeout to be triggered after timeoutFromNow on strand. + * + * @param strand The strand in which the callback will be executed. + * The caller must also run in this strand, as well as Cancel() and the destructor! + * @param timeoutFromNow The duration after which the timeout callback will be triggered. + * @param onTimeout The callback to invoke when the timeout occurs. + */ template Timeout(boost::asio::io_context::strand& strand, const Timer::duration_type& timeoutFromNow, OnTimeout onTimeout) : m_Timer(strand.context(), timeoutFromNow), m_Cancelled(Shared>::Make(false)) @@ -192,6 +216,11 @@ class Timeout Timeout& operator=(const Timeout&) = delete; Timeout& operator=(Timeout&&) = delete; + /** + * Cancels any pending timeout callback. + * + * Must be called in the strand in which the callback was scheduled! + */ ~Timeout() { Cancel(); @@ -201,6 +230,14 @@ class Timeout private: Timer m_Timer; + + /** + * Indicates whether the Timeout has been cancelled. + * + * This must be Shared<> between the lambda in the constructor and Cancel() for the case + * the destructor calls Cancel() while the lambda is already queued in the strand. + * The whole Timeout instance can't be kept alive by the lambda because this would delay the destructor. + */ Shared>::Ptr m_Cancelled; };