so5extra 1.5 Inflight Limit Mbox
The purpose of inflight-limit mbox is to limit the number of messages that were sent but weren't processed yet (so-called "in-flight" messages). The inflight-limit mbox can be seen as another form of overload control.
There are already message limits for agents that protect an agent from message overflow, but message limits work only for a particular agent. They are not applicable if, for example, several agents are hidden behind some tricky mbox (like a round-robin mbox or a collecting mbox). The inflight-limit mbox can be used in these cases.
The inflight-limit mbox works as a proxy for an actual destination mbox. It means that the inflight-limit mbox doesn't hold subscribers and/or delivery filters: all this functionality is delegated to the underlying mbox.
What the inflight-limit mbox does is count the messages that were posted but whose delivery hasn't completed yet.
It means that when a message is passed to the inflight-limit mbox's do_deliver_message method, the mbox checks the number of unprocessed messages. If that number is below the specified limit, the message is passed on to the underlying mbox's do_deliver_message (and will be delivered in the usual way). But if the number of "in-flight" messages has already reached the limit, the message is simply discarded.
An instance of inflight-limit mbox holds an atomic counter of "in-flight" messages. The value of that counter is checked in every do_deliver_message call. If the counter is below the limit then:
- this counter is incremented;
- a special envelope is created and the original message is passed to that envelope as the payload;
- this envelope is passed to the underlying mbox instead of the original message;
- when this envelope is destroyed (as the result of a message delivery procedure) the counter is decremented.
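The counting scheme above can be modelled with nothing but the standard library. The following is a minimal sketch, not the actual so5extra implementation: the names inflight_counter, counting_envelope, limiting_proxy and try_wrap are hypothetical, the payload is a plain std::string, and the order of operations (reserve a slot with fetch_add, then roll back if the limit was already reached) is an assumption about how such a check can be kept safe when several threads send at once.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for the counter owned by an inflight-limit mbox.
struct inflight_counter {
    std::atomic<std::size_t> value{0};
};

// Hypothetical envelope: carries the payload and gives its slot back
// to the counter when it is destroyed.
class counting_envelope {
    std::shared_ptr<inflight_counter> counter_;
    std::string payload_;

public:
    counting_envelope(std::shared_ptr<inflight_counter> counter, std::string payload)
        : counter_{std::move(counter)}, payload_{std::move(payload)} {}

    counting_envelope(const counting_envelope &) = delete;
    counting_envelope & operator=(const counting_envelope &) = delete;

    ~counting_envelope() { counter_->value.fetch_sub(1); }

    const std::string & payload() const noexcept { return payload_; }
};

// Hypothetical proxy: decides whether a message may go to the destination.
class limiting_proxy {
    std::shared_ptr<inflight_counter> counter_{std::make_shared<inflight_counter>()};
    std::size_t limit_;

public:
    explicit limiting_proxy(std::size_t limit) : limit_{limit} {}

    // Returns an envelope to hand over to the destination,
    // or nullptr if the message has to be discarded.
    std::unique_ptr<counting_envelope> try_wrap(std::string payload) {
        // Reserve a slot first; fetch_add returns the previous value.
        const std::size_t before = counter_->value.fetch_add(1);
        if (before >= limit_) {
            counter_->value.fetch_sub(1); // Limit already reached: roll back.
            return nullptr;
        }
        try {
            return std::make_unique<counting_envelope>(counter_, std::move(payload));
        } catch (...) {
            counter_->value.fetch_sub(1); // Allocation failed: free the slot.
            throw;
        }
    }

    std::size_t inflight() const noexcept { return counter_->value.load(); }
};

// Usage of the model with a limit of two "in-flight" messages.
void usage_example() {
    limiting_proxy proxy{2};
    auto first = proxy.try_wrap("first");
    auto second = proxy.try_wrap("second");
    auto third = proxy.try_wrap("third"); // Limit reached: discarded.
    assert(first && second && !third);
    assert(proxy.inflight() == 2);

    first.reset(); // "Delivery finished": the envelope is destroyed.
    assert(proxy.inflight() == 1);
    assert(proxy.try_wrap("fourth") != nullptr);
}
```

Tying the decrement to the envelope's destructor means the slot is released no matter how the delivery ends, whether the message is handled or dropped somewhere along the way.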
The corresponding header file has to be included:
#include <so_5_extra/mboxes/inflight_limit.hpp>
Please note that the so_5/all.hpp header file should also be included explicitly if you need to work with SObjectizer's environment, agents, standard mboxes, messages and so on:
#include <so_5_extra/mboxes/inflight_limit.hpp>
#include <so_5/all.hpp>
Then an instance of inflight-limit mbox can be created:
so_5::environment_t & env = ...;
// An mbox with limit.
auto limited_mbox = so_5::extra::mboxes::inflight_limit::make_mbox<my_msg>(
// Actual destination mbox.
env.create_mbox(),
// The limit.
4u);
// Now limited_mbox has to be used for sending my_msg instances:
so_5::send<my_msg>(limited_mbox, ...);
Once an instance of inflight-limit mbox is created, it has to be used for message sending.
An instance of inflight-limit mbox can be used for subscriptions and/or setting delivery filters. For example:
class demo final : public so_5::agent_t
{
const so_5::mbox_t m_limited_mbox;
...
public:
demo(context_t ctx)
: so_5::agent_t{std::move(ctx)}
, m_limited_mbox{so_5::extra::mboxes::inflight_limit::make_mbox<my_msg>(
// The direct mbox as the destination.
so_direct_mbox(),
// The limit.
2u)
}
{}
void so_define_agent() override
{
// Delivery filter.
so_set_delivery_filter(m_limited_mbox, ...);
// Subscription.
so_subscribe(m_limited_mbox).event(...);
}
...
};
But because the inflight-limit mbox is just a proxy, the original underlying mbox can also be used for subscriptions and delivery filters. For example:
// Suppose it's an agent from a 3rd party library.
class external_agent final : public so_5::agent_t
{
...
void so_define_agent() override
{
// Subscription to the direct mbox.
so_subscribe_self().event([this](const my_msg & msg) {...});
}
};
...
// Somewhere in our own code.
auto limited_mbox = env.introduce_coop([](so_5::coop_t & coop) {
auto agent = coop.make_agent<external_agent>(...);
// Create inflight-mbox for agent's direct mbox.
return so_5::extra::mboxes::inflight_limit::make_mbox<my_msg>(
agent->so_direct_mbox(), 2u);
});
// Now limited_mbox can be used for message sending.
// The external_agent will receive this message because
// limited_mbox is just a proxy for external_agent's direct mbox.
so_5::send<my_msg>(limited_mbox, ...);
Every instance of inflight-limit mbox is created for just one message type. For example:
// Actual underlying mbox.
auto dest_mbox = ...;
// The first instance of inflight-limit mbox.
auto first_limit = so_5::extra::mboxes::inflight_limit::make_mbox<first_msg>(dest_mbox, 5u);
// The second instance of inflight-limit mbox.
auto second_limit = so_5::extra::mboxes::inflight_limit::make_mbox<second_msg>(dest_mbox, 30u);
// The third instance of inflight-limit mbox.
auto third_limit = so_5::extra::mboxes::inflight_limit::make_mbox<third_msg>(dest_mbox, 2u);
If it's necessary to have just one mbox with several limits for different message types then composite-mbox can be used:
// Actual underlying mbox.
auto dest_mbox = ...;
// The result mbox.
auto result_mbox = so_5::extra::mboxes::composite::multi_consumer_builder(
so_5::extra::mboxes::composite::throw_if_not_found())
// Definition of limits for message types.
.add<first_msg>(
so_5::extra::mboxes::inflight_limit::make_mbox<first_msg>(dest_mbox, 5u))
.add<second_msg>(
so_5::extra::mboxes::inflight_limit::make_mbox<second_msg>(dest_mbox, 30u))
.add<third_msg>(
so_5::extra::mboxes::inflight_limit::make_mbox<third_msg>(dest_mbox, 2u))
// Creation of the result mbox.
.build(dest_mbox->environment());
Because the inflight-limit mbox is just a proxy, it inherits its type (multi-producer/multi-consumer or multi-producer/single-consumer) from the underlying mbox.
It means that if an inflight-limit mbox is created for an MPMC mbox then it will be an MPMC mbox:
so_5::environment_t & env = ...;
auto limited_mbox = so_5::extra::mboxes::inflight_limit::make_mbox<my_msg>(
// MPMC mbox as the destination.
env.create_mbox(),
4u);
assert(so_5::mbox_type_t::multi_producer_multi_consumer == limited_mbox->type());
If the underlying mbox is an MPSC mbox then the inflight-limit mbox can handle mutable messages. In that case so_5::mutable_msg has to be used for the message type, for example:
auto mbox = so_5::extra::mboxes::inflight_limit::make_mbox<
so_5::mutable_msg<my_msg> >(some_agent->so_direct_mbox(), 5u);
...
// Send via the inflight-limit mbox, otherwise the limit is bypassed.
so_5::send< so_5::mutable_msg<my_msg> >(mbox, ...);
The inflight-limit mbox increments the number of "in-flight" messages when the do_deliver_message method is called (in other words, inside the so_5::send() function) and decrements it when processing of the message finishes. So a message is "in-flight" from the call to so_5::send() until the completion of message processing.
But what does "processing finished" mean?
The processing of a message finishes when the message is dropped (for any reason) or when the event-handler called for that message returns. It means that if an event-handler has been called and is still working, the message stays "in-flight" until the event-handler completes.
Please note that a message can be dropped (discarded) for various reasons, like:
- a delivery filter prohibits delivery of that message;
- a message limit drops or transforms the message;
- the message was revoked (see the so_5::extra::revocable_msg submodule);
- and so on.
If the underlying mbox is a retained-msg mbox (see the so_5::extra::mboxes::retained_mbox submodule) then a message sent may stay in "in-flight" status for a long time, because a retained-msg mbox stores the last message instance sent.
so5extra contains tools like an enveloped message with time-limited delivery and revocable messages. One might think that those tools reduce the lifetime of the message sent. That's not true: they prohibit the processing of the message in some circumstances, but do not reduce the time needed to deliver the message to the receiver. This can be surprising when such tools are used with an inflight-limit mbox.
Let's see a simple example:
class my_agent final : public so_5::agent_t
{
struct demo_signal final : public so_5::signal_t {};
// An instance of inflight-limit mbox.
const so_5::mbox_t limited_mbox_;
public:
my_agent(context_t ctx)
: so_5::agent_t{std::move(ctx)}
, limited_mbox_{
so_5::extra::mboxes::inflight_limit::make_mbox<demo_signal>(
// Delivery to the direct mbox.
so_direct_mbox(),
// Just one message can be "in-flight".
1u)
}
{}
void so_define_agent() override {
so_subscribe(limited_mbox_).event([](mhood_t<demo_signal>) {});
}
void so_evt_start() override {
// Send the first signal.
auto delivery_id = so_5::extra::revocable_msg::send<demo_signal>(limited_mbox_); // (0)
// Revoke the signal immediately.
delivery_id.revoke(); // (1)
// Try to send the second signal.
so_5::send<demo_signal>(limited_mbox_); // (2)
}
};
Someone may think that the first signal sent will be destroyed and removed from the agent's message queue at the point (1). But it works in a different way.
The signal sent at point (0) will be stored in the agent's message queue in the form of a special envelope.
At the point (1) that special envelope will be marked as "revoked". But it will stay in the queue because there is no way to remove an unprocessed message from the agent's queue in SObjectizer-5. So the message sent at point (0) will stay in the queue and this message will be considered as "in-flight".
Because of that, the attempt to send another signal at point (2) will be ignored, i.e. the new signal will be discarded because the limit has already been reached.
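The same sequence can be replayed on a small single-threaded model that uses only the standard library. This is a sketch under stated assumptions, not SObjectizer code: queued_signal and model_agent are hypothetical names, the "queue" is a plain std::deque, and the "in-flight" count is simply the number of envelopes still sitting in that queue.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>

// Hypothetical envelope for a signal: it can be marked as revoked,
// but it stays in the queue until it is dequeued.
struct queued_signal {
    bool revoked = false;
};

// Hypothetical agent with a demand queue and an "in-flight" limit.
class model_agent {
    std::size_t limit_;
    std::deque<std::shared_ptr<queued_signal>> queue_;
    std::size_t handled_ = 0;

public:
    explicit model_agent(std::size_t limit) : limit_{limit} {}

    // Every queued envelope counts as "in-flight", revoked or not.
    std::size_t inflight() const noexcept { return queue_.size(); }

    std::size_t handled() const noexcept { return handled_; }

    // Returns the envelope (usable for revocation) or nullptr
    // if the signal was discarded because of the limit.
    std::shared_ptr<queued_signal> send() {
        if (inflight() >= limit_) {
            return nullptr;
        }
        auto envelope = std::make_shared<queued_signal>();
        queue_.push_back(envelope);
        return envelope;
    }

    // Dequeues one envelope; the handler is skipped for revoked signals.
    void process_one() {
        if (queue_.empty()) {
            return;
        }
        auto envelope = std::move(queue_.front());
        queue_.pop_front(); // Only here does the signal stop being "in-flight".
        if (!envelope->revoked) {
            ++handled_;
        }
    }
};

// Replays points (0), (1) and (2) on the model.
void usage_example() {
    model_agent agent{1};

    auto first = agent.send();        // (0) The signal is queued.
    assert(first != nullptr);
    first->revoked = true;            // (1) Revoked, but still in the queue.
    assert(agent.inflight() == 1);

    assert(agent.send() == nullptr);  // (2) Discarded: the limit is reached.

    agent.process_one();              // The revoked envelope is finally dequeued.
    assert(agent.handled() == 0);
    assert(agent.inflight() == 0);
    assert(agent.send() != nullptr);  // Now there is room for a new signal.
}
```

In the model, as in the description above, revocation only affects whether the handler runs; the slot is given back when the envelope leaves the queue.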