-
Notifications
You must be signed in to change notification settings - Fork 5
so5extra 1.4 Broadcasting fixed_mbox
Sometimes it is necessary to broadcast a message between several already existed mboxes. For example, there can be several agents each of them is subscribed to some message from the direct mbox:
class worker_t final : public so_5::agent_t
{
public :
struct start final : public so_5::signal_t {};
struct stop final : public so_5::signal_t {};
worker_t( context_t ctx, std::string name )
: so_5::agent_t{ std::move(ctx) }
, m_name{ std::move(name) }
{}
void so_define_agent() override
{
// Make subscriptions for messages from the direct mbox.
so_subscribe_self()
.event( &worker_t::on_start )
.event( &worker_t::on_stop );
}
private :
const std::string m_name;
void on_start( mhood_t<start> ) {...}
void on_stop( mhood_t<stop> ) {...}
};
We have to send a worker_t::start
message to all of them. The trivial way is to call so_5::send
in a loop:
std::vector<so_5::mbox_t> worker_mboxes = ...;
for(const auto & m : worker_mboxes)
so_5::send<worker_t::start>(m);
But in that case, we have to hold a container with worker's mboxes and have to repeat that loop for every message we want to send.
That approach can be totally inappropriate if we have to provide a single mbox to some agent that will send start
and stop
messages:
// Agent-manager. Thinks that there is just one worker behind a mbox.
class manager_t final : public so_5::agent_t
{
const so_5::mbox_t m_worker;
...
void on_some_event(mhood_t<some_event>)
{
so_5::send<worker_t::start>(m_worker);
}
...
public :
manager_t(context_t ctx, so_5::mbox_t worker)
: so_5::agent_t{std::move(ctx)}
, m_worker{std::move(worker)}
{...}
...
};
The solution is to use a special mbox that will automatically broadcast a new message to all mboxes behind that special mbox. It means that the loop with so_5::send
calls will be hidden inside the implementation of that special mbox.
so5extra provides a simple implementation of such broadcasting mbox: it is implemented as so_5::extra::mboxes::broadcast::fixed_mbox_template_t
template class.
#include <so_5_extra/mboxes/broadcast.hpp>
#include <so_5/all.hpp>
...
// Step 1: the creation of worker agents and collecting of their mboxes.
std::vector<so_5::mbox_t> worker_mboxes;
for(int i = 0; i != worker_count; ++i)
{
auto worker = env.make_agent<worker_t>(generate_worker_name(i));
worker_mboxes.push_back(worker->so_direct_mbox());
env.register_agent_as_coop(std::move(worker));
}
// Step 2: the creation of broadcasting mbox.
auto broadcasting_mbox = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>::make(
env, std::move(worker_mboxes));
// Step 3: the creation of manager.
env.register_agent_as_coop(env.make_agent<manager_t>(std::move(broadcasting_mbox)));
Since v.1.3.1 there is so_5::extra::mboxes::broadcast::fixed_mbox_template_t
template class that implements so_5::abstract_message_box_t
interface. It means that an instance of fixed_mbox_template_t
class can be used as mbox (with several limitations discussed below).
To add the definition of so_5::extra::mboxes::broadcast::fixed_mbox_template_t
to your code it is necessary to include the following header file:
#include <so_5_extra/mboxes/broadcast.hpp>
Note. The standard SObjectizer header file so_5/all.hpp
should be included separately:
#include <so_5_extra/mboxes/broadcast.hpp>
...
#include <so_5/all.hpp>
An instance of fixed_mbox_template_t
receives a set of destination mboxes in the constructor. This set is stored inside fixed_mbox_template_t
instance and can't be changed later. It means that an instance of fixed_mbox_template_t
will deliver every message to the same set of destinations and this set is not changed.
The delivery process of a new message is stopped if an attempt to deliver the message to i-th destination leads to an exception. It means that if an instance of fixed_mbox_template_t
has 5 destinations and an exception is thrown at attempt to deliver the message to 3rd destination then 4th and 5th destinations will not received that message.
The fixed_mbox_template_t
doesn't intercept exceptions thrown. It means that exception will go out from do_deliver_message
method.
The fixed_mbox_template_t
class has the following public interface (methods inherited from abstract_message_mbox_t
are not shown):
template<typename Container=std::vector<so_5::mbox_t>>
class fixed_mbox_template_t
{
public:
// (1)
static so_5::mbox_t make(
so_5::environment_t & env,
const Container & destinations);
// (2)
static so_5::mbox_t make(
so_5::environment_t & env,
Container && destinations);
// (3)
template<typename Input_Iterator>
static so_5::mbox_t make(
so_5::environment_t & env,
Input_Iterator first, Input_Iterator last);
// (4)
template<typename Another_Container>
static so_5::mbox_t make(
so_5::environment_t & env,
const Another_Container & destinations);
};
It means that the creation of a new instance of fixed_mbox_template_t
is possible only via one of make
methods.
The make
method number (1) can be used when there is an immutable list of destinations and the content of that list can't be borrowed. For example, if a destination list should be used for the creation of several instances of broadcasting mboxes:
using broadcasting_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>;
auto make_workers() {
std::vector<so_5::mbox_t> worker_mboxes;
... // The creation of workers.
return worker_mboxes;
}
...
const auto destinations = make_workers();
const auto first_mbox = broadcasting_mbox_t::make(env, destinations);
const auto second_mbox = broadcasting_mbox_t::make(env, destinations);
...
The make
method number (2) can be used if the content of a list of destinations can be borrowed (moved into a mbox instance):
using broadcasting_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>;
auto make_workers() {
std::vector<so_5::mbox_t> worker_mboxes;
... // The creation of workers.
return worker_mboxes;
}
...
// A call to make_workers returns a temporary object the content of that
// can be safely borrowed.
const auto broadcaster = broadcasting_mbox_t::make(env, make_workers());
The make
method number (3) can be used if there is a pair of iterators in the form [first, last):
using broadcasting_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>;
so_5::mbox_t create_worker() {
auto worker = env.make_agent<worker_t>(...);
auto result = worker->so_direct_mbox();
env.register_agent_as_coop(std::move(worker));
}
...
so_5::mbox_t worker_mboxes[] = {
create_worker(),
create_worker(),
create_worker(),
create_worker()
};
const auto first_broadcaster = broadcasting_mbox_t::make(env, worker_mboxes, worker_mboxes+2);
const auto second_broadcaster = broascasting_mbox_t::make(env, worker_mboxes+2, worker_mboxes+4);
The make
method number (4) can be used if there is a container of different type (or some range-like object) with destination mboxes. For example:
using broadcasting_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<>;
so_5::mbox_t create_worker() {
auto worker = env.make_agent<worker_t>(...);
auto result = worker->so_direct_mbox();
env.register_agent_as_coop(std::move(worker));
}
...
std::array<so_5::mbox_t, 5> worker_mboxes{
create_worker(),
create_worker(),
create_worker(),
create_worker(),
create_worker()
};
const auto broadcaster = broadcasting_mbox_t::make(env, worker_mboxes);
By default fixed_mbox_template_t
is parametrized by std::vector<mbox_t>
type. But a user can specify any different container that allows sequential iteration thru elements.
The main idea behind is a possibility to borrow the content of destinations container during the construction of fixed_mbox_template_t
instance. For example, let's assume that you collect a set of destination mboxes into a std::list
. If that list it not needed anymore it is possible to borrow its content into fixed_mbox_template_t
object:
auto collect_destinations() {
std::list<so_5::mbox_t> destinations;
... // Filling the `destinations` container.
return destinations;
}
...
using broadcasting_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<std::list<so_5::mbox_t>>;
auto broadcaster = broadcasting_mbox_t::make(env, collect_destinations());
Another case: the count of destinations is known at the compile time. For example:
std::array<so_5::mbox_t, 20> destinations;
destinations[0] = ...;
destinations[1] = ...;
...
using broadcasting_mbox_t = so_5::extra::mboxes::broadcast::fixed_mbox_template_t<std::array<so_5::mbox_t, 20>>;
auto broadcaster = broadcasting_mbox_t::make(env, destinations);
The fixed_mbox_template_t
doesn't support subscriptions. An attempt to subscribe to a message from a broadcasting mbox will lead to an exception.
The fixed_mbox_template_t
doesn't support delivery filters. An attempt to set a delivery filter for a broadcasting mbox will lead to an exception.
The fixed_mbox_template_t
is multi-producer/multi-consumer mbox. Because of that, the delivery of mutable messages is prohibited. An attempt to send a mutable message will lead to an exception.