so5extra 1.6 Msg Hierarchy
The problem was described several years ago in issues #24 and #25: SObjectizer lacks a way to receive a message by its base class.
For example, let's suppose we have something like:
struct basic_image : public so_5::message_t {...};
struct camera_vendor_1_image : public basic_image {...};
struct camera_vendor_2_image : public basic_image {...};
struct image_type_one : public camera_vendor_1_image {...};
struct image_type_two : public camera_vendor_1_image {...};
...
And we want to write an agent that handles messages of type camera_vendor_1_image. It means that such an agent can receive messages of types image_type_one and image_type_two.
Or an agent that handles messages of type basic_image, which means that such an agent can handle any descendant of basic_image.
All the functionality described below is in the so_5::extra::msg_hierarchy namespace.
The pub.hpp header file from so_5_extra/msg_hierarchy has to be included:
#include <so_5_extra/msg_hierarchy/pub.hpp>
The first thing a user has to do is to describe the message hierarchy by using two special template classes:
// The root of the hierarchy has to be derived from a single root_t class.
struct basic_image : public so_5::extra::msg_hierarchy::root_t<basic_image> {...};
// All other levels in the hierarchy have to use inheritance from
// two base classes. The first is an actual base class, but the second is
// a special empty mixin node_t.
struct camera_vendor_1_image
    : public basic_image
    , public so_5::extra::msg_hierarchy::node_t<camera_vendor_1_image, basic_image>
{...};
struct camera_vendor_2_image
    : public basic_image
    , public so_5::extra::msg_hierarchy::node_t<camera_vendor_2_image, basic_image>
{...};
struct image_type_one
    : public camera_vendor_1_image
    , public so_5::extra::msg_hierarchy::node_t<image_type_one, camera_vendor_1_image>
{...};
struct image_type_two
    : public camera_vendor_1_image
    , public so_5::extra::msg_hierarchy::node_t<image_type_two, camera_vendor_1_image>
{...};
...
It's required to call the constructor of node_t explicitly:
struct basic_image : public so_5::extra::msg_hierarchy::root_t<basic_image>
{
    // There is no need to call the constructor of root_t.
    basic_image() = default;
};
struct camera_vendor_1_image
    : public basic_image
    , public so_5::extra::msg_hierarchy::node_t<camera_vendor_1_image, basic_image>
{
    // A reference to `this` has to be passed to the node_t's constructor.
    camera_vendor_1_image()
        : so_5::extra::msg_hierarchy::node_t<camera_vendor_1_image, basic_image>{ *this }
    {}
};
struct camera_vendor_2_image
    : public basic_image
    , public so_5::extra::msg_hierarchy::node_t<camera_vendor_2_image, basic_image>
{
    // A reference to `this` has to be passed to the node_t's constructor.
    camera_vendor_2_image()
        : so_5::extra::msg_hierarchy::node_t<camera_vendor_2_image, basic_image>{ *this }
    {}
};
Fortunately, the compiler complains when the constructor of node_t isn't called, so it's impossible to forget.
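Why the compiler complains can be illustrated with a small standalone sketch. The toy_root and toy_node templates below are hypothetical stand-ins, not the real root_t and node_t. The sketch only assumes that the mixin has no default constructor and must receive a reference to the derived object:

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical stand-ins for root_t/node_t; NOT the real so5extra classes.
template<typename Derived>
struct toy_root {
    toy_root() = default;
};

template<typename Derived, typename Base>
struct toy_node {
    // The only constructor requires a reference to the derived object,
    // so the mixin can't be default-constructed.
    explicit toy_node(Derived & /*self*/) {}
};

struct basic : public toy_root<basic> {};

// The constructor of the mixin is called explicitly: this compiles.
struct good_child
    : public basic
    , public toy_node<good_child, basic>
{
    good_child() : toy_node<good_child, basic>{ *this } {}
};

// The explicit call is forgotten: the implicit default constructor
// of bad_child is deleted, so any attempt to create it won't compile.
struct bad_child
    : public basic
    , public toy_node<bad_child, basic>
{};

static_assert(std::is_default_constructible_v<good_child>);
static_assert(!std::is_default_constructible_v<bad_child>);
```

With a mixin like this, a struct that omits the explicit call can't be default-constructed at all, so the mistake shows up at compile time instead of at run time.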
The next step is the creation of special machinery that will demultiplex messages between subscribers. The proxy to this machinery is the demuxer_t<Root_Msg> template class:
so_5::extra::msg_hierarchy::demuxer_t<basic_image> demuxer{ ... };
...
An instance of demuxer_t can be seen as a proxy object that hides a special demuxing-controller and a bunch of mboxes. Only mboxes created by this demuxer_t instance can be used for delivery of messages from the hierarchy rooted at the Root_Msg type.
To send a message, a special sending_mbox has to be obtained from the demuxer via the sending_mbox method:
class camera_vendor_1_handler final : public so_5::agent_t
{
    // Destination for outgoing messages.
    const so_5::mbox_t m_out_mbox;
    ...
public:
    camera_vendor_1_handler(
        context_t ctx,
        so_5::extra::msg_hierarchy::demuxer_t<basic_image> & demuxer,
        ...)
        : so_5::agent_t{ std::move(ctx) }
        , m_out_mbox{ demuxer.sending_mbox() }
        , ...
    {}
    ...
    void process_next_frame()
    {
        ... // Obtaining a new frame.
        switch(check_frame_type())
        {
        case first_type:
            so_5::send<image_type_one>(m_out_mbox, ...);
            break;
        case second_type:
            so_5::send<image_type_two>(m_out_mbox, ...);
            break;
        }
    }
};
The sending_mbox created by a demuxer can be multi-producer/multi-consumer or multi-producer/single-consumer. A multi-consumer mbox prohibits delivery of mutable messages.
The type of the sending_mbox is specified in the constructor of a demuxer and can't be changed later:
// Demuxer with MPMC sending_mbox.
so_5::extra::msg_hierarchy::demuxer_t<basic_image> mpmc_demuxer{
    env,
    so_5::extra::msg_hierarchy::multi_consumer};
// This is OK:
so_5::send<image_type_one>(mpmc_demuxer.sending_mbox(), ...);
// This is NOT OK and will lead to an exception:
so_5::send< so_5::mutable_msg<image_type_one> >(mpmc_demuxer.sending_mbox(), ...);
// Demuxer with MPSC sending_mbox.
so_5::extra::msg_hierarchy::demuxer_t<basic_image> mpsc_demuxer{
    env,
    so_5::extra::msg_hierarchy::single_consumer};
// This is OK:
so_5::send<image_type_one>(mpsc_demuxer.sending_mbox(), ...);
// This is OK too:
so_5::send< so_5::mutable_msg<image_type_one> >(mpsc_demuxer.sending_mbox(), ...);
To receive messages from a hierarchy it's required to:
- obtain an instance of consumer_t<Root_Msg> type;
- obtain a special receiving_mbox from a consumer object;
- make a subscription from a receiving_mbox.
Usually it looks like this:
class message_processor final : public so_5::agent_t
{
    // Consumer instance.
    so_5::extra::msg_hierarchy::consumer_t<basic_image> m_consumer;
    ...
public:
    message_processor(
        context_t ctx,
        so_5::extra::msg_hierarchy::demuxer_t<basic_image> & demuxer,
        ...)
        : so_5::agent_t{ std::move(ctx) }
        // Create an instance of a consumer.
        , m_consumer{ demuxer.allocate_consumer() }
        , ...
    {}
    void so_define_agent() override
    {
        // A separate receiving_mbox for every type of messages to be received.
        so_subscribe(m_consumer.receiving_mbox<image_type_one>())
            .event([this](mhood_t<image_type_one> cmd) {...});
        so_subscribe(m_consumer.receiving_mbox<camera_vendor_1_image>())
            .event([this](mhood_t<camera_vendor_1_image> cmd) {...});
        so_subscribe(m_consumer.receiving_mbox<basic_image>())
            .event([this](mhood_t<basic_image> cmd) {...});
        ...
    }
};
The main trick is to use a separate receiving_mbox for every type of message a user wants to receive.
This approach differs from the usual way of receiving messages from an mbox in SObjectizer. When we work with "normal" messages we can subscribe to messages of different types from a single mbox:
void some_agent::so_define_agent() {
    // All these messages can be received from just one mbox:
    const auto src = so_environment().create_mbox("my_mbox");
    so_subscribe(src)
        .event([this](const first_message_type &) {...})
        .event([this](const second_message_type &) {...})
        .event([this](const third_message_type &) {...})
        ...;
}
but with msg_hierarchy we have to use a separate mbox for every message type in the hierarchy:
void some_agent::so_define_agent() {
    so_subscribe(
            // A separate mbox for receiving messages of type basic_image.
            m_consumer.receiving_mbox<basic_image>())
        .event([this](const basic_image & cmd) {...});
    so_subscribe(
            // A separate mbox for receiving messages of type camera_vendor_1_image.
            m_consumer.receiving_mbox<camera_vendor_1_image>())
        .event([this](const camera_vendor_1_image & cmd) {...});
    so_subscribe(
            // A separate mbox for receiving messages of type image_type_one.
            m_consumer.receiving_mbox<image_type_one>())
        .event([this](const image_type_one & cmd) {...});
    ...
}
When a user asks for a separate mbox for a message of type A, the demuxing-controller (hidden behind the demuxer and consumer objects) checks the type A and creates a special relation between the sending_mbox and a particular receiver. Because of this relation the demuxing-controller knows the destinations for message delivery when a message of type A (or any other type derived from A) is sent to the sending_mbox.
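The idea of finding destinations by the type of a message and by all of its base types can be sketched with a toy model. Everything in it (toy_basic_image, upcast_chain, toy_controller, route) is hypothetical and is not how so5extra is actually implemented. It only assumes that a message instance can report its own type together with its ancestors:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>

// Toy hierarchy: every message reports its own type followed by all its
// ancestors, most-derived first. Hypothetical, not the so5extra API.
struct toy_basic_image {
    virtual ~toy_basic_image() = default;
    virtual std::vector<std::type_index> upcast_chain() const {
        return { std::type_index{typeid(toy_basic_image)} };
    }
};

struct toy_vendor_1_image : public toy_basic_image {
    std::vector<std::type_index> upcast_chain() const override {
        auto chain = toy_basic_image::upcast_chain();
        chain.insert(chain.begin(), std::type_index{typeid(toy_vendor_1_image)});
        return chain;
    }
};

struct toy_image_type_one : public toy_vendor_1_image {
    std::vector<std::type_index> upcast_chain() const override {
        auto chain = toy_vendor_1_image::upcast_chain();
        chain.insert(chain.begin(), std::type_index{typeid(toy_image_type_one)});
        return chain;
    }
};

// Toy controller: remembers who asked for which message type.
class toy_controller {
    std::map<std::type_index, std::vector<std::string>> m_subscribers;
public:
    template<typename Msg>
    void subscribe(std::string who) {
        m_subscribers[std::type_index{typeid(Msg)}].push_back(std::move(who));
    }

    // Returns the names of subscribers found for the message's own type
    // and for every base type of the message.
    std::vector<std::string> route(const toy_basic_image & msg) const {
        std::vector<std::string> result;
        for (const auto & type : msg.upcast_chain()) {
            const auto it = m_subscribers.find(type);
            if (it != m_subscribers.end())
                result.insert(result.end(), it->second.begin(), it->second.end());
        }
        return result;
    }
};

// Nobody subscribed to toy_image_type_one itself, but the message
// still reaches the subscribers of its base types.
std::vector<std::string> route_demo() {
    toy_controller controller;
    controller.subscribe<toy_basic_image>("root_listener");
    controller.subscribe<toy_vendor_1_image>("vendor_1_listener");
    return controller.route(toy_image_type_one{});
}
```

In this sketch route() needs an actual message object to call upcast_chain() on, which mirrors the point that delivery by base class depends on a message instance.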
A consumer object plays a very important role and solves two tasks.
The first task is avoiding delivering the same message instance to one subscriber several times.
All receiving_mboxes are bound to corresponding consumer objects, and this protects from repeated delivery of a message to a subscriber. Let's suppose that one agent has subscriptions to message types image_type_one, camera_vendor_1_image and basic_image. When an instance of image_type_one is sent it can be delivered to subscribers of message type image_type_one, and also to subscribers of message type camera_vendor_1_image, and also to subscribers of message type basic_image. If one agent has subscriptions for all of those types then it could receive the message three times, which is obviously not what we want.
When receiving_mboxes are bound to a consumer object then the demuxing-controller understands that there is just one subscriber behind those mboxes. It allows the demuxing-controller to stop delivering a message to this subscriber just after the first receiving_mbox is found.
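This "at most once per consumer" rule can be sketched as a toy model. The names below (toy_demuxing_controller, add_receiving_mbox, delivery) are hypothetical, message types are represented by plain strings, and the search order from the most derived type toward the root is an assumption of the sketch, not a documented guarantee:

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using type_name = std::string;
using consumer_id = int;

// One performed delivery: who got the message and via which receiving mbox.
struct delivery {
    consumer_id consumer;
    type_name as_type;
};

// Toy controller, NOT the real so5extra implementation.
class toy_demuxing_controller {
    // For every message type: consumers that own a receiving mbox for it.
    std::map<type_name, std::vector<consumer_id>> m_mboxes;
public:
    void add_receiving_mbox(consumer_id who, type_name type) {
        m_mboxes[std::move(type)].push_back(who);
    }

    // `chain` lists the type of the message and all its bases,
    // most-derived first (an assumption of this sketch).
    std::vector<delivery> send(const std::vector<type_name> & chain) const {
        std::vector<delivery> result;
        std::set<consumer_id> served;
        for (const auto & type : chain) {
            const auto it = m_mboxes.find(type);
            if (it == m_mboxes.end())
                continue;
            for (const consumer_id who : it->second) {
                // Deliver only via the first mbox found for this consumer.
                if (served.insert(who).second)
                    result.push_back({who, type});
            }
        }
        return result;
    }
};

std::vector<delivery> dedup_demo() {
    toy_demuxing_controller controller;
    // Consumer 1 subscribes at all three levels of the hierarchy.
    controller.add_receiving_mbox(1, "image_type_one");
    controller.add_receiving_mbox(1, "camera_vendor_1_image");
    controller.add_receiving_mbox(1, "basic_image");
    // Consumer 2 subscribes to the root only.
    controller.add_receiving_mbox(2, "basic_image");
    return controller.send(
        {"image_type_one", "camera_vendor_1_image", "basic_image"});
}
```

In dedup_demo() consumer 1 holds three receiving mboxes that match the message, yet the message reaches it only once. Consumer 2 receives the message through its single mbox for the root type.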
The second task is the deactivation of receiving mboxes when the consumer object is destroyed.
When a receiving_mbox is created for a consumer and a subscription is made, there are two owners of this receiving_mbox: one is the demuxing-controller object that performs message delivery, and the other is the agent that created the subscription.
Unfortunately, it's hard to detect the moment when the agent drops its subscriptions and doesn't need the receiving_mbox anymore. It's possible, but it requires cyclic references between entities, and that invites memory leaks. Because of that msg_hierarchy uses a simpler way: the destructor of the consumer object informs the demuxing-controller that the consumer no longer exists. The demuxing-controller then drops all references to receiving_mboxes created by that consumer. It may look like a "deactivation" of these receiving_mboxes: there could still be references to them, but no new messages will be delivered to such deactivated mboxes.
Because of that it's important to synchronize the lifetime of a consumer object with the lifetime of an agent that uses this consumer object. And the simplest way to make such synchronization is to hold a consumer object as a field of an agent:
class message_processor final : public so_5::agent_t
{
    // Consumer instance.
    so_5::extra::msg_hierarchy::consumer_t<basic_image> m_consumer;
    ...
public:
    message_processor(
        context_t ctx,
        so_5::extra::msg_hierarchy::demuxer_t<basic_image> & demuxer,
        ...)
        : so_5::agent_t{ std::move(ctx) }
        // Create an instance of a consumer.
        , m_consumer{ demuxer.allocate_consumer() }
        , ...
    {}
    ...
};
When such an agent is deregistered, all its subscriptions are dropped. It means that the only owner of the receiving_mboxes is the demuxing-controller. But when the agent is physically destroyed, the corresponding consumer object is destroyed too: the destructor of the consumer object tells the demuxing-controller that the receiving_mboxes are no longer needed, and the demuxing-controller deactivates them. This way we avoid a memory leak.
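The ownership scheme can be sketched with a toy RAII model. All names here (toy_mbox, toy_controller, toy_consumer, consumer_destroyed) are hypothetical and the real classes are organized differently. The sketch only shows how a destructor can deactivate mboxes without cyclic references:

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Toy receiving mbox: just counts delivered messages.
struct toy_mbox {
    int delivered = 0;
};

// Toy controller, NOT the real demuxing-controller.
class toy_controller {
    std::map<int, std::vector<std::shared_ptr<toy_mbox>>> m_mboxes;
    int m_next_id = 0;
public:
    int register_consumer() { return m_next_id++; }

    std::shared_ptr<toy_mbox> make_mbox(int consumer) {
        auto mbox = std::make_shared<toy_mbox>();
        m_mboxes[consumer].push_back(mbox);
        return mbox;
    }

    // Drops all references to mboxes of the destroyed consumer.
    void consumer_destroyed(int consumer) { m_mboxes.erase(consumer); }

    // Delivers one message to every mbox that is still active.
    void send() {
        for (auto & entry : m_mboxes)
            for (auto & mbox : entry.second)
                ++mbox->delivered;
    }
};

// Toy consumer: its destructor informs the controller.
class toy_consumer {
    std::shared_ptr<toy_controller> m_controller;
    int m_id;
public:
    explicit toy_consumer(std::shared_ptr<toy_controller> controller)
        : m_controller{ std::move(controller) }
        , m_id{ m_controller->register_consumer() }
    {}
    toy_consumer(const toy_consumer &) = delete;
    toy_consumer & operator=(const toy_consumer &) = delete;
    ~toy_consumer() { m_controller->consumer_destroyed(m_id); }

    std::shared_ptr<toy_mbox> receiving_mbox() {
        return m_controller->make_mbox(m_id);
    }
};

int lifetime_demo() {
    auto controller = std::make_shared<toy_controller>();
    std::shared_ptr<toy_mbox> kept;
    {
        toy_consumer consumer{ controller };
        kept = consumer.receiving_mbox();
        controller->send(); // Delivered: the consumer is alive.
    }
    controller->send(); // Not delivered: the mbox is "deactivated".
    return kept->delivered;
}
```

The reference kept outside the consumer stays valid after the consumer is gone, but the second send() no longer reaches the mbox. The references go in one direction only (consumer to controller, controller to mbox), so no cycle is formed.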
The delivery of mutable messages is supported. It requires creation of a demuxer with the single_consumer type:
so_5::extra::msg_hierarchy::demuxer_t<basic_image> mpsc_demuxer{
    env,
    so_5::extra::msg_hierarchy::single_consumer};
Then a receiving_mbox for a mutable message has to be requested:
so_subscribe(m_consumer.receiving_mbox< so_5::mutable_msg<image_type_one> >())
    .event([this](mutable_mhood_t<image_type_one> cmd) {...});
Then an instance of a mutable message can be sent via the sending_mbox:
so_5::send< so_5::mutable_msg<image_type_one> >(mpsc_demuxer.sending_mbox());
However, there is one important nuance related to mutable messages: it's possible to make subscriptions that allow receiving a mutable message by several subscribers. For example:
class first_handler final : public so_5::agent_t {
    so_5::extra::msg_hierarchy::consumer_t<basic_image> m_consumer;
    ...
    void so_define_agent() override {
        so_subscribe(
                m_consumer.receiving_mbox< so_5::mutable_msg<basic_image> >())
            .event([this](mutable_mhood_t<basic_image> cmd) {...});
        ...
    }
};
class second_handler final : public so_5::agent_t {
    so_5::extra::msg_hierarchy::consumer_t<basic_image> m_consumer;
    ...
    void so_define_agent() override {
        so_subscribe(
                m_consumer.receiving_mbox< so_5::mutable_msg<image_type_one> >())
            .event([this](mutable_mhood_t<image_type_one> cmd) {...});
        ...
    }
};
At subscription time it's unknown whether such subscriptions will lead to problems. If no messages of type image_type_one (or of any derived type) are ever sent, there is no problem.
But if a message of type image_type_one is sent then we have two subscribers for a single instance of a mutable message. This is prohibited.
Because of that the corresponding demuxing-controller checks the number of actual subscribers for a message at send time. If there is more than one subscriber then the send will fail with an exception. Such an exception can be a big problem, for example, if a delayed message is being sent. So additional care has to be taken when working with mutable messages.
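The send-time check can be sketched with a toy model. The class toy_mutable_controller and its methods are hypothetical, message types are plain strings, and std::logic_error is only a stand-in for whatever exception the library actually throws:

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Toy controller for mutable messages, NOT the real implementation.
class toy_mutable_controller {
    // Message type -> number of subscribers for the mutable message.
    std::map<std::string, int> m_subscribers;
public:
    // A subscription is always accepted: at this point it's unknown
    // which messages will actually be sent.
    void subscribe_mutable(const std::string & type) {
        ++m_subscribers[type];
    }

    // `chain` lists the type of the message and all its bases.
    // Returns the number of receivers (0 or 1) or throws.
    int send_mutable(const std::vector<std::string> & chain) const {
        int receivers = 0;
        for (const auto & type : chain) {
            const auto it = m_subscribers.find(type);
            if (it != m_subscribers.end())
                receivers += it->second;
        }
        if (receivers > 1)
            throw std::logic_error{
                "more than one subscriber for a mutable message" };
        return receivers;
    }
};

bool mutable_demo_throws() {
    toy_mutable_controller controller;
    controller.subscribe_mutable("basic_image");    // like first_handler
    controller.subscribe_mutable("image_type_one"); // like second_handler

    // Only the subscriber of basic_image matches: no problem.
    assert(controller.send_mutable(
        {"camera_vendor_2_image", "basic_image"}) == 1);

    try {
        // Both subscribers match: the send fails.
        controller.send_mutable(
            {"image_type_one", "camera_vendor_1_image", "basic_image"});
    }
    catch (const std::logic_error &) {
        return true;
    }
    return false;
}
```

Both subscriptions succeed in this sketch. Only the send of a message that matches both of them fails, which is why the problem can't be detected earlier than at send time.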
All types in a message hierarchy have to be ordinary messages. Signals are not supported at all. This is a fundamental limitation that can't be lifted, because a message instance is needed to call some of its methods to perform message upcasting during delivery.
If we describe a message hierarchy as shown above and then create a demuxer object, then only messages from this hierarchy can be delivered via the sending_mbox. For example:
struct basic_image : public so_5::extra::msg_hierarchy::root_t<basic_image> {...};
struct camera_vendor_1_image
    : public basic_image
    , public so_5::extra::msg_hierarchy::node_t<camera_vendor_1_image, basic_image>
{...};
struct camera_vendor_2_image
    : public basic_image
    , public so_5::extra::msg_hierarchy::node_t<camera_vendor_2_image, basic_image>
{...};
struct image_type_one
    : public camera_vendor_1_image
    , public so_5::extra::msg_hierarchy::node_t<image_type_one, camera_vendor_1_image>
{...};
...
struct config_changed final : public so_5::message_t {...};
...
so_5::extra::msg_hierarchy::demuxer_t<basic_image> demuxer{...};
...
// This is OK.
so_5::send<image_type_one>(demuxer.sending_mbox(), ...);
// This is NOT OK and will lead to an exception.
so_5::send<config_changed>(demuxer.sending_mbox(), ...);