so5extra 1.5 Async Op
Sometimes it is necessary to perform a one-time interaction between agents in a SObjectizer-based application. For example, agent A sends a one-time request to agent B and then wants to receive a reply from B. In the simple case agent A expects just one reply message. This can be done like this:
class A : public so_5::agent_t {
so_5::mbox_t reply_mbox_;
...
void on_reply_handler(mhood_t<reply_msg> cmd) {
// We do not need this subscription anymore. It should be destroyed.
so_drop_subscription<reply_msg>(reply_mbox_);
// Now the reply can be handled.
...
}
...
void initiate_request(const so_5::mbox_t & request_reply_mbox) {
// Store mbox. It will be needed later to remove the subscription for reply.
reply_mbox_ = request_reply_mbox;
// Create a subscription for reply.
so_subscribe(reply_mbox_).event(&A::on_reply_handler);
// And now the request can be sent.
so_5::send<request>(request_reply_mbox, ...);
}
};
Even in this very simple example the user has to do some work to perform a one-time interaction. The amount of work increases if we take the agent's states into account:
class A : public so_5::agent_t {
so_5::mbox_t reply_mbox_;
...
void on_reply_handler_for_state1(mhood_t<reply_msg> cmd) {
// We do not need subscriptions for the reply anymore. They should be destroyed.
so_drop_subscription_for_all_states<reply_msg>(reply_mbox_);
// Now the reply can be handled.
...
}
void on_reply_handler_for_state2(mhood_t<reply_msg> cmd) {
// We do not need subscriptions for the reply anymore. They should be destroyed.
so_drop_subscription_for_all_states<reply_msg>(reply_mbox_);
// Now the reply can be handled.
...
}
void on_reply_handler_for_state3(mhood_t<reply_msg> cmd) {
// We do not need subscriptions for the reply anymore. They should be destroyed.
so_drop_subscription_for_all_states<reply_msg>(reply_mbox_);
// Now the reply can be handled.
...
}
...
void initiate_request(const so_5::mbox_t & request_reply_mbox) {
// Store mbox. It will be needed later to remove the subscription for reply.
reply_mbox_ = request_reply_mbox;
// Create subscriptions for reply.
state1_.event(reply_mbox_, &A::on_reply_handler_for_state1);
state2_.event(reply_mbox_, &A::on_reply_handler_for_state2);
state3_.event(reply_mbox_, &A::on_reply_handler_for_state3);
// And now the request can be sent.
so_5::send<request>(request_reply_mbox, ...);
}
};
The situation gets worse if we expect several message types as a reply. For example:
class A : public so_5::agent_t {
so_5::mbox_t reply_mbox_;
...
void on_successful_reply_handler_for_state1(mhood_t<successful_reply_msg> cmd) {
// We do not need subscriptions for the reply anymore. They should be destroyed.
so_drop_subscription_for_all_states<successful_reply_msg>(reply_mbox_);
so_drop_subscription_for_all_states<failed_reply_msg>(reply_mbox_);
// Now the reply can be handled.
...
}
void on_failed_reply_handler_for_state1(mhood_t<failed_reply_msg> cmd) {
// We do not need subscriptions for the reply anymore. They should be destroyed.
so_drop_subscription_for_all_states<successful_reply_msg>(reply_mbox_);
so_drop_subscription_for_all_states<failed_reply_msg>(reply_mbox_);
// Now the reply can be handled.
...
}
... // The same picture for handling successful_* and failed_* reply messages in state2 and state3.
...
void initiate_request(const so_5::mbox_t & request_reply_mbox) {
// Store mbox. It will be needed later to remove the subscription for reply.
reply_mbox_ = request_reply_mbox;
// Create subscriptions for reply.
state1_.event(reply_mbox_, &A::on_successful_reply_handler_for_state1);
state1_.event(reply_mbox_, &A::on_failed_reply_handler_for_state1);
state2_.event(reply_mbox_, &A::on_successful_reply_handler_for_state2);
state2_.event(reply_mbox_, &A::on_failed_reply_handler_for_state2);
state3_.event(reply_mbox_, &A::on_successful_reply_handler_for_state3);
state3_.event(reply_mbox_, &A::on_failed_reply_handler_for_state3);
// And now the request can be sent.
so_5::send<request>(request_reply_mbox, ...);
}
};
The situation gets worse still if we want several one-time operations to run in parallel. They will use different reply mboxes, so we can't store a single reply mbox as a member of our agent class. One possible solution for that case looks like this:
class A : public so_5::agent_t {
...
void on_successful_reply_handler_for_state1(
const so_5::mbox_t & reply_mbox,
mhood_t<successful_reply_msg> cmd) {
// We do not need subscriptions for the reply anymore. They should be destroyed.
so_drop_subscription_for_all_states<successful_reply_msg>(reply_mbox);
so_drop_subscription_for_all_states<failed_reply_msg>(reply_mbox);
// Now the reply can be handled.
...
}
void on_failed_reply_handler_for_state1(
const so_5::mbox_t & reply_mbox,
mhood_t<failed_reply_msg> cmd) {
// We do not need subscriptions for the reply anymore. They should be destroyed.
so_drop_subscription_for_all_states<successful_reply_msg>(reply_mbox);
so_drop_subscription_for_all_states<failed_reply_msg>(reply_mbox);
// Now the reply can be handled.
...
}
... // The same picture for handling successful_* and failed_* reply messages in state2 and state3.
...
void initiate_request(const so_5::mbox_t & request_reply_mbox) {
// Create subscriptions for reply.
state1_.event(request_reply_mbox,
[this, request_reply_mbox](mhood_t<successful_reply_msg> cmd) {
on_successful_reply_handler_for_state1(request_reply_mbox, cmd);
});
state1_.event(request_reply_mbox,
[this, request_reply_mbox](mhood_t<failed_reply_msg> cmd) {
on_failed_reply_handler_for_state1(request_reply_mbox, cmd);
});
state2_.event(request_reply_mbox,
[this, request_reply_mbox](mhood_t<successful_reply_msg> cmd) {
on_successful_reply_handler_for_state2(request_reply_mbox, cmd);
});
... // The same for other states and handlers...
// And now the request can be sent.
so_5::send<request>(request_reply_mbox, ...);
}
};
And the situation can be even worse if the reply can be received from different mboxes, if there is a timeout for the one-time interaction, or if there are external factors such as the need to cancel the interaction.
The examples above show that a one-time interaction between agents requires a lot of boilerplate code. Such an interaction can be implemented with the tools provided by SObjectizer itself, but the work is tedious and error-prone.
Since v.1.0.4 so_5_extra provides a solution for the problem described above. Such one-time interactions are called async operations, and so_5_extra provides helpers for working with them.
These helpers are divided into two groups. These groups have a similar API but have slightly different implementations. Because of that, they are placed in different namespaces:
- so_5::extra::async_op::time_unlimited contains helpers for async operations which have no limit on execution time.
- so_5::extra::async_op::time_limited contains helpers for async operations with limited execution time.
Just for illustration, one of the examples above can be rewritten with time-unlimited async operations as follows:
#include <so_5_extra/async_op/time_unlimited.hpp>
...
class A : public so_5::agent_t {
...
void on_successful_reply_handler_for_state1(mhood_t<successful_reply_msg> cmd) {
// There is no need to remove subscriptions.
// The reply can be handled just now.
...
}
void on_failed_reply_handler_for_state1(mhood_t<failed_reply_msg> cmd) {
// The reply can be handled just now.
...
}
... // The same picture for handling successful_* and failed_* reply messages in state2 and state3.
...
void initiate_request(const so_5::mbox_t & request_reply_mbox) {
// Create an instance of async operation.
so_5::extra::async_op::time_unlimited::make(*this)
// Define event handlers for replies...
.completed_on(request_reply_mbox, state1_, &A::on_successful_reply_handler_for_state1)
.completed_on(request_reply_mbox, state1_, &A::on_failed_reply_handler_for_state1)
.completed_on(request_reply_mbox, state2_, &A::on_successful_reply_handler_for_state2)
.completed_on(request_reply_mbox, state2_, &A::on_failed_reply_handler_for_state2)
... // The same for other states and handlers...
// And now the operation can be activated.
.activate([&] {
// The request can be sent now.
so_5::send<request>(request_reply_mbox, ...);
});
}
};
The API for working with time-unlimited async operations is represented by the following tools from the so_5::extra::async_op::time_unlimited namespace:
- the main class definition_point_t, which should be used for definition and activation of an async operation. The most important methods of definition_point_t are completed_on for setting up a completion handler for the async operation and activate for activation of the async operation;
- a helper class cancellation_point_t, which can be used for cancellation of the activated async operation;
- the main factory function make, which should be used for the creation of an instance of definition_point_t.
All of these are defined in the so_5_extra/async_op/time_unlimited.hpp header file.
The typical scenario for time-unlimited async operations is:
- The user calls the make factory and receives an empty instance of the definition_point_t class.
- The user then calls definition_point_t::completed_on for every event handler he/she wants to have as a completion handler for the async operation.
- The user then calls definition_point_t::activate to activate the async operation.
For example:
class demo : public so_5::agent_t {
...
void on_some_msg(mhood_t<some_msg> cmd) {...}
void on_another_msg(mhood_t<another_msg> cmd) {...}
...
void initiate_async_op() {
so_5::extra::async_op::time_unlimited::make(*this)
.completed_on(some_mbox, some_state, &demo::on_some_msg)
.completed_on(another_mbox, another_state, &demo::on_another_msg)
.activate([&] {
... // Do other async_op-related actions.
});
...
}
};
In more complex cases there can be additional steps. The activate method returns an instance of cancellation_point_t, which can be stored and used for cancellation of the async operation. For example:
namespace asyncop = so_5::extra::async_op::time_unlimited;
class demo : public so_5::agent_t {
// The cancellation point for the async operation.
asyncop::cancellation_point_t<> cp_;
...
void on_some_msg(mhood_t<some_msg> cmd) {...}
void on_another_msg(mhood_t<another_msg> cmd) {...}
...
void initiate_async_op() {
// Define and activate the async_op and store the cancellation point
// returned by activate() method.
cp_ = asyncop::make(*this)
.completed_on(some_mbox, some_state, &demo::on_some_msg)
.completed_on(another_mbox, another_state, &demo::on_another_msg)
.activate([&] {
... // Do other async_op-related actions.
});
...
}
...
void on_reset(mhood_t<reset_signal>) {
// Async operation should be cancelled now (if it is not completed yet).
cp_.cancel();
... // Some other actions.
}
};
The API for working with time-limited async operations is represented by the following tools from the so_5::extra::async_op::time_limited namespace:
- the main class definition_point_t, which should be used for definition and activation of an async operation. The most important methods of definition_point_t are completed_on for setting up a completion handler for the async operation, timeout_handler and default_timeout_handler for setting up timeout handlers, and activate for activation of the async operation;
- a helper class cancellation_point_t, which can be used for cancellation of the activated async operation;
- the main factory function make, which should be used for the creation of an instance of definition_point_t.
All of these are defined in the so_5_extra/async_op/time_limited.hpp header file.
The typical scenario for time-limited async operations is:
- The user calls the make factory and receives an empty instance of the definition_point_t class.
- The user calls definition_point_t::completed_on for every event handler he/she wants to have as a completion handler for the async operation.
- The user calls definition_point_t::timeout_handler and definition_point_t::default_timeout_handler for the timeout handlers he/she wants to have.
- The user then calls definition_point_t::activate to activate the async operation.
For example:
class demo : public so_5::agent_t {
struct timeout final : public so_5::signal_t {};
...
void on_some_msg(mhood_t<some_msg> cmd) {...}
void on_another_msg(mhood_t<another_msg> cmd) {...}
...
void on_timeout(mhood_t<timeout>) {...}
...
void initiate_async_op() {
so_5::extra::async_op::time_limited::make<timeout>(*this)
.completed_on(some_mbox, some_state, &demo::on_some_msg)
.completed_on(another_mbox, another_state, &demo::on_another_msg)
.timeout_handler(some_state, &demo::on_timeout)
.activate(std::chrono::milliseconds(350));
... // Do other async_op-related actions.
...
}
};
In more complex cases there can be additional steps. The activate method returns an instance of cancellation_point_t, which can be stored and used for cancellation of the async operation. For example:
namespace asyncop = so_5::extra::async_op::time_limited;
class demo : public so_5::agent_t {
// The timeout signal for the async operation.
struct timeout final : public so_5::signal_t {};
// The cancellation point for the async operation.
asyncop::cancellation_point_t<> cp_;
...
void on_some_msg(mhood_t<some_msg> cmd) {...}
void on_another_msg(mhood_t<another_msg> cmd) {...}
...
void initiate_async_op() {
// Define and activate the async_op and store the cancellation point
// returned by activate() method.
cp_ = asyncop::make<timeout>(*this)
.completed_on(some_mbox, some_state, &demo::on_some_msg)
.completed_on(another_mbox, another_state, &demo::on_another_msg)
.activate(std::chrono::milliseconds(350));
... // Do other async_op-related actions.
...
}
...
void on_reset(mhood_t<reset_signal>) {
// Async operation should be cancelled now (if it is not completed yet).
cp_.cancel();
... // Some other actions.
}
};
If an agent has several states, it may be desirable to handle the timeout in every state of the agent. A user can call timeout_handler for every state, for example:
so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
.timeout_handler(state1, some_timeout_handler)
.timeout_handler(state2, some_timeout_handler)
.timeout_handler(state3, some_timeout_handler)
... // and so on...
Or default_timeout_handler can be used:
so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
.default_timeout_handler(some_timeout_handler)
... // other tuning stuff...
The calls to timeout_handler and default_timeout_handler can be combined. For example, in the following case first_handler will be called if the agent receives timeout_msg in the state st_first, second_handler will be called if the agent receives timeout_msg in the state st_second, and third_handler will be called if the agent is in any other state:
so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
.timeout_handler(st_first, first_handler)
.timeout_handler(st_second, second_handler)
.default_timeout_handler(third_handler)
.activate(std::chrono::milliseconds(350));
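The selection rule described above (a state-specific handler first, the default handler otherwise) can be modelled without SObjectizer. The sketch below is only a toy model under stated assumptions: toy_timeout_dispatch, set_for_state, set_default and pick are hypothetical names, states are plain strings, and handlers simply return their own name. It is not how so_5_extra implements dispatching.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Toy model of choosing a timeout handler by the current agent state.
// All names are hypothetical; so_5_extra relies on real subscriptions instead.
class toy_timeout_dispatch {
    std::map<std::string, std::function<std::string()>> per_state_;
    std::function<std::string()> default_;
public:
    // Plays the role of timeout_handler(state, handler).
    void set_for_state(const std::string & state, std::function<std::string()> h) {
        per_state_[state] = std::move(h);
    }
    // Plays the role of default_timeout_handler(handler).
    void set_default(std::function<std::string()> h) { default_ = std::move(h); }
    // Calls the handler selected for the given state and returns its result;
    // returns an empty string when neither a state handler nor a default exists.
    std::string pick(const std::string & current_state) const {
        const auto it = per_state_.find(current_state);
        if (it != per_state_.end()) return it->second();
        return default_ ? default_() : std::string{};
    }
};

// Mirrors the st_first / st_second / default combination from the text.
inline toy_timeout_dispatch make_example_dispatch() {
    toy_timeout_dispatch d;
    d.set_for_state("st_first", [] { return std::string{"first_handler"}; });
    d.set_for_state("st_second", [] { return std::string{"second_handler"}; });
    d.set_default([] { return std::string{"third_handler"}; });
    return d;
}

inline void dispatch_usage() {
    const auto d = make_example_dispatch();
    assert(d.pick("st_first") == "first_handler");
    assert(d.pick("st_other") == "third_handler");
}
```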
The examples above use a signal as a timeout message. But ordinary messages with some data inside can be used with time-limited operations too. Arguments for the timeout message's constructor are passed as additional arguments to the activate method:
class request_manager : public so_5::agent_t {
struct request_timed_out final : public so_5::message_t {
user_id user_;
request_id req_id_;
request_timed_out(user_id user, request_id id)
: user_{std::move(user)}, req_id_{std::move(id)}
{}
};
...
void on_timeout(mhood_t<request_timed_out> cmd) {
...
}
...
void schedule_user_request(
const user_id & user,
const request_id & id,
request_data req) {
// Create and tune async_op for that request.
so_5::extra::async_op::time_limited::make<request_timed_out>(*this)
.completed_on(...)
.default_timeout_handler(&request_manager::on_timeout)
.activate(
// Max operation duration.
std::chrono::seconds(5),
// Arguments for constructing instance of request_timed_out.
user, id);
...
}
};
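The idea of passing constructor arguments through activate can be illustrated with plain variadic forwarding. This is a minimal sketch, assuming hypothetical names (toy_timed_out, toy_activate): the real library schedules a delayed message, whereas the toy function just constructs the message and returns it.

```cpp
#include <cassert>
#include <chrono>
#include <memory>
#include <string>
#include <utility>

// Hypothetical timeout message with a payload, similar in spirit to request_timed_out.
struct toy_timed_out {
    std::string user_;
    int req_id_;
    toy_timed_out(std::string user, int id) : user_{std::move(user)}, req_id_{id} {}
};

// Toy stand-in for activate(): the first argument is the time limit, the
// remaining ones are perfectly forwarded to the constructor of the message.
// Nothing is scheduled here; the constructed message is simply returned.
template<typename Msg, typename... Args>
std::unique_ptr<Msg> toy_activate(
        std::chrono::steady_clock::duration /*timeout*/,
        Args &&... args) {
    return std::make_unique<Msg>(std::forward<Args>(args)...);
}

inline void forwarding_usage() {
    const std::string user{"alice"};
    const auto msg = toy_activate<toy_timed_out>(std::chrono::seconds(5), user, 42);
    assert(msg->user_ == "alice");
    assert(msg->req_id_ == 42);
}
```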
The time-unlimited async operation works this way:
When a user calls make, an empty definition point object is created. There is a special operation data object behind the definition point (the definition point holds a reference to that data object). This data object holds an empty list of completion handlers.
When a user calls the completed_on method of the definition point, a handler is added to the list of completion handlers.
When a user calls the activate method of the definition point, several actions are performed:
- all completion handlers from the completion handlers list are subscribed.
- the definition point becomes empty. It means that the definition point loses its reference to the operation data object.
When any of the completion messages/signals arrives, the status of the operation is checked. If the operation is not canceled yet, the operation is marked as completed and the corresponding completion handler is called.
A user can cancel the operation. If the operation is not completed yet, it is marked as canceled.
When the status of the started operation changes (e.g. when the operation is marked as completed or canceled), all completion handlers are unsubscribed.
Note. By the time any handler is called, the status has already been changed. It means that when a completion handler is called, the current status of the async operation is completed. For example:
namespace asyncop = so_5::extra::async_op::time_unlimited;
class completion_demo : public so_5::agent_t {
asyncop::cancellation_point_t<> cp_;
struct boom final : public so_5::signal_t {};
public:
completion_demo(context_t ctx) : so_5::agent_t{std::move(ctx)} {}
virtual void so_evt_start() override {
cp_ = asyncop::make(*this)
.completed_on(so_direct_mbox(), so_default_state(),
[this](mhood_t<boom>) {
assert(asyncop::status_t::completed == cp_.status());
so_deregister_agent_coop_normally();
})
.activate([this] { so_5::send<boom>(*this); });
}
};
From the technical point of view, completed_on registers a special surrogate event handler. It means that in the following case:
so_5::extra::async_op::time_unlimited::make(*this)
.completed_on(mbox, state, &demo::completion_handler)
...
at the moment of activation a lambda function like this will be subscribed instead of the demo::completion_handler method:
so_subscribe(mbox).in(state).event(
[this, async_op__ = get_internal_async_op_data()](auto cmd) {
async_op__->completed(); // All surrogate completion handlers will be unsubscribed.
this->completion_handler(cmd);
});
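The surrogate-handler idea can be tried out in isolation. The following is a toy model under stated assumptions, not so_5_extra code: toy_op_data, make_surrogate and deliver are hypothetical names, messages are plain ints, and a vector of std::function objects stands in for the agent's subscriptions.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

// Toy model only: none of these names come from so_5_extra.
enum class toy_status { activated, completed, cancelled };
using toy_handler = std::function<void(int)>;

struct toy_op_data {
    toy_status status_{toy_status::activated};
    std::vector<toy_handler> subscriptions_;   // stand-in for real subscriptions

    // The first status change wins and drops every surrogate handler.
    bool change_status(toy_status s) {
        if (status_ != toy_status::activated) return false;
        status_ = s;
        subscriptions_.clear();
        return true;
    }
};

// Wraps a user handler: mark the operation as completed (which unsubscribes
// everything), then call the user's handler.
inline toy_handler make_surrogate(std::shared_ptr<toy_op_data> op, toy_handler user) {
    return [op, user](int msg) {
        if (op->change_status(toy_status::completed)) user(msg);
    };
}

// Delivers a message to the i-th subscription if it still exists. The handler
// is copied first so that unsubscription from inside the handler is safe.
inline void deliver(toy_op_data & op, std::size_t i, int msg) {
    if (i < op.subscriptions_.size()) {
        const toy_handler h = op.subscriptions_[i];
        h(msg);
    }
}

// Returns the value accumulated by the handlers that actually ran.
inline int surrogate_usage() {
    auto op = std::make_shared<toy_op_data>();
    int calls = 0;
    op->subscriptions_.push_back(make_surrogate(op, [&calls](int) { ++calls; }));
    op->subscriptions_.push_back(make_surrogate(op, [&calls](int) { calls += 10; }));
    deliver(*op, 0, 1);   // completes the operation, both subscriptions vanish
    deliver(*op, 1, 2);   // nothing is left to deliver to
    assert(op->status_ == toy_status::completed);
    return calls;
}
```

In this model only the first delivered message reaches a user handler, which matches the one-time nature of an async operation.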
The time-limited async operation works this way:
When a user calls make, an empty definition point object is created. There is a special operation data object behind the definition point (the definition point holds a reference to that data object). This data object holds two empty lists. The first one is the list of completion handlers. The second one is the list of timeout handlers.
When a user calls the completed_on method of the definition point, a handler is added to the list of completion handlers.
When a user calls the timeout_handler method of the definition point, a handler is added to the list of timeout handlers.
When a user calls the default_timeout_handler method, the handler is stored inside the operation data object. If there already was a default timeout handler, the old one is replaced by the new one.
When a user calls the activate method of the definition point, several actions are performed:
- if there is no default timeout handler, a special default timeout handler is created.
- all completion handlers from the completion handlers list are subscribed.
- all timeout handlers from the list of timeout handlers are subscribed. The default timeout handler is also subscribed.
- the definition point becomes empty. It means that the definition point loses its reference to the operation data object.
- a delayed timeout message or signal is sent.
When any of the completion messages/signals arrives, the status of the operation is checked. If the operation is neither timed out nor canceled yet, the operation is marked as completed and the corresponding completion handler is called.
When a timeout message or signal arrives and the async operation is not completed yet, the operation is marked as timed out. If there is a timeout handler for the current agent state or there is a user-defined default timeout handler, the appropriate timeout handler is called.
A user can cancel the operation. If the operation is neither completed nor timed out yet, it is marked as canceled.
When the status of the started operation changes (e.g. when the operation is marked as completed, timed out or canceled), the following actions are performed:
- all completion handlers are unsubscribed;
- all timeout handlers (including the default one) are unsubscribed;
- a delayed timeout message or signal is canceled.
Note. By the time any handler is called, the status has already been changed. It means that when a completion handler is called, the current status of the async operation is completed. Similarly, when a timeout handler is called, the current status of the async operation is timedout.
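The "first outcome wins" behaviour described above can be sketched as a tiny state machine. This is a toy model under stated assumptions: toy_time_limited_op and its methods are hypothetical names, and a boolean flag stands in for the delayed timeout message.

```cpp
#include <cassert>

// Toy model only: these names are not part of so_5_extra.
enum class toy_tl_status { activated, completed, timedout, cancelled };

class toy_time_limited_op {
    toy_tl_status status_{toy_tl_status::activated};
    bool timer_armed_{true};   // stands in for the pending delayed timeout message

    // Only the first status change succeeds; it also "cancels" the timer.
    // In the real library the handlers would be unsubscribed at this point too.
    bool finish(toy_tl_status s) {
        if (status_ != toy_tl_status::activated) return false;
        status_ = s;
        timer_armed_ = false;
        return true;
    }
public:
    bool on_completion_message() { return finish(toy_tl_status::completed); }
    bool on_timeout_message()    { return finish(toy_tl_status::timedout); }
    bool cancel()                { return finish(toy_tl_status::cancelled); }
    toy_tl_status status() const { return status_; }
    bool timer_armed() const     { return timer_armed_; }
};

inline void time_limited_usage() {
    toy_time_limited_op op;
    assert(op.on_completion_message());   // the reply arrives first
    assert(!op.on_timeout_message());     // a late timeout is ignored
    assert(!op.cancel());                 // and so is a late cancel
    assert(op.status() == toy_tl_status::completed);
}
```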
From the technical point of view, completed_on registers a special surrogate event handler. It means that in the following case:
so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
.completed_on(mbox, state, &demo::completion_handler)
...
at the moment of activation a lambda function like this will be subscribed instead of the demo::completion_handler method:
so_subscribe(mbox).in(state).event(
[this, async_op__ = get_internal_async_op_data()](auto cmd) {
async_op__->completed();
this->completion_handler(cmd);
});
The same trick is applied when a timeout handler is added by calling timeout_handler or default_timeout_handler. So the code:
so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
.timeout_handler(state, &demo::timeout_handler);
will lead to something like this at the moment of activation:
so_subscribe(get_special_async_op_mbox()).in(state).event(
[this, async_op__ = get_internal_async_op_data()](auto cmd) {
async_op__->timeout();
this->timeout_handler(cmd);
});
The methods completed_on, timeout_handler, default_timeout_handler, reserve_completion_handlers_capacity and reserve_timeout_handlers_capacity of the definition_point_t classes provide the strong exception safety guarantee. It means that either the modification of the definition point is performed successfully or, in the case of an exception, there is no modification at all.
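One common way to obtain a strong guarantee is copy-and-swap: modify a copy and commit it with a non-throwing swap. The sketch below only illustrates the guarantee itself; it is an assumption that so_5_extra uses anything similar internally, and toy_definition_point and add_handler are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Toy definition point that stores only handler names. Hypothetical names.
class toy_definition_point {
    std::vector<std::string> handlers_;
public:
    // Strong guarantee via copy-and-swap: all work is done on a copy and the
    // result is committed with a non-throwing swap, so any exception leaves
    // handlers_ exactly as it was before the call.
    void add_handler(const std::string & name) {
        std::vector<std::string> updated{handlers_};
        updated.push_back(name);
        // A check that may fail after the copy has already been modified.
        if (name.empty()) throw std::invalid_argument{"empty handler name"};
        handlers_.swap(updated);
    }
    std::size_t size() const noexcept { return handlers_.size(); }
};

// Returns the number of handlers left after one good and one failed call.
inline std::size_t strong_guarantee_usage() {
    toy_definition_point dp;
    dp.add_handler("on_some_msg");
    try {
        dp.add_handler("");   // throws, and must not change dp
    } catch (const std::invalid_argument &) {
    }
    assert(dp.size() == 1);
    return dp.size();
}
```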
The activate methods of definition_point_t provide a less strict guarantee. If an exception is thrown during the activation of the operation, all changes (like subscriptions for completion handlers) are rolled back. In this sense, there is a strong guarantee. But the content of the definition point is lost (definition_point_t becomes empty). In this sense, there is only a basic guarantee.
The methods status, is_cancellable, cancel and cleanup of cancellation_point_t provide the no-throw guarantee. They are marked as noexcept in C++ code.
There are two versions of definition_point_t::activate for time-unlimited async operations. The first form:
so_5::extra::async_op::time_unlimited::make(*this)
.completed_on(...)
.completed_on(...)
... // other completion handlers.
.activate(); // no arguments for activate.
do_some_other_activation_actions(); // (1)
And the second one:
so_5::extra::async_op::time_unlimited::make(*this)
.completed_on(...)
.completed_on(...)
... // other completion handlers.
.activate( // lambda-function as an argument
[&] {
do_some_other_activation_actions(); // (2)
});
The difference between these two forms is related to exception safety. Let's assume that do_some_other_activation_actions throws. If the first form of activation is used, the async operation will still be active after an exception from do_some_other_activation_actions. It means that all subscriptions created inside activate will still be there. If a user doesn't want to have these subscriptions after an exception at point (1), he/she should cancel the async operation by hand. For example:
// Cancellation point is required.
auto cp = so_5::extra::async_op::time_unlimited::make(*this)
.completed_on(...)
.completed_on(...)
... // other completion handlers.
.activate(); // no arguments for activate.
try {
do_some_other_activation_actions();
}
catch(...) {
cp.cancel();
throw;
}
If the second form of activate is used, all activation actions (like subscriptions of completion handlers) will be automatically rolled back if do_some_other_activation_actions() throws at point (2).
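The difference between the two forms can be reproduced with a toy model. This is a sketch under stated assumptions: toy_agent, toy_activate and failing_actions are hypothetical names, and an integer counter stands in for the subscriptions created during activation.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>

// Toy model: `subscribed` stands in for the subscriptions made by activate().
struct toy_agent { int subscribed = 0; };

// First form: subscribe and return; later failures are the caller's problem.
inline void toy_activate(toy_agent & a, int handlers) {
    a.subscribed += handlers;
}

// Second form: run extra actions inside activation and roll back if they throw.
inline void toy_activate(toy_agent & a, int handlers,
        const std::function<void()> & actions) {
    a.subscribed += handlers;
    try {
        actions();
    } catch (...) {
        a.subscribed -= handlers;   // undo the subscriptions
        throw;
    }
}

inline void failing_actions() {
    throw std::runtime_error{"activation action failed"};
}

// Each function returns how many subscriptions remain after the failure.
inline int leftovers_first_form() {
    toy_agent a;
    try {
        toy_activate(a, 2);
        failing_actions();          // corresponds to point (1)
    } catch (const std::runtime_error &) {
    }
    return a.subscribed;            // a manual cancel() would be needed here
}

inline int leftovers_second_form() {
    toy_agent a;
    try {
        toy_activate(a, 2, failing_actions);   // corresponds to point (2)
    } catch (const std::runtime_error &) {
    }
    return a.subscribed;
}

inline void activation_forms_usage() {
    assert(leftovers_first_form() == 2);
    assert(leftovers_second_form() == 0);
}
```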
Note that definition_point_t for time-limited operations has just one form of the activate method. It is similar to the first form of time_unlimited::definition_point_t::activate.
All async operation data is stored inside a special dynamically allocated operation data object. This data object is created when the factory function make is called. A definition_point_t instance holds a reference to that data object.
During the activation procedure, the definition point loses its reference to the operation data object. This reference is transferred to the cancellation_point_t instance. The reference is also present in the surrogate handlers which are registered for completion and timeout handlers.
If a user does not store the cancellation_point_t, references to the operation data object are held only by the surrogate handlers. When the operation completes and all these surrogate handlers are destroyed, there are no live references to the operation data object left and the object is deallocated automatically.
If a user stores the cancellation_point_t, the operation data object lives after the completion of the operation until the cancellation point is destroyed or until the user calls the cancellation_point_t::cleanup method.
Note that a call to cancellation_point_t::cleanup doesn't affect the operation if it is not finished (completed, timed out or canceled) yet. The call to cleanup just removes the reference to the operation data object from the cancellation_point_t instance. The operation itself will live until it is completed, timed out or canceled.
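The lifetime rules above behave like ordinary shared ownership. The sketch below models them with std::shared_ptr; that the library uses exactly this smart pointer is an assumption, and toy_op_state and toy_cancellation_point are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

struct toy_op_state { bool finished = false; };   // hypothetical operation data

// Toy cancellation point: just one shared reference to the operation data.
struct toy_cancellation_point {
    std::shared_ptr<toy_op_state> op_;
    void cleanup() noexcept { op_.reset(); }   // drops the reference only
};

inline void lifetime_usage() {
    std::weak_ptr<toy_op_state> watcher;               // observes without owning
    std::vector<std::function<void()>> surrogates;     // stand-in for subscriptions
    toy_cancellation_point cp;
    {
        auto definition_point = std::make_shared<toy_op_state>();
        watcher = definition_point;
        // "Activation": a surrogate handler takes a reference...
        surrogates.push_back([op = definition_point] { op->finished = true; });
        // ...and the definition point hands its own reference over and becomes empty.
        cp.op_ = std::move(definition_point);
    }
    assert(watcher.use_count() == 2);   // surrogate handler + cancellation point

    surrogates.front()();               // the operation completes
    surrogates.clear();                 // surrogate handlers are destroyed
    assert(!watcher.expired());         // still alive: cp holds a reference

    cp.cleanup();
    assert(watcher.expired());          // last reference gone, object deallocated
}
```

If the cancellation point were not stored at all, the object in this model would be released as soon as the surrogate handlers are cleared.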
A special mbox is created for every time-limited async operation. This mbox is used for the timeout message/signal which is sent as a delayed message in the definition_point_t::activate method.
The trick is that this mbox is a limitless one. It means that message limits are not taken into account when a message is being sent to that mbox. It can lead to the case when a message limit specified by a user is ignored.
For example, in a normal situation the following example leads to the application being aborted:
class simple_limit_demo final : public so_5::agent_t {
struct timeout final : public so_5::signal_t {};
void on_timeout(mhood_t<timeout>) {}
public:
simple_limit_demo(context_t ctx)
: so_5::agent_t{ctx + limit_then_abort<timeout>(1)} {
so_subscribe_self().event(&simple_limit_demo::on_timeout);
}
virtual void so_evt_start() override {
// Send the first instance of timeout signal.
// This instance will wait in event queue until return from so_evt_start().
so_5::send<timeout>(*this);
// Delayed send of second instance of timeout signal.
// The actual delivery attempt will be after 150ms.
so_5::send_delayed<timeout>(*this, std::chrono::milliseconds(150));
// Block the agent for 500ms.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
};
This example will be aborted after 150ms, when an attempt to deliver the second instance of the timeout signal occurs.
But the message limit will be ignored in the case of a time-limited operation. For example:
class simple_limit_demo final : public so_5::agent_t {
struct timeout final : public so_5::signal_t {};
void on_timeout(mhood_t<timeout>) {
std::cout << "ordinary timeout handler" << std::endl;
}
public:
simple_limit_demo(context_t ctx)
: so_5::agent_t{ctx + limit_then_abort<timeout>(1)} {
so_subscribe_self().event(&simple_limit_demo::on_timeout);
}
virtual void so_evt_start() override {
// Send the first instance of timeout signal.
// This instance will wait in event queue until return from so_evt_start().
so_5::send<timeout>(*this);
// Create a time limited async operation.
so_5::extra::async_op::time_limited::make<timeout>(*this)
.default_timeout_handler([this](mhood_t<timeout>) {
std::cout << "async_op timeout handler" << std::endl;
so_deregister_agent_coop_normally();
})
.activate(std::chrono::milliseconds(150));
// Block the agent for 500ms.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
};
There is usually no need to store the definition_point_t. But in some cases it can be necessary, for example, if the list of completion handlers can't be filled in one go. Something like:
namespace asyncop = so_5::extra::async_op::time_unlimited;
class demo : public so_5::agent_t {
asyncop::definition_point_t<> op_ = asyncop::make(*this);
...
void on_preparation_step_1() {
...
op_.completed_on(some_mbox, some_state, some_handler);
...
}
void on_preparation_step_2() {
...
op_.completed_on(another_mbox, another_state, another_handler);
...
}
void initiate_async_op() {
op_.activate(...);
...
}
};
But additional care must be taken here, because definition_point_t loses all its content after a call to activate, and usage of definition_point_t after a call to activate is prohibited (see below).