so5extra 1.5 First Last Subscriber Notify Mbox

eao197 edited this page Jun 8, 2022 · 11 revisions

The Problem

Suppose we have an agent that gathers and processes some data from an external source. The data processing is a heavy task, so we want to avoid spending resources when no one is receiving the processed data at the moment.

An obvious way to do that is to force data consumers to announce their arrival and departure. For example, a consumer has to send msg_register_consumer when it starts and msg_deregister_consumer when it exits. The data processor has to receive msg_register_consumer and msg_deregister_consumer and hold a list of live consumers. When new data is ready, the data processor sends a message with the new data to every registered consumer.

It's a working scheme, but it has some drawbacks:

  • it makes the producer more complex because it has to hold a list of live consumers;
  • it makes consumers more complex because, in addition to subscribing to the message with data, they have to send two additional messages.

The Solution

A new mbox added in v1.5.2 provides another solution to that problem. It's a first-last-subscriber-notification mbox that sends:

  • msg_first_subscriber signal when the first subscriber arrives;
  • msg_last_subscriber signal when the last subscriber leaves.

It means that consumers only have to subscribe to the mbox. The corresponding notifications will be sent to the data producer automatically.

It also means that the data producer doesn't need to hold a list of subscribers. An instance of first-last-subscriber-notification mbox can be used as a multi-producer/multi-consumer mbox: a message sent to that mbox will be automatically delivered to every subscriber.
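The notification rule itself can be illustrated with a small stand-alone model. This is only a sketch of the idea, not the so5extra implementation: the class notifying_subscriber_set, its integer subscriber ids, and the callbacks are hypothetical names introduced for illustration. It fires one callback when the set of subscribers goes from empty to non-empty and another when it becomes empty again.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Toy model (hypothetical, not part of so5extra): tracks subscribers and
// reports only the "first arrived" and "last left" transitions.
class notifying_subscriber_set
{
  std::set<int> subscribers_;
  std::function<void()> on_first_;
  std::function<void()> on_last_;

public:
  notifying_subscriber_set(
    std::function<void()> on_first,
    std::function<void()> on_last)
    : on_first_{std::move(on_first)}
    , on_last_{std::move(on_last)}
  {}

  void subscribe(int id)
  {
    const bool was_empty = subscribers_.empty();
    const bool inserted = subscribers_.insert(id).second;
    // Notify only on the transition from "no subscribers" to "one subscriber".
    if( was_empty && inserted )
      on_first_();
  }

  void unsubscribe(int id)
  {
    // Notify only when an existing subscriber leaves and nobody remains.
    if( subscribers_.erase(id) > 0 && subscribers_.empty() )
      on_last_();
  }

  std::size_t size() const noexcept { return subscribers_.size(); }
};

// Two consumers come and go; only two notifications are produced.
inline std::vector<std::string> demo_notifications()
{
  std::vector<std::string> log;
  notifying_subscriber_set subs{
    [&log] { log.push_back("first"); },
    [&log] { log.push_back("last"); } };

  subs.subscribe(1);   // first subscriber arrives: "first" is logged
  subs.subscribe(2);   // second subscriber: no notification
  subs.unsubscribe(1); // one subscriber still present: no notification
  subs.unsubscribe(2); // last subscriber leaves: "last" is logged

  assert(subs.size() == 0);
  return log;
}
```

In the real mbox the two transitions are reported as the msg_first_subscriber and msg_last_subscriber signals rather than as callbacks.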

Usage Example

A very simple usage of a first-last-subscriber-notification mbox can look like this:

#include <so_5_extra/mboxes/first_last_subscriber_notification.hpp>

#include <so_5/all.hpp>

namespace notifications_ns = so_5::extra::mboxes::first_last_subscriber_notification;

//
// Data producer part.
//

// Message to be used for data spreading.
struct msg_data final : public so_5::message_t {...};

// Agent that produces the data.
class data_producer final : public so_5::agent_t
{
  // State in which the agent waits for the first consumer.
  state_t st_wait_consumers{this};
  // State in which the agent produces data.
  state_t st_consumers_connected{this};

  // Mbox for data spreading. Will be created in the constructor.
  const so_5::mbox_t data_mbox_;

public:
  data_producer(context_t ctx)
    : so_5::agent_t{std::move(ctx)}
    , data_mbox_{ // A new mbox has to be created.
        // New mbox will be used for msg_data messages...
        notifications_ns::make_mbox<msg_data>(
          so_environment(),
          // Agent's direct mbox for notifications.
          so_direct_mbox(),
          // This is MPMC mbox.
          so_5::mbox_type_t::multi_producer_multi_consumer )
      }
  {}

  // Access to data mbox.
  [[nodiscard]]
  const so_5::mbox_t &
  data_mbox() const noexcept { return data_mbox_; }

  void so_define_agent() override
  {
    st_wait_consumers
      .event([this](mhood_t<notifications_ns::msg_first_subscriber>) {
          st_consumers_connected.activate();
        });

    st_consumers_connected
      .on_enter([]{ ... /* turn data acquisition on */ })
      .on_exit([]{ ... /* turn data acquisition off */ })
      .event([this](mhood_t<notifications_ns::msg_last_subscriber>) {
          // No consumers left: return to waiting for the first one.
          st_wait_consumers.activate();
        });
  }
  ...

private:
  ...
  void on_data_ready()
  {
    // Sending a new portion of data.
    so_5::send<msg_data>(data_mbox_, ...);
  }
};

//
// Data consumer part
//

class data_consumer final : public so_5::agent_t
{
  const so_5::mbox_t data_mbox_;

public:
  data_consumer(context_t ctx, so_5::mbox_t data_mbox)
    : so_5::agent_t{std::move(ctx)}
    , data_mbox_{std::move(data_mbox)}
  {}

  void so_define_agent() override
  {
    // It's necessary to subscribe only.
    so_subscribe(data_mbox_).event([](mhood_t<msg_data> cmd) { ... });
  }
  ...
};

First-Last-Subscriber-Notification Mbox is Bound to a Single Message Type

An instance of first-last-subscriber-notification mbox can be used with only one message type. That type is specified as a template parameter to the make_mbox factory function. An attempt to send a message of a different type will lead to an exception from the send function:

using namespace so_5::extra::mboxes::first_last_subscriber_notification;
auto data_mbox = make_mbox<my_message>(
  env, notification_mbox, so_5::mbox_type_t::multi_producer_multi_consumer);

// OK.
so_5::send<my_message>(data_mbox, ...);

// Error. A message of different type.
so_5::send<another_message>(data_mbox, ...);

// Error. Mutable my_message is treated as a different type.
so_5::send< so_5::mutable_msg<my_message> >(data_mbox, ...); 

Note that the type checking is performed at run-time (it can't be applied at compile-time), so additional care should be taken when a first-last-subscriber-notification mbox is used for delayed/periodic messages. For example, this code will lead to the termination of the whole application:

using namespace so_5::extra::mboxes::first_last_subscriber_notification;
auto data_mbox = make_mbox<my_message>(...);

so_5::send_delayed<another_message>(data_mbox, 100ms, ...); // (1)

The call to send_delayed at (1) succeeds because the message isn't actually delivered to the target mbox at that point. The delivery happens some time later in the context of SObjectizer's timer thread. An exception thrown in that context will terminate the whole application because delivery-related exceptions are prohibited there.
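Why such a mistake can't be caught earlier may be easier to see on a stand-alone model of a run-time type check. This is a sketch under assumptions, not the so5extra code: single_type_box, make_box, accepts, and deliver are hypothetical names, and the message types are redeclared locally as empty structs. The box remembers the allowed type as a std::type_index, so a mismatch can be detected only when a delivery is actually attempted.

```cpp
#include <cassert>
#include <stdexcept>
#include <typeindex>
#include <typeinfo>

// Toy model (hypothetical): a box that accepts exactly one message type,
// remembered at run time.
class single_type_box
{
  std::type_index allowed_;

public:
  explicit single_type_box(std::type_index allowed) : allowed_{allowed} {}

  template<typename Msg>
  bool accepts() const noexcept
  {
    return std::type_index{typeid(Msg)} == allowed_;
  }

  // The check happens here, at the moment of delivery, not at compile time.
  template<typename Msg>
  void deliver(const Msg &) const
  {
    if( !accepts<Msg>() )
      throw std::invalid_argument{"unexpected message type"};
  }
};

template<typename Msg>
single_type_box make_box()
{
  return single_type_box{std::type_index{typeid(Msg)}};
}

// Local stand-ins for the message types used in the text.
struct my_message {};
struct another_message {};

inline bool delivery_throws_for_wrong_type()
{
  const auto box = make_box<my_message>();
  assert(box.accepts<my_message>());
  box.deliver(my_message{}); // OK: the type matches.

  try
  {
    box.deliver(another_message{}); // Compiles, but fails at run time.
  }
  catch( const std::invalid_argument & )
  {
    return true;
  }
  return false;
}
```

In this model the caller can catch the exception because deliver runs on the caller's thread. With a delayed message the same check runs later on the timer thread, where nothing can handle the failure.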

If a mutable message has to be sent to a first-last-subscriber-notification mbox then the mbox should be created for a mutable message:

using namespace so_5::extra::mboxes::first_last_subscriber_notification;
auto data_mbox = make_mbox< so_5::mutable_msg<my_message> >(
  env, notification_mbox,
  // NOTE: it's MPSC mbox, mutable messages can't be sent to MPMC mboxes.
  so_5::mbox_type_t::multi_producer_single_consumer);

First-Last-Subscriber-Notification Mbox as MPMC or MPSC mbox

An instance of first-last-subscriber-notification mbox can be created as an MPMC or MPSC mbox. The type is specified by a parameter to the make_mbox function.

If the mbox is created as an MPMC mbox then multiple consumers can be subscribed at the same time, but mutable messages can't be sent to such a mbox.

If the mbox is created as an MPSC mbox then only one consumer can make a subscription (or set a delivery filter), and mutable messages are enabled.

using namespace so_5::extra::mboxes::first_last_subscriber_notification;

auto mpmc_mbox = make_mbox<my_message>(
  env, notification_mbox, so_5::mbox_type_t::multi_producer_multi_consumer);

auto mpsc_mbox_for_immutable = make_mbox<my_message>(
  env, notification_mbox, so_5::mbox_type_t::multi_producer_single_consumer);
auto mpsc_mbox_for_mutable = make_mbox< so_5::mutable_msg<my_message> >(
  env, notification_mbox, so_5::mbox_type_t::multi_producer_single_consumer);

First-Last-Subscriber-Notification Mbox and Delivery Filters

Delivery filters are supported, even if the first-last-subscriber-notification mbox is used as an MPSC mbox.

Please note that so_5::agent_t::so_set_delivery_filter_for_mutable_msg has to be used for filtering mutable messages.

make_multi_consumer_mbox, make_single_consumer_mbox, and make_mbox Template Functions

There are two main functions for the creation of a first-last-subscriber-notification mbox:

// Creates a multi-producer/multi-consumer mbox.
template< typename Msg_Type, typename Lock_Type = std::mutex >
[[nodiscard]] mbox_t
make_multi_consumer_mbox(
   environment_t & env,
   const so_5::mbox_t & notification_mbox );

// Creates a multi-producer/single-consumer mbox.
template< typename Msg_Type, typename Lock_Type = std::mutex >
[[nodiscard]] mbox_t
make_single_consumer_mbox(
   environment_t & env,
   const so_5::mbox_t & notification_mbox );

Both are just thin wrappers around the make_mbox function, which actually creates the first-last-subscriber-notification mbox instance. It is also a template function, with the following signature:

template< typename Msg_Type, typename Lock_Type = std::mutex >
[[nodiscard]] mbox_t
make_mbox(
   environment_t & env,
   const so_5::mbox_t & notification_mbox,
   so_5::mbox_type_t mbox_type );

Template parameter Msg_Type specifies the message type to be used with the created mbox. Please note that if a first-last-subscriber-notification mbox is being created for a mutable message then so_5::mutable_msg should be used, for example:

using namespace so_5::extra::mboxes::first_last_subscriber_notification;

auto my_mpsc_mbox = make_mbox< so_5::mutable_msg<my_msg> >(...);

Template parameter Lock_Type specifies the type of lock object that protects the mbox's internals in a multithreaded application. By default, this type is std::mutex, but it can be changed to something more appropriate for the user's environment.

For example, a custom implementation of spinlock can be used as Lock_Type. Or it can be so_5::null_mutex_t for single-threaded environments.
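A custom spinlock of that kind could look like the sketch below. The class spinlock_t and the helper spinlock_self_check are hypothetical and not part of SObjectizer or so5extra. The sketch also assumes that Lock_Type only has to provide lock() and unlock() in the way std::lock_guard expects; the exact requirements should be checked against the so5extra sources.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical spinlock usable with std::lock_guard (lock/unlock),
// plus try_lock for convenience.
class spinlock_t
{
  std::atomic<bool> locked_{false};

public:
  spinlock_t() = default;
  spinlock_t(const spinlock_t &) = delete;
  spinlock_t & operator=(const spinlock_t &) = delete;

  void lock() noexcept
  {
    // Spin until this thread switches the flag from false to true.
    while( locked_.exchange(true, std::memory_order_acquire) )
      std::this_thread::yield();
  }

  bool try_lock() noexcept
  {
    // Succeeds only if the flag was false before the exchange.
    return !locked_.exchange(true, std::memory_order_acquire);
  }

  void unlock() noexcept
  {
    // Debug-only sanity check: unlock is valid only for a held lock.
    assert(locked_.load(std::memory_order_relaxed));
    locked_.store(false, std::memory_order_release);
  }
};

// Checks that the lock is exclusive while held and free after release.
inline bool spinlock_self_check()
{
  spinlock_t lock;
  {
    std::lock_guard<spinlock_t> guard{lock};
    if( lock.try_lock() )
      return false; // Must not be acquirable while the guard holds it.
  }
  const bool reacquired = lock.try_lock();
  if( reacquired )
    lock.unlock();
  return reacquired;
}
```

Under the assumption above, such a type would be passed as the second template argument, for example make_mbox<my_message, spinlock_t>(env, notification_mbox, so_5::mbox_type_t::multi_producer_multi_consumer). A spinlock tends to pay off only when the protected sections are very short; otherwise the default std::mutex is usually the safer choice.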
