Skip to content

so5extra 1.4 Round Robin Mbox

Yauheni Akhotnikau edited this page Jan 15, 2020 · 1 revision

The Problem

Sometimes it is necessary to distribute processing of a message stream between several agents. Several approaches can be used for solving this task. For example the usage of a pair of Collector and Performer agents.

Custom solutions which are based on Collector/Performer agents have many advantages. But there is a major drawback: it requires some amount of work from a developer. Even in cases were simple schemes like round-robin message distribution is appropriate.

The Solution

so_5_extra provides a simple solution for a task for message stream processing distribution: a round-robin mbox.

Round-robin mbox is a special mbox with several agents behind it. When a new message arrives the round-robin mbox delivers it to only one receiver in a round-robin manner. When the next message arrives the mbox delivers it to the next receiver and so on.

There is no need to write the message-distribution code. It is necessary to create an instance of round-robin mbox, make a subscription to it and that all.

Usage Example

A very simple usage for distributing messages via round-round mbox:

#include <so_5_extra/mboxes/round_robin.hpp>

#include <so_5/all.hpp>

// A simple worker for handling messages from round-robin mbox.
class worker_t final : public so_5::agent_t
{
public :
   worker_t(
      // Environment to work in.
      context_t ctx,
      // Name of worker.
      std::string name,
      // Round-robin mbox.
      const so_5::mbox_t & src )
      :  so_5::agent_t( std::move(ctx) )
      ,  m_name( std::move(name) )
   {
      // Subscribe worker to messages from round-robin mbox.
      so_subscribe( src ).event( &worker_t::on_task );
   }

private :
   const std::string m_name;

   void on_task( mhood_t<std::string> cmd )
   {
      std::cout << m_name << ": " << *cmd << std::endl;
      so_deregister_agent_coop_normally();
   }
};

int main()
{
   so_5::launch( []( so_5::environment_t & env ) {
      // Create round-robin mbox.
      const auto rrmbox = so_5::extra::mboxes::round_robin::make_mbox<>( env );

      // Create several workers. All of them will be subscribed to
      // that round-robin mbox.
      for( int i = 0; i < 3; ++i )
         env.introduce_coop( [&]( so_5::coop_t & coop ) {
            coop.make_agent< worker_t >(
                  "worker-" + std::to_string( i + 1 ),
                  std::cref(rrmbox) );
         } );

      // Send several messages to be handled by workers.
      so_5::send< std::string >( rrmbox, "Alpha" );
      so_5::send< std::string >( rrmbox, "Beta" );
      so_5::send< std::string >( rrmbox, "Gamma" );
   } );

   return 0;
}

This example will output something like:

worker-1: Alpha
worker-2: Beta
worker-3: Gamma

Round-Robin Mbox Working Principle

Round-robin mbox implements multi-producer/single-consumer mbox. It means that a message will be delivered to at most one subscriber. It also means that mutable messages can be sent to round-robin mbox just as to any other multi-producer/single-consumer mboxes.

Round-robin mbox holds lists of subscribers for every message type. It means that if agents A1, A2, and A3 are subscribed to message type M1 and A2 and A4 are subscribed to M2 then there are two subscription lists inside the mbox: one (A1, A2, A3) for M1 and second (A2, A4) for M2. These lists are handled separately.

Round-robin mbox remembers the current item for every subscription list.

When someone sends a message to a round-robin mbox that mbox looks for subscription list for that message type. If such list is found the message is delivered to the current item in the list and the current item is changed.

For example, let assume that there is a round-robin mbox with subscription list (A1, A2, A3) for message type M1 and the current item in that list is A1. When a message of type M1 is sent to that mbox then the message is delivered to A1. The new current item in the subscription list is A2 now. The next message of type M1 will be delivered to A2 and the new current item in the subscription list will become A3. Then the next message of type M1 will be delivered to A3 and the new current item in the subscription list will become A1.

If there is no a subscription list for a particular message type that message won't be delivered and will be just dropped.

Round-robin mbox correctly handles unsubscriptions: the agent to be unsubscribed is removed from the subscription list for the specific message type (the current item in this list is changed if needed). If subscription list becomes empty after unsubscription then that subscription list will be removed. A new message of that type won't be delivered until someone will be subscribed to it.

Because round-robin mbox works like multi-producer/single-consumer mbox it is possible to make synchronous interaction between agents via round-robin mboxes.

Round-Robin Mbox and Delivery Filters

Round-robin mbox doesn't support message delivery filters. It means that an attempt to set up a delivery filter to a round-robin mbox will lead to an exception.

make_mbox Template Function

make_mbox function which is used for creation of round-robin mbox instance is a template function with the following signature:

template< typename Lock_Type = std::mutex >
mbox_t
make_mbox( environment_t & env );

It has a single template parameter: Lock_Type. This parameter specifies a type of lock object to protect Shutdowner's internals in a multithreaded application. By default, this type is std::mutex. But it can be changed to something that more appropriate for the user's environment.

For example, a custom implementation of spinlock can be used as Lock_Type. Or it can so_5::null_mutex_t for single-threaded environments.

Clone this wiki locally