so5extra 1.6 Async Op

Yauheni Akhotnikau edited this page Apr 21, 2023 · 1 revision

Purpose

The Problem

Sometimes agents in a SObjectizer-based application need to perform a one-time interaction. For example, agent A sends a one-time request to agent B and then wants to receive a reply from B. In the simplest case agent A expects just one reply message. This can be done like this:

class A : public so_5::agent_t {
   so_5::mbox_t reply_mbox_;
   ...
   void on_reply_handler(mhood_t<reply_msg> cmd) {
      // We do not need this subscription anymore. It should be destroyed.
      so_drop_subscription<reply_msg>(reply_mbox_);
      // Now the reply can be handled.
      ...
   }
   ...
   void initiate_request(const so_5::mbox_t & request_reply_mbox) {
      // Store mbox. It will be needed later to remove the subscription for reply.
      reply_mbox_ = request_reply_mbox;
      // Create a subscription for reply.
      so_subscribe(reply_mbox_).event(&A::on_reply_handler);
      // And now the request can be sent.
      so_5::send<request>(request_reply_mbox, ...);
   }
};

Even in this very simple example the user has to do some work to perform a one-time interaction. The amount of work increases if we take the agent's states into account:

class A : public so_5::agent_t {
   so_5::mbox_t reply_mbox_;
   ...
   void on_reply_handler_for_state1(mhood_t<reply_msg> cmd) {
      // We do not need subscriptions for the reply anymore. They should be destroyed.
      so_drop_subscription_for_all_states<reply_msg>(reply_mbox_);
      // Now the reply can be handled.
      ...
   }
   void on_reply_handler_for_state2(mhood_t<reply_msg> cmd) {
      // We do not need subscriptions for the reply anymore. They should be destroyed.
      so_drop_subscription_for_all_states<reply_msg>(reply_mbox_);
      // Now the reply can be handled.
      ...
   }
   void on_reply_handler_for_state3(mhood_t<reply_msg> cmd) {
      // We do not need subscriptions for the reply anymore. They should be destroyed.
      so_drop_subscription_for_all_states<reply_msg>(reply_mbox_);
      // Now the reply can be handled.
      ...
   }
   ...
   void initiate_request(const so_5::mbox_t & request_reply_mbox) {
      // Store mbox. It will be needed later to remove the subscription for reply.
      reply_mbox_ = request_reply_mbox;
      // Create subscriptions for reply.
      state1_.event(reply_mbox_, &A::on_reply_handler_for_state1);
      state2_.event(reply_mbox_, &A::on_reply_handler_for_state2);
      state3_.event(reply_mbox_, &A::on_reply_handler_for_state3);
      // And now the request can be sent.
      so_5::send<request>(request_reply_mbox, ...);
   }
};

The situation gets worse if several message types are expected as a reply. For example:

class A : public so_5::agent_t {
   so_5::mbox_t reply_mbox_;
   ...
   void on_successful_reply_handler_for_state1(mhood_t<successful_reply_msg> cmd) {
      // We do not need subscriptions for the reply anymore. They should be destroyed.
      so_drop_subscription_for_all_states<successful_reply_msg>(reply_mbox_);
      so_drop_subscription_for_all_states<failed_reply_msg>(reply_mbox_);
      // Now the reply can be handled.
      ...
   }
   void on_failed_reply_handler_for_state1(mhood_t<failed_reply_msg> cmd) {
      // We do not need subscriptions for the reply anymore. They should be destroyed.
      so_drop_subscription_for_all_states<successful_reply_msg>(reply_mbox_);
      so_drop_subscription_for_all_states<failed_reply_msg>(reply_mbox_);
      // Now the reply can be handled.
      ...
   }
   ... // The same picture for handling successful_* and failed_* reply messages in state2 and state3.
   ...
   void initiate_request(const so_5::mbox_t & request_reply_mbox) {
      // Store mbox. It will be needed later to remove the subscription for reply.
      reply_mbox_ = request_reply_mbox;

      // Create subscriptions for reply.
      state1_.event(reply_mbox_, &A::on_successful_reply_handler_for_state1);
      state1_.event(reply_mbox_, &A::on_failed_reply_handler_for_state1);
      state2_.event(reply_mbox_, &A::on_successful_reply_handler_for_state2);
      state2_.event(reply_mbox_, &A::on_failed_reply_handler_for_state2);
      state3_.event(reply_mbox_, &A::on_successful_reply_handler_for_state3);
      state3_.event(reply_mbox_, &A::on_failed_reply_handler_for_state3);

      // And now the request can be sent.
      so_5::send<request>(request_reply_mbox, ...);
   }
};

The situation gets even worse if we want to run several one-time operations in parallel. They will use different reply mboxes, so we can't just store a single reply mbox as a member of our agent class. One possible solution for that case looks like this:

class A : public so_5::agent_t {
   ...
   void on_successful_reply_handler_for_state1(
        const so_5::mbox_t & reply_mbox,
        mhood_t<successful_reply_msg> cmd) {
      // We do not need subscriptions for the reply anymore. They should be destroyed.
      so_drop_subscription_for_all_states<successful_reply_msg>(reply_mbox);
      so_drop_subscription_for_all_states<failed_reply_msg>(reply_mbox);
      // Now the reply can be handled.
      ...
   }
   void on_failed_reply_handler_for_state1(
        const so_5::mbox_t & reply_mbox,
        mhood_t<failed_reply_msg> cmd) {
      // We do not need subscriptions for the reply anymore. They should be destroyed.
      so_drop_subscription_for_all_states<successful_reply_msg>(reply_mbox);
      so_drop_subscription_for_all_states<failed_reply_msg>(reply_mbox);
      // Now the reply can be handled.
      ...
   }
   ... // The same picture for handling successful_* and failed_* reply messages in state2 and state3.
   ...
   void initiate_request(const so_5::mbox_t & request_reply_mbox) {
      // Create subscriptions for reply.
      state1_.event(request_reply_mbox,
            [this, request_reply_mbox](mhood_t<successful_reply_msg> cmd) {
                on_successful_reply_handler_for_state1(request_reply_mbox, cmd);
            });
      state1_.event(request_reply_mbox,
            [this, request_reply_mbox](mhood_t<failed_reply_msg> cmd) {
                on_failed_reply_handler_for_state1(request_reply_mbox, cmd);
            });
      state2_.event(request_reply_mbox,
            [this, request_reply_mbox](mhood_t<successful_reply_msg> cmd) {
                on_successful_reply_handler_for_state2(request_reply_mbox, cmd);
            });
      ... // The same for other states and handlers...
      // And now the request can be sent.
      so_5::send<request>(request_reply_mbox, ...);
   }
};

And the situation can be even worse if the reply can be received from different mboxes, if there is a timeout for the one-time interaction, or if there are external factors such as the need to cancel the interaction.

The examples above show that a one-time interaction between agents requires a lot of boilerplate code. One-time interaction can be implemented with the tools provided by SObjectizer itself, but it takes some work, and that work can be boring and error-prone.

Async Operations As A Solution

Since v.1.0.4, so_5_extra provides a solution for the problem described above. Such one-time interactions are called async operations, and there are helpers for working with them.

These helpers are divided into two groups. These groups have a similar API but have slightly different implementations. Because of that, they are placed in different namespaces:

  • so_5::extra::async_op::time_unlimited contains helpers for async operations which have no limit on execution time.
  • so_5::extra::async_op::time_limited contains helpers for async operations with limited execution time.

For illustration, one of the examples above can be rewritten with time unlimited async operations like this:

#include <so_5_extra/async_op/time_unlimited.hpp>
...
class A : public so_5::agent_t {
   ...
   void on_successful_reply_handler_for_state1(mhood_t<successful_reply_msg> cmd) {
      // There is no need to remove subscriptions.
      // The reply can be handled just now.
      ...
   }
   void on_failed_reply_handler_for_state1(mhood_t<failed_reply_msg> cmd) {
      // The reply can be handled just now.
      ...
   }
   ... // The same picture for handling successful_* and failed_* reply messages in state2 and state3.
   ...
   void initiate_request(const so_5::mbox_t & request_reply_mbox) {
      // Create an instance of async operation.
      so_5::extra::async_op::time_unlimited::make(*this)
         // Define event handlers for replies...
         .completed_on(request_reply_mbox, state1_, &A::on_successful_reply_handler_for_state1)
         .completed_on(request_reply_mbox, state1_, &A::on_failed_reply_handler_for_state1)
         .completed_on(request_reply_mbox, state2_, &A::on_successful_reply_handler_for_state2)
         .completed_on(request_reply_mbox, state2_, &A::on_failed_reply_handler_for_state2)
            ... // The same for other states and handlers...
         // And now the operation can be activated.
         .activate([&] {
                // The request can be sent now.
            so_5::send<request>(request_reply_mbox, ...);
         });
   }
};

Time Unlimited Async Operations

The API for time unlimited async operations consists of the following tools from the so_5::extra::async_op::time_unlimited namespace:

  • the main class definition_point_t, which is used for the definition and activation of an async operation. Its most important methods are completed_on, which sets up a completion handler for the async operation, and activate, which activates the async operation;
  • a helper class cancellation_point_t, which can be used to cancel an activated async operation;
  • the main factory function make, which creates an instance of definition_point_t.

All of these are defined in the so_5_extra/async_op/time_unlimited.hpp header file.

The typical scenario for time unlimited async operations is:

  1. The user calls the make factory and receives an empty definition_point_t instance.
  2. The user then calls definition_point_t::completed_on for every event handler that should serve as a completion handler for the async operation.
  3. The user then calls definition_point_t::activate to activate the async operation.

For example:

class demo : public so_5::agent_t {
   ...
   void on_some_msg(mhood_t<some_msg> cmd) {...}
   void on_another_msg(mhood_t<another_msg> cmd) {...}
   ...
   void initiate_async_op() {
      so_5::extra::async_op::time_unlimited::make(*this)
         .completed_on(some_mbox, some_state, &demo::on_some_msg)
         .completed_on(another_mbox, another_state, &demo::on_another_msg)
         .activate([&] {
            ... // Do other async_op-related actions.
         });
      ...
   }
};

In more complex cases there can be additional steps. For instance, the activate method returns an instance of cancellation_point_t which can be stored and later used to cancel the async operation. For example:

namespace asyncop = so_5::extra::async_op::time_unlimited;
class demo : public so_5::agent_t {
   // The cancellation point for the async operation.
   asyncop::cancellation_point_t<> cp_;
   ...
   void on_some_msg(mhood_t<some_msg> cmd) {...}
   void on_another_msg(mhood_t<another_msg> cmd) {...}
   ...
   void initiate_async_op() {
      // Define and activate the async_op and store the cancellation point
      // returned by activate() method.
      cp_ = asyncop::make(*this)
         .completed_on(some_mbox, some_state, &demo::on_some_msg)
         .completed_on(another_mbox, another_state, &demo::on_another_msg)
         .activate([&] {
            ... // Do other async_op-related actions.
         });
      ...
   }
   ...
   void on_reset(mhood_t<reset_signal>) {
      // Async operation should be cancelled now (if it is not completed yet).
      cp_.cancel();
      ... // Some other actions.
   }
};

Time Limited Async Operations

The API for time limited async operations consists of the following tools from the so_5::extra::async_op::time_limited namespace:

  • the main class definition_point_t, which is used for the definition and activation of an async operation. Its most important methods are completed_on, which sets up a completion handler for the async operation, timeout_handler and default_timeout_handler, which set up timeout handlers, and activate, which activates the async operation;
  • a helper class cancellation_point_t, which can be used to cancel an activated async operation;
  • the main factory function make, which creates an instance of definition_point_t.

All of these are defined in the so_5_extra/async_op/time_limited.hpp header file.

The typical scenario for time limited async operations is:

  1. The user calls the make factory and receives an empty definition_point_t instance.
  2. The user calls definition_point_t::completed_on for every event handler that should serve as a completion handler for the async operation.
  3. The user calls definition_point_t::timeout_handler and definition_point_t::default_timeout_handler for the timeout handlers that are needed.
  4. The user then calls definition_point_t::activate to activate the async operation.

For example:

class demo : public so_5::agent_t {
   struct timeout final : public so_5::signal_t {};
   ...
   void on_some_msg(mhood_t<some_msg> cmd) {...}
   void on_another_msg(mhood_t<another_msg> cmd) {...}
   ...
   void on_timeout(mhood_t<timeout>) {...}
   ...
   void initiate_async_op() {
      so_5::extra::async_op::time_limited::make<timeout>(*this)
         .completed_on(some_mbox, some_state, &demo::on_some_msg)
         .completed_on(another_mbox, another_state, &demo::on_another_msg)
         .timeout_handler(some_state, &demo::on_timeout)
         .activate(std::chrono::milliseconds(350));

         ... // Do other async_op-related actions.
      ...
   }
};

In more complex cases there can be additional steps. For instance, the activate method returns an instance of cancellation_point_t which can be stored and later used to cancel the async operation. For example:

namespace asyncop = so_5::extra::async_op::time_limited;
class demo : public so_5::agent_t {
   // The cancellation point for the async operation.
   asyncop::cancellation_point_t<> cp_;
   ...
   void on_some_msg(mhood_t<some_msg> cmd) {...}
   void on_another_msg(mhood_t<another_msg> cmd) {...}
   ...
   void initiate_async_op() {
      // Define and activate the async_op and store the cancellation point
      // returned by activate() method.
      cp_ = asyncop::make<timeout>(*this)
         .completed_on(some_mbox, some_state, &demo::on_some_msg)
         .completed_on(another_mbox, another_state, &demo::on_another_msg)
         .activate(std::chrono::milliseconds(350));

      ... // Do other async_op-related actions.
      ...
   }
   ...
   void on_reset(mhood_t<reset_signal>) {
      // Async operation should be cancelled now (if it is not completed yet).
      cp_.cancel();
      ... // Some other actions.
   }
};

If an agent has several states, it may be desirable to handle the timeout in every state of the agent. A user can call timeout_handler for every state, for example:

so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
   .timeout_handler(state1, some_timeout_handler)
   .timeout_handler(state2, some_timeout_handler)
   .timeout_handler(state3, some_timeout_handler)
   ... // and so on...

Or default_timeout_handler can be used:

so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
   .default_timeout_handler(some_timeout_handler)
   ... // other tuning stuff...

Calls to timeout_handler and default_timeout_handler can be combined. In the following example, first_handler will be called if the agent receives timeout_msg in the state st_first, second_handler will be called if the agent receives timeout_msg in the state st_second, and third_handler will be called if the agent is in any other state:

so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
   .timeout_handler(st_first, first_handler)
   .timeout_handler(st_second, second_handler)
   .default_timeout_handler(third_handler)
   .activate(std::chrono::milliseconds(350));

The examples above use a signal as the timeout message. But ordinary messages with data inside can be used with time limited operations too. Arguments for the timeout message's constructor are passed as additional arguments to the activate method:

class request_manager : public so_5::agent_t {
    struct request_timed_out final : public so_5::message_t {
        user_id user_;
        request_id req_id_;
        request_timed_out(user_id user, request_id id)
            : user_{std::move(user)}, req_id_{std::move(id)}
        {}
    };
    ...
    void on_timeout(mhood_t<request_timed_out> cmd) {
        ...
    }
    ...
    void schedule_user_request(
        const user_id & user,
        const request_id & id,
        request_data req) {
        // Create and tune async_op for that request.
        so_5::extra::async_op::time_limited::make<request_timed_out>(*this)
            .completed_on(...)
            .default_timeout_handler(&request_manager::on_timeout)
            .activate(
                // Max operation duration.
                std::chrono::seconds(5),
                // Arguments for constructing instance of request_timed_out.
                user, id);
      ...
   }
};

More Technical Details About Async Operations

How Async Operations Work

How Time Unlimited Async Operations Work

A time unlimited async operation works as follows.

When a user calls make, an empty definition point object is created. The definition point holds a reference to a special operation data object. This data object holds an initially empty list of completion handlers.

When a user calls the completed_on method of the definition point, a handler is added to the list of completion handlers.

When a user calls activate method of definition point several actions are performed:

  • all completion handlers from the completion handlers list are subscribed.
  • the definition point becomes empty. It means that the definition point loses its reference to the operation data object.

When any of the completion messages/signals arrives, the status of the operation is checked. If the operation has not been canceled yet, the operation is marked as completed and the corresponding completion handler is called.

A user can cancel the operation. If the operation is not completed yet then the operation will be marked as canceled.

When the status of the started operation is being changed (e.g. when the operation is marked as completed or canceled) all completion handlers are unsubscribed.

Note. When any of the handlers is called, the status has already been changed. It means that when a completion handler is called the current status of the async operation is completed. For example:

namespace asyncop = so_5::extra::async_op::time_unlimited;
class completion_demo : public so_5::agent_t {
   asyncop::cancellation_point_t<> cp_;

   struct boom final : public so_5::signal_t {};

public:
   completion_demo(context_t ctx) : so_5::agent_t{std::move(ctx)} {}

   virtual void so_evt_start() override {
      cp_ = asyncop::make(*this)
         .completed_on(so_direct_mbox(), so_default_state(),
            [this](mhood_t<boom>) {
               assert(asyncop::status_t::completed == cp_.status());
               so_deregister_agent_coop_normally();
            })
         .activate([this] { so_5::send<boom>(*this); });
   }
};

From the technical point of view completed_on registers a special surrogate event handler. It means that in the following case:

so_5::extra::async_op::time_unlimited::make(*this)
    .completed_on(mbox, state, &demo::completion_handler)
    ...

at the moment of activation a lambda function like the following will be subscribed instead of the demo::completion_handler method itself:

so_subscribe(mbox).in(state).event(
    [this, async_op__ = get_internal_async_op_data()](auto cmd) {
      async_op__->completed(); // All surrogate completion handlers will be unsubscribed.
      this->completion_handler(cmd);
   });
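
The surrogate mechanism can also be modeled without SObjectizer. The following is a minimal standalone sketch under stated assumptions, not the actual so_5_extra implementation: agent_model_t, op_data_t, finish and the free function completed_on are hypothetical names, and message delivery is reduced to a lookup by message name. It shows that the status changes and all subscriptions of the operation disappear before the user handler runs.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins; none of these types belong to so_5_extra.
enum class status_t { activated, completed, cancelled };
using handler_t = std::function<void()>;

// Models an agent: at most one handler per message name.
struct agent_model_t {
    std::map<std::string, handler_t> subscriptions;

    // Returns false if nobody is subscribed to msg_name.
    bool deliver(const std::string & msg_name) {
        auto it = subscriptions.find(msg_name);
        if (it == subscriptions.end()) return false;
        handler_t handler = it->second; // copy: the handler unsubscribes itself
        handler();
        return true;
    }
};

// Models the operation data object shared by all surrogate handlers.
struct op_data_t {
    agent_model_t & agent;
    status_t status = status_t::activated;
    std::vector<std::string> subscribed;

    explicit op_data_t(agent_model_t & a) : agent(a) {}

    // Changes the status and removes every subscription of the operation.
    void finish(status_t new_status) {
        status = new_status;
        for (const auto & name : subscribed) agent.subscriptions.erase(name);
        subscribed.clear();
    }
};

// Subscribes a surrogate that finishes the operation before the user handler runs.
void completed_on(const std::shared_ptr<op_data_t> & op,
                  const std::string & msg_name, handler_t user_handler) {
    op->subscribed.push_back(msg_name);
    op->agent.subscriptions[msg_name] = [op, user_handler] {
        op->finish(status_t::completed);
        user_handler();
    };
}

struct demo_result_t {
    bool handler_saw_completed;
    std::size_t subscriptions_left;
    bool second_reply_delivered;
};

demo_result_t run_surrogate_demo() {
    agent_model_t agent;
    auto op = std::make_shared<op_data_t>(agent);
    bool saw_completed = false;
    completed_on(op, "successful_reply",
        [&] { saw_completed = (op->status == status_t::completed); });
    completed_on(op, "failed_reply", [] {});

    agent.deliver("successful_reply");
    const bool second = agent.deliver("failed_reply");
    return {saw_completed, agent.subscriptions.size(), second};
}
```

In this model the second reply finds no subscription, which mirrors the one-time nature of an async operation. Cancellation would call finish with status_t::cancelled in the same way.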

How Time Limited Async Operations Work

A time limited async operation works as follows.

When a user calls make, an empty definition point object is created. The definition point holds a reference to a special operation data object. This data object holds two initially empty lists: the list of completion handlers and the list of timeout handlers.

When a user calls the completed_on method of the definition point, a handler is added to the list of completion handlers.

When a user calls the timeout_handler method of the definition point, a handler is added to the list of timeout handlers.

When a user calls the default_timeout_handler method, the handler is stored inside the operation data object. If a default timeout handler was already set, it is replaced by the new one.

When a user calls activate method of definition point several actions are performed:

  • if there is no default timeout handler, a special default timeout handler is created.
  • all completion handlers from the completion handlers list are subscribed.
  • all timeout handlers from the list of timeout handlers are subscribed. The default timeout handler is also subscribed.
  • the definition point becomes empty. It means that the definition point loses its reference to the operation data object.
  • a delayed timeout message or signal is sent.

When any of the completion messages/signals arrives, the status of the operation is checked. If the operation has neither timed out nor been canceled yet, the operation is marked as completed and the corresponding completion handler is called.

When the timeout message or signal arrives and the async operation is not completed yet, the operation is marked as timed out. If there is a timeout handler for the current agent state or a user-defined default timeout handler, the appropriate timeout handler is called.

A user can cancel the operation. If the operation has neither completed nor timed out yet, the operation is marked as canceled.

When the status of the started operation is being changed (e.g. when the operation is marked as completed, timed out or canceled) the following actions are performed:

  • all completion handlers are unsubscribed;
  • all timeout handlers (including the default one) are unsubscribed;
  • a delayed timeout message or signal is canceled.

Note. When any of the handlers is called, the status has already been changed. It means that when a completion handler is called the current status of the async operation is completed. Similarly, when a timeout handler is called the current status of the async operation is timedout.
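
The status rules for time limited operations can be sketched as a small state machine. This is a simplified standalone model and an assumption about the observable behavior only, not the so_5_extra code: time_limited_model_t and its methods are hypothetical, agent states are represented by plain strings, and the status check replaces the real unsubscription of handlers. It shows that the first of completion, timeout and cancellation wins, and that a state-specific timeout handler is preferred over the default one.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical model; not part of so_5_extra.
enum class status_t { activated, completed, timedout, cancelled };

class time_limited_model_t {
    status_t status_ = status_t::activated;
    std::map<std::string, std::function<void()>> timeout_handlers_; // keyed by state name
    std::function<void()> default_timeout_handler_;

    // Only the first status change succeeds.
    bool try_finish(status_t new_status) {
        if (status_ != status_t::activated) return false;
        status_ = new_status;
        return true;
    }

public:
    void timeout_handler(const std::string & state, std::function<void()> h) {
        timeout_handlers_[state] = std::move(h);
    }
    void default_timeout_handler(std::function<void()> h) {
        default_timeout_handler_ = std::move(h);
    }
    status_t status() const { return status_; }

    // A completion message arrived: the status changes before the handler runs.
    bool on_completion(const std::function<void()> & handler) {
        if (!try_finish(status_t::completed)) return false;
        handler();
        return true;
    }

    // The timeout message arrived while the agent is in current_state.
    bool on_timeout(const std::string & current_state) {
        if (!try_finish(status_t::timedout)) return false;
        auto it = timeout_handlers_.find(current_state);
        if (it != timeout_handlers_.end()) it->second();
        else if (default_timeout_handler_) default_timeout_handler_();
        return true;
    }

    bool cancel() { return try_finish(status_t::cancelled); }
};

// Returns the name of the timeout handler chosen for the given state.
std::string handler_chosen_in(const std::string & state) {
    std::string called = "none";
    time_limited_model_t op;
    op.timeout_handler("st_first", [&] { called = "first_handler"; });
    op.timeout_handler("st_second", [&] { called = "second_handler"; });
    op.default_timeout_handler([&] { called = "third_handler"; });
    op.on_timeout(state);
    return called;
}

// A completion that arrives after the timeout must be ignored.
bool late_completion_is_ignored() {
    time_limited_model_t op;
    op.on_timeout("st_first");
    bool called = false;
    const bool accepted = op.on_completion([&] { called = true; });
    return !accepted && !called && op.status() == status_t::timedout;
}
```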

From the technical point of view completed_on registers a special surrogate event handler. It means that in the following case:

so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
    .completed_on(mbox, state, &demo::completion_handler)
    ...

at the moment of activation a lambda function like the following will be subscribed instead of the demo::completion_handler method itself:

so_subscribe(mbox).in(state).event(
    [this, async_op__ = get_internal_async_op_data()](auto cmd) {
      async_op__->completed();
      this->completion_handler(cmd);
   });

The same trick is applied when a timeout handler is added by calling timeout_handler or default_timeout_handler. So the code:

so_5::extra::async_op::time_limited::make<timeout_msg>(*this)
   .timeout_handler(state, &demo::timeout_handler);

will lead to something like that at the moment of activation:

so_subscribe(get_special_async_op_mbox()).in(state).event(
    [this, async_op__ = get_internal_async_op_data()](auto cmd) {
      async_op__->timeout();
      this->timeout_handler(cmd);
   });

Exception Safety

The methods completed_on, timeout_handler, default_timeout_handler, reserve_completion_handlers_capacity and reserve_timeout_handlers_capacity of the definition_point_t classes provide the strong exception safety guarantee. It means that either the modification of the definition point is performed successfully or, in the case of an exception, there is no modification at all.

The activate method of definition_point_t provides a weaker guarantee. If an exception is thrown during the activation of an operation, all changes (like subscriptions for completion handlers) are rolled back. In this sense, the guarantee is strong. But the content of the definition point is lost (the definition_point_t becomes empty). In this sense, there is only a basic guarantee.

The methods status, is_cancellable, cancel and cleanup of cancellation_point_t provide the no-throw guarantee. They are marked as noexcept in C++ code.

Difference Between Two Versions Of Activate For Time Unlimited Operations

There are two versions of definition_point_t::activate for time unlimited async operations. The first form:

so_5::extra::async_op::time_unlimited::make(*this)
   .completed_on(...)
   .completed_on(...)
   ... // other completion handlers.
   .activate(); // no arguments for activate.
do_some_other_activation_actions(); // (1)

And the second one:

so_5::extra::async_op::time_unlimited::make(*this)
   .completed_on(...)
   .completed_on(...)
   ... // other completion handlers.
   .activate( // lambda-function as an argument
      [&] {
         do_some_other_activation_actions(); // (2)
      });

The difference between these two forms is related to exception safety. Let's assume that do_some_other_activation_actions throws. If the first form of activation is used, the async operation will still be active after the exception from do_some_other_activation_actions. It means that all subscriptions created inside activate will remain. If a user doesn't want to keep these subscriptions after an exception at point (1), the async operation should be canceled manually. For example:

// Cancellation point is required.
auto cp = so_5::extra::async_op::time_unlimited::make(*this)
   .completed_on(...)
   .completed_on(...)
   ... // other completion handlers.
   .activate(); // no arguments for activate.
try {
   do_some_other_activation_actions();
}
catch(...) {
   cp.cancel();
   throw;
}

If the second form of activate is used then all activation actions (like subscriptions of completion handlers) will be automatically rolled back if do_some_other_activation_actions() throws at point (2).
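
The difference between the two forms can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the real so_5_extra classes: agent_model_t and definition_point_model_t are hypothetical, and a subscription is represented by a name in a set. The second overload of activate runs the user action inside a try block and removes the subscriptions again if the action throws.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model types; not part of so_5_extra.
struct agent_model_t {
    std::set<std::string> subscriptions;
};

class definition_point_model_t {
    agent_model_t & agent_;
    std::vector<std::string> handlers_;

    // Moves the handlers out (the definition point becomes empty) and subscribes them.
    std::vector<std::string> subscribe_all() {
        std::vector<std::string> handlers = std::move(handlers_);
        handlers_.clear();
        for (const auto & h : handlers) agent_.subscriptions.insert(h);
        return handlers;
    }

public:
    explicit definition_point_model_t(agent_model_t & agent) : agent_(agent) {}

    definition_point_model_t & completed_on(std::string name) {
        handlers_.push_back(std::move(name));
        return *this;
    }

    // First form: only subscribes; later failures are the caller's problem.
    void activate() { subscribe_all(); }

    // Second form: rolls the subscriptions back if the action throws.
    template <typename Action>
    void activate(Action && action) {
        const std::vector<std::string> handlers = subscribe_all();
        try {
            action();
        } catch (...) {
            for (const auto & h : handlers) agent_.subscriptions.erase(h);
            throw;
        }
    }
};

// Returns how many subscriptions survive a failing activation action.
std::size_t subscriptions_left_after_failure(bool use_lambda_form) {
    agent_model_t agent;
    definition_point_model_t dp{agent};
    dp.completed_on("successful_reply").completed_on("failed_reply");
    auto failing_action = [] { throw std::runtime_error("send failed"); };
    try {
        if (use_lambda_form) {
            dp.activate(failing_action); // like point (2)
        } else {
            dp.activate();
            failing_action();            // like point (1)
        }
    } catch (const std::runtime_error &) {
        // The exception is swallowed here only to inspect the result.
    }
    return agent.subscriptions.size();
}
```

With the first form both subscriptions of the model remain after the failure, with the second form none remain.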

Note that definition_point_t for time limited operations has just one form of the activate method. It is similar to the first form of time_unlimited::definition_point_t::activate.

When Is Operation Data Destroyed?

All async operation data is stored inside a special dynamically allocated operation data object. This data object is created when a factory function make is called. A definition_point_t instance holds a reference to that data object.

During the activation procedure, a definition point loses its reference to the operation data object. This reference is transferred to cancellation_point_t instance. Also, the reference is present in surrogate handlers which are registered for completion and timeout handlers.

If a user does not store the cancellation_point_t, references to the operation data object are stored only inside the surrogate handlers. When the operation completes and all these surrogate handlers are destroyed, there is no live reference to the operation data object left and the object is deallocated automatically.

If a user stores the cancellation_point_t, the operation data object lives on after the completion of the operation until this cancellation point is destroyed or until the user calls the cancellation_point_t::cleanup method.

Note that a call to cancellation_point_t::cleanup doesn't affect the operation if it has not finished (completed, timed out or canceled) yet. The call to cleanup just removes the reference to the operation data object from the cancellation_point_t instance. The operation itself will live until it is completed or timed out.
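
These lifetime rules follow ordinary shared ownership. The sketch below models them with std::shared_ptr and std::weak_ptr only; it is an illustration under that assumption, and op_data_t, cancellation_point_model_t and operation_model_t are hypothetical names that say nothing about how so_5_extra actually counts references.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical model types; not part of so_5_extra.
struct op_data_t {
    bool completed = false;
};

struct cancellation_point_model_t {
    std::shared_ptr<op_data_t> op;
    void cleanup() { op.reset(); } // drops only this reference
};

struct operation_model_t {
    std::vector<std::function<void()>> surrogate_handlers;
    std::weak_ptr<op_data_t> watcher; // observes the data object without owning it

    // Creates the data object, stores references in one surrogate handler
    // and in the returned cancellation point.
    cancellation_point_model_t activate() {
        auto op = std::make_shared<op_data_t>();
        watcher = op;
        surrogate_handlers.push_back([op] { op->completed = true; });
        return cancellation_point_model_t{op};
    }

    // The operation completes; its surrogate handlers are unsubscribed and destroyed.
    void complete() {
        surrogate_handlers.front()();
        surrogate_handlers.clear();
    }

    bool data_alive() const { return !watcher.expired(); }
};

struct lifetime_report_t {
    bool alive_without_cancellation_point;
    bool alive_with_cancellation_point;
    bool alive_after_cleanup;
};

lifetime_report_t run_lifetime_demo() {
    lifetime_report_t report{};

    operation_model_t first;
    first.activate(); // the cancellation point is discarded immediately
    first.complete();
    report.alive_without_cancellation_point = first.data_alive();

    operation_model_t second;
    cancellation_point_model_t cp = second.activate();
    second.complete();
    report.alive_with_cancellation_point = second.data_alive();
    cp.cleanup();
    report.alive_after_cleanup = second.data_alive();

    return report;
}
```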

Message Limits And Time Limited Async Operations

A special mbox is created for every time limited async operation. This mbox is used for the timeout message/signal, which is sent as a delayed message in the definition_point_t::activate method.

The trick is that this mbox is a limitless one. It means that message limits are not taken into account when a message is sent to that mbox. As a result, a message limit specified by a user can be ignored.

For example, in a normal situation the following example leads to the application being aborted:

class simple_limit_demo final : public so_5::agent_t {
   struct timeout final : public so_5::signal_t {};
   void on_timeout(mhood_t<timeout>) {}
public:
   simple_limit_demo(context_t ctx)
      : so_5::agent_t{ctx + limit_then_abort<timeout>(1)} {
      so_subscribe_self().event(&simple_limit_demo::on_timeout);
   }
   virtual void so_evt_start() override {
      // Send the first instance of timeout signal.
      // This instance will wait in event queue until return from so_evt_start().
      so_5::send<timeout>(*this);
      // Delayed send of second instance of timeout signal.
      // The actual delivery attempt will be after 150ms.
      so_5::send_delayed<timeout>(*this, std::chrono::milliseconds(150));

      // Block the agent for 500ms.
      std::this_thread::sleep_for(std::chrono::milliseconds(500));
   }
};

This example will be aborted after 150ms when an attempt to deliver the second instance of timeout signal occurs.

But the message limit will be ignored in the case of time limited operation. For example:

class simple_limit_demo final : public so_5::agent_t {
   struct timeout final : public so_5::signal_t {};
   void on_timeout(mhood_t<timeout>) {
      std::cout << "ordinary timeout handler" << std::endl;
   }
public:
   simple_limit_demo(context_t ctx)
      : so_5::agent_t{ctx + limit_then_abort<timeout>(1)} {
      so_subscribe_self().event(&simple_limit_demo::on_timeout);
   }
   virtual void so_evt_start() override {
      // Send the first instance of timeout signal.
      // This instance will wait in event queue until return from so_evt_start().
      so_5::send<timeout>(*this);

      // Create a time limited async operation.
      so_5::extra::async_op::time_limited::make<timeout>(*this)
         .default_timeout_handler([this](mhood_t<timeout>) {
            std::cout << "async_op timeout handler" << std::endl;
            so_deregister_agent_coop_normally();
         })
         .activate(std::chrono::milliseconds(150));

      // Block the agent for 500ms.
      std::this_thread::sleep_for(std::chrono::milliseconds(500));
   }
};

How To Store A Definition Point

Usually there is no need to store the definition_point_t. But in some cases it can be necessary, for example, if the list of completion handlers can't be filled all at once. Something like:

namespace asyncop = so_5::extra::async_op::time_unlimited;
class demo : public so_5::agent_t {
   asyncop::definition_point_t<> op_ = asyncop::make(*this);
   ...
   void on_preparation_step_1() {
      ...
      op_.completed_on(some_mbox, some_state, some_handler);
      ...
   }
   void on_preparation_step_2() {
      ...
      op_.completed_on(another_mbox, another_state, another_handler);
      ...
   }
   void initiate_async_op() {
      op_.activate(...);
      ...
   }
};

But additional care must be taken here because definition_point_t will lose all its content after a call to activate and usage of definition_point_t after a call to activate is prohibited (see below).
