forked from qdrvm/kagome
-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscription_engine.hpp
121 lines (100 loc) · 4.12 KB
/
subscription_engine.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
#ifndef KAGOME_SUBSCRIPTION_ENGINE_HPP
#define KAGOME_SUBSCRIPTION_ENGINE_HPP
#include <list>
#include <memory>
#include <shared_mutex>
#include <unordered_map>
namespace kagome::subscription {
template <typename Event, typename Receiver, typename... Arguments>
class Subscriber;
using SubscriptionSetId = uint32_t;
/**
* @tparam EventKey - the type of a specific event from event set (e. g. a key
* from a storage or a particular kind of event from an enumeration)
* @tparam Receiver - the type of an object that is a part of a Subscriber
* internal state and can be accessed on every event
* @tparam EventParams - set of types of values passed on each event
* notification
*/
template <typename EventKey, typename Receiver, typename... EventParams>
class SubscriptionEngine final
: public std::enable_shared_from_this<
SubscriptionEngine<EventKey, Receiver, EventParams...>> {
public:
using EventKeyType = EventKey;
using ReceiverType = Receiver;
using SubscriberType =
Subscriber<EventKeyType, ReceiverType, EventParams...>;
using SubscriberWeakPtr = std::weak_ptr<SubscriberType>;
/// List is preferable here because this container iterators remain
/// alive after removal from the middle of the container
/// TODO(iceseer): PRE-476 remove processor cache penalty, while iterating,
/// using custom allocator
using SubscribersContainer =
std::list<std::pair<SubscriptionSetId, SubscriberWeakPtr>>;
using IteratorType = typename SubscribersContainer::iterator;
public:
SubscriptionEngine() = default;
~SubscriptionEngine() = default;
SubscriptionEngine(SubscriptionEngine &&) = default; // NOLINT
SubscriptionEngine &operator=(SubscriptionEngine &&) = default; // NOLINT
SubscriptionEngine(const SubscriptionEngine &) = delete;
SubscriptionEngine &operator=(const SubscriptionEngine &) = delete;
private:
template <typename KeyType, typename ValueType, typename... Args>
friend class Subscriber;
using KeyValueContainer =
std::unordered_map<EventKeyType, SubscribersContainer>;
mutable std::shared_mutex subscribers_map_cs_;
KeyValueContainer subscribers_map_;
IteratorType subscribe(SubscriptionSetId set_id,
const EventKeyType &key,
SubscriberWeakPtr ptr) {
std::unique_lock lock(subscribers_map_cs_);
auto &subscribers_list = subscribers_map_[key];
return subscribers_list.emplace(subscribers_list.end(),
std::make_pair(set_id, std::move(ptr)));
}
void unsubscribe(const EventKeyType &key, const IteratorType &it_remove) {
std::unique_lock lock(subscribers_map_cs_);
auto it = subscribers_map_.find(key);
if (subscribers_map_.end() != it) {
it->second.erase(it_remove);
if (it->second.empty()) subscribers_map_.erase(it);
}
}
public:
size_t size(const EventKeyType &key) const {
std::shared_lock lock(subscribers_map_cs_);
if (auto it = subscribers_map_.find(key); it != subscribers_map_.end())
return it->second.size();
return 0ull;
}
size_t size() const {
std::shared_lock lock(subscribers_map_cs_);
size_t count = 0ull;
for (auto &it : subscribers_map_) count += it.second.size();
return count;
}
void notify(const EventKeyType &key, const EventParams &...args) {
std::shared_lock lock(subscribers_map_cs_);
auto it = subscribers_map_.find(key);
if (subscribers_map_.end() == it) return;
auto &subscribers_container = it->second;
for (auto it_sub = subscribers_container.begin();
it_sub != subscribers_container.end();) {
if (auto sub = it_sub->second.lock()) {
sub->on_notify(it_sub->first, key, args...);
++it_sub;
} else {
it_sub = subscribers_container.erase(it_sub);
}
}
}
};
} // namespace kagome::subscription
#endif // KAGOME_SUBSCRIPTION_ENGINE_HPP