diff --git a/.clang-tidy b/.clang-tidy index e9bd677c..02c9d6e4 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -2,6 +2,7 @@ Checks: -*, bugprone-*, clang-analyzer-*, + performance-*, -bugprone-easily-swappable-parameters, -clang-analyzer-security.insecureAPI.rand, -clang-analyzer-webkit*, diff --git a/include/broker/alm/routing_table.hh b/include/broker/alm/routing_table.hh index 618f4420..2f4137b4 100644 --- a/include/broker/alm/routing_table.hh +++ b/include/broker/alm/routing_table.hh @@ -144,7 +144,7 @@ void erase(routing_table& tbl, const endpoint_id& whom, impl(whom); while (!unreachable_peers.empty()) { // Our lambda modifies unreachable_peers, so we can't use iterators here. - endpoint_id peer = std::move(unreachable_peers.back()); + endpoint_id peer = unreachable_peers.back(); unreachable_peers.pop_back(); impl(peer); on_remove(peer); diff --git a/include/broker/configuration.hh b/include/broker/configuration.hh index 537188f3..437f52f9 100644 --- a/include/broker/configuration.hh +++ b/include/broker/configuration.hh @@ -111,7 +111,7 @@ public: configuration(); - configuration(configuration&&); + configuration(configuration&&) noexcept; /// Constructs a configuration with non-default Broker options. explicit configuration(broker_options opts); @@ -177,20 +177,20 @@ public: std::string_view description); template - std::enable_if_t> set(std::string key, T val) { + std::enable_if_t> set(std::string_view key, T val) { if constexpr (std::is_same_v) - set_bool(std::move(key), val); + set_bool(key, val); else if constexpr (std::is_signed_v) - set_i64(std::move(key), val); + set_i64(key, val); else - set_u64(std::move(key), val); + set_u64(key, val); } - void set(std::string key, timespan val); + void set(std::string_view key, timespan val); - void set(std::string key, std::string val); + void set(std::string_view key, std::string val); - void set(std::string key, std::vector val); + void set(std::string_view key, std::vector val); std::optional read_i64(std::string_view key, int64_t min_val, int64_t max_val) const; @@ -223,11 +223,11 @@ public: void init(int argc, char** argv); private: - void set_i64(std::string key, int64_t val); + void set_i64(std::string_view key, int64_t val); - void set_u64(std::string key, uint64_t val); + void set_u64(std::string_view key, uint64_t val); - void set_bool(std::string key, bool val); + void set_bool(std::string_view key, bool val); std::unique_ptr impl_; }; diff --git a/include/broker/endpoint.hh b/include/broker/endpoint.hh index 824f096e..14d2a34a 100644 --- a/include/broker/endpoint.hh +++ b/include/broker/endpoint.hh @@ -452,7 +452,7 @@ public: /// this function may block. /// @returns `true` if `what` was added before the timeout, `false` otherwise. [[nodiscard]] bool - await_filter_entry(topic what, + await_filter_entry(const topic& what, timespan timeout = defaults::await_peer_timeout); // -- worker management ------------------------------------------------------ @@ -514,9 +514,10 @@ protected: worker subscriber_; private: - worker do_subscribe(filter_type&& topics, detail::sink_driver_ptr driver); + worker do_subscribe(filter_type&& topics, + const detail::sink_driver_ptr& driver); - worker do_publish_all(detail::source_driver_ptr driver); + worker do_publish_all(const detail::source_driver_ptr& driver); template worker make_worker(F fn); diff --git a/include/broker/error.hh b/include/broker/error.hh index bae624fc..79cb9936 100644 --- a/include/broker/error.hh +++ b/include/broker/error.hh @@ -334,8 +334,7 @@ public: template static error make(ec_constant, endpoint_id node, std::string msg) { - return make_impl(Code, endpoint_info{std::move(node), std::nullopt}, - std::move(msg)); + return make_impl(Code, endpoint_info{node, std::nullopt}, std::move(msg)); } private: diff --git a/include/broker/internal/clone_actor.hh b/include/broker/internal/clone_actor.hh index 1eb52364..b9c42cee 100644 --- a/include/broker/internal/clone_actor.hh +++ b/include/broker/internal/clone_actor.hh @@ -71,7 +71,7 @@ public: error consume_nil(consumer_type* src); - void close(consumer_type* src, error); + void close(consumer_type* src, const error&); void send(consumer_type*, channel_type::cumulative_ack); diff --git a/include/broker/internal/core_actor.hh b/include/broker/internal/core_actor.hh index 50b812f4..6869e06f 100644 --- a/include/broker/internal/core_actor.hh +++ b/include/broker/internal/core_actor.hh @@ -190,7 +190,7 @@ public: /// `init_new_peer` with the buffers that connect to the worker. caf::error init_new_peer(endpoint_id peer, const network_info& addr, const filter_type& filter, - pending_connection_ptr conn); + const pending_connection_ptr& conn); /// Connects the input and output buffers for a new client to our central /// merge point. diff --git a/include/broker/internal/json_client.hh b/include/broker/internal/json_client.hh index 860bce92..fdb97e1e 100644 --- a/include/broker/internal/json_client.hh +++ b/include/broker/internal/json_client.hh @@ -60,7 +60,7 @@ public: static std::string_view default_serialization_failed_error(); - void init(filter_type filter, out_t out, + void init(const filter_type& filter, const out_t& out, caf::async::consumer_resource core_pull); }; diff --git a/include/broker/internal/master_actor.hh b/include/broker/internal/master_actor.hh index 3b3904b0..02453e76 100644 --- a/include/broker/internal/master_actor.hh +++ b/include/broker/internal/master_actor.hh @@ -85,7 +85,7 @@ public: error consume_nil(consumer_type* src); - void close(consumer_type* src, error); + void close(consumer_type* src, const error&); void send(consumer_type*, channel_type::cumulative_ack); diff --git a/include/broker/internal/store_actor.hh b/include/broker/internal/store_actor.hh index e683ea53..143a534b 100644 --- a/include/broker/internal/store_actor.hh +++ b/include/broker/internal/store_actor.hh @@ -103,7 +103,7 @@ public: [this](atom::increment, detail::shared_store_state_ptr ptr) { attached_states.emplace(std::move(ptr), size_t{0}).first->second += 1; }, - [this](atom::decrement, detail::shared_store_state_ptr ptr) { + [this](atom::decrement, const detail::shared_store_state_ptr& ptr) { auto& xs = attached_states; if (auto i = xs.find(ptr); i != xs.end()) if (--(i->second) == 0) @@ -169,7 +169,7 @@ public: // -- convenience functions -------------------------------------------------- /// Sends a delayed message by using the endpoint's clock. - void send_later(caf::actor hdl, timespan delay, caf::message msg); + void send_later(const caf::actor& hdl, timespan delay, caf::message msg); // -- member variables ------------------------------------------------------- diff --git a/include/broker/internal/web_socket.hh b/include/broker/internal/web_socket.hh index dbf71be5..7bcb6af1 100644 --- a/include/broker/internal/web_socket.hh +++ b/include/broker/internal/web_socket.hh @@ -18,8 +18,9 @@ using connect_event_t = std::pair; using on_connect_t = std::function; -expected launch(caf::actor_system& sys, openssl_options_ptr ssl_cfg, - std::string addr, uint16_t port, bool reuse_addr, +expected launch(caf::actor_system& sys, + const openssl_options_ptr& ssl_cfg, std::string addr, + uint16_t port, bool reuse_addr, const std::string& allowed_path, on_connect_t on_connect); diff --git a/include/broker/message.hh b/include/broker/message.hh index 95228bc5..092c96af 100644 --- a/include/broker/message.hh +++ b/include/broker/message.hh @@ -77,7 +77,7 @@ using packed_message = inline packed_message make_packed_message(packed_message_type type, uint16_t ttl, topic dst, std::vector bytes) { - return packed_message{type, ttl, dst, std::move(bytes)}; + return packed_message{type, ttl, std::move(dst), std::move(bytes)}; } /// @relates packed_message diff --git a/include/broker/status.hh b/include/broker/status.hh index b342dcae..8f4f3b4a 100644 --- a/include/broker/status.hh +++ b/include/broker/status.hh @@ -116,13 +116,13 @@ public: template static status make(endpoint_id node, std::string msg) { static_assert(sc_has_endpoint_info_v); - return {S, endpoint_info{std::move(node), std::nullopt}, std::move(msg)}; + return {S, endpoint_info{node, std::nullopt}, std::move(msg)}; } template static status make(sc_constant, endpoint_id node, std::string msg) { static_assert(sc_has_endpoint_info_v); - return {S, endpoint_info{std::move(node), std::nullopt}, std::move(msg)}; + return {S, endpoint_info{node, std::nullopt}, std::move(msg)}; } /// Default-constructs an unspecified status. diff --git a/include/broker/store.hh b/include/broker/store.hh index 0e39e74b..3a64523f 100644 --- a/include/broker/store.hh +++ b/include/broker/store.hh @@ -103,11 +103,11 @@ public: store() = default; - store(store&&) = default; + store(store&&) noexcept = default; store(const store&); - store& operator=(store&&); + store& operator=(store&&) noexcept; store& operator=(const store&); @@ -204,7 +204,7 @@ public: break; } - add(std::move(key), std::move(amount), init_type, std::move(expiry)); + add(std::move(key), std::move(amount), init_type, expiry); } /// Decrements a value by a given amount. This is supported for all @@ -213,7 +213,7 @@ public: /// @param value The amount to decrement the value. /// @param expiry An optional new expiration time for *key*. void decrement(data key, data amount, std::optional expiry = {}) { - subtract(std::move(key), std::move(amount), std::move(expiry)); + subtract(std::move(key), std::move(amount), expiry); } /// Appends a string to another one. @@ -221,7 +221,7 @@ public: /// @param str The string to append. /// @param expiry An optional new expiration time for *key*. void append(data key, data str, std::optional expiry = {}) { - add(std::move(key), std::move(str), data::type::string, std::move(expiry)); + add(std::move(key), std::move(str), data::type::string, expiry); } /// Inserts an index into a set. @@ -229,7 +229,7 @@ public: /// @param index The index to insert. /// @param expiry An optional new expiration time for *key*. void insert_into(data key, data index, std::optional expiry = {}) { - add(std::move(key), std::move(index), data::type::set, std::move(expiry)); + add(std::move(key), std::move(index), data::type::set, expiry); } /// Inserts an index into a table. @@ -241,7 +241,7 @@ public: void insert_into(data key, data index, data value, std::optional expiry = {}) { add(std::move(key), vector({std::move(index), std::move(value)}), - data::type::table, std::move(expiry)); + data::type::table, expiry); } /// Removes am index from a set or table. @@ -249,7 +249,7 @@ public: /// @param index The index to remove. /// @param expiry An optional new expiration time for *key*. void remove_from(data key, data index, std::optional expiry = {}) { - subtract(std::move(key), std::move(index), std::move(expiry)); + subtract(std::move(key), std::move(index), expiry); } /// Appends a value to a vector. @@ -257,15 +257,14 @@ public: /// @param value The value to append. /// @param expiry An optional new expiration time for *key*. void push(data key, data value, std::optional expiry = {}) { - add(std::move(key), std::move(value), data::type::vector, - std::move(expiry)); + add(std::move(key), std::move(value), data::type::vector, expiry); } /// Removes the last value of a vector. /// @param key The key of the vector from which to remove the last value. /// @param expiry An optional new expiration time for *key*. - void pop(data key, std::optional expiry = {}) { - subtract(key, key, std::move(expiry)); + void pop(const data& key, std::optional expiry = {}) { + subtract(key, key, expiry); } // --await-idle-start diff --git a/include/broker/store_event.hh b/include/broker/store_event.hh index 8460eb0e..c2da7695 100644 --- a/include/broker/store_event.hh +++ b/include/broker/store_event.hh @@ -75,7 +75,7 @@ public: entity_id publisher() const { if (auto value = to((*xs_)[5])) { - return {std::move(*value), get((*xs_)[6])}; + return {*value, get((*xs_)[6])}; } return {}; } diff --git a/include/broker/subnet.hh b/include/broker/subnet.hh index 0bda2530..41303dd4 100644 --- a/include/broker/subnet.hh +++ b/include/broker/subnet.hh @@ -19,7 +19,7 @@ public: subnet(); /// Construct subnet from an address and length. - subnet(address addr, uint8_t length); + subnet(const address& addr, uint8_t length); /// @return whether an address is contained within this subnet. bool contains(const address& addr) const; diff --git a/src/broker-node.cc b/src/broker-node.cc index a8ec9972..161283fa 100644 --- a/src/broker-node.cc +++ b/src/broker-node.cc @@ -295,7 +295,7 @@ void relay_mode(broker::endpoint& ep, topic_list topics) { } return true; }; - auto in = ep.make_subscriber(topics); + auto in = ep.make_subscriber(std::move(topics)); auto& cfg = broker::internal::endpoint_access{&ep}.cfg(); if (get_or(cfg, "verbose", false) && get_or(cfg, "rate", false)) { auto timeout = std::chrono::system_clock::now(); @@ -368,7 +368,7 @@ void ping_mode(broker::endpoint& ep, topic_list topics) { void pong_mode(broker::endpoint& ep, topic_list topics) { assert(topics.size() > 0); verbose::println("receive pings from topics ", topics); - auto in = ep.make_subscriber(topics); + auto in = ep.make_subscriber(std::move(topics)); for (;;) { auto x = in.get(); auto& val = get_data(x); diff --git a/src/broker-pipe.cc b/src/broker-pipe.cc index 9abebf55..526ef5e3 100644 --- a/src/broker-pipe.cc +++ b/src/broker-pipe.cc @@ -185,7 +185,7 @@ void subscribe_mode_stream(broker::endpoint& ep, const std::string& topic_str, // Init. [](size_t& msgs) { msgs = 0; }, // OnNext. - [cap](size_t& msgs, data_message x) { + [cap](size_t& msgs, const data_message& x) { ++msg_count; if (!rate) print_line(std::cout, to_string(x)); diff --git a/src/configuration.cc b/src/configuration.cc index 36b8eb46..7e3ccb85 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -206,7 +206,7 @@ configuration::configuration() : configuration(skip_init) { init(0, nullptr); } -configuration::configuration(configuration&& other) +configuration::configuration(configuration&& other) noexcept : impl_(std::move(other.impl_)) { // cannot '= default' this because impl is incomplete in the header. } @@ -443,28 +443,28 @@ void configuration::add_option(std::vector* dst, description); } -void configuration::set(std::string key, timespan val) { - impl_->set(std::move(key), val); +void configuration::set(std::string_view key, timespan val) { + impl_->set(key, val); } -void configuration::set(std::string key, std::string val) { - impl_->set(std::move(key), std::move(val)); +void configuration::set(std::string_view key, std::string val) { + impl_->set(key, std::move(val)); } -void configuration::set(std::string key, std::vector val) { - impl_->set(std::move(key), std::move(val)); +void configuration::set(std::string_view key, std::vector val) { + impl_->set(key, std::move(val)); } -void configuration::set_i64(std::string key, int64_t val) { - impl_->set(std::move(key), val); +void configuration::set_i64(std::string_view key, int64_t val) { + impl_->set(key, val); } -void configuration::set_u64(std::string key, uint64_t val) { - impl_->set(std::move(key), val); +void configuration::set_u64(std::string_view key, uint64_t val) { + impl_->set(key, val); } -void configuration::set_bool(std::string key, bool val) { - impl_->set(std::move(key), val); +void configuration::set_bool(std::string_view key, bool val) { + impl_->set(key, val); } std::optional configuration::read_i64(std::string_view key, diff --git a/src/detail/memory_backend.cc b/src/detail/memory_backend.cc index faf8a956..3ff6a5da 100644 --- a/src/detail/memory_backend.cc +++ b/src/detail/memory_backend.cc @@ -15,7 +15,7 @@ memory_backend::memory_backend(backend_options opts) expected memory_backend::put(const data& key, data value, std::optional expiry) { - store_[key] = {std::move(value), std::move(expiry)}; + store_[key] = {std::move(value), expiry}; return {}; } @@ -26,12 +26,12 @@ expected memory_backend::add(const data& key, const data& value, if (i == store_.end()) { if (init_type == data::type::none) return ec::type_clash; - auto newv = std::make_pair(data::from_type(init_type), expiry); - i = store_.emplace(std::move(key), std::move(newv)).first; + auto new_val = std::make_pair(data::from_type(init_type), expiry); + i = store_.emplace(key, std::move(new_val)).first; } auto result = visit(adder{value}, i->second.first); if (result) - i->second.second = std::move(expiry); + i->second.second = expiry; return result; } @@ -42,7 +42,7 @@ expected memory_backend::subtract(const data& key, const data& value, return ec::no_such_key; auto result = visit(remover{value}, i->second.first); if (result) - i->second.second = std::move(expiry); + i->second.second = expiry; return result; } diff --git a/src/detail/sqlite_backend.cc b/src/detail/sqlite_backend.cc index c12bd1ac..e5d875b7 100644 --- a/src/detail/sqlite_backend.cc +++ b/src/detail/sqlite_backend.cc @@ -464,7 +464,7 @@ expected sqlite_backend::expiries() const { auto expiry_count = sqlite3_column_int64(impl_->expiries, 1); auto duration = timespan(expiry_count); auto expiry = timestamp(duration); - auto e = expirable(std::move(*key), std::move(expiry)); + auto e = expirable(std::move(*key), expiry); rval.emplace_back(std::move(e)); } else { return {key.error()}; diff --git a/src/endpoint.cc b/src/endpoint.cc index eb95b41d..e0a4c3f3 100644 --- a/src/endpoint.cc +++ b/src/endpoint.cc @@ -656,7 +656,7 @@ uint16_t endpoint::listen(const std::string& address, uint16_t port, BROKER_DEBUG("cannot listen to" << address << "on port" << port << ":" << err); if (err_ptr) - *err_ptr = facade(std::move(err)); + *err_ptr = facade(err); }); return result; } @@ -839,7 +839,7 @@ using worker_actor = caf::stateful_actor; } // namespace worker endpoint::do_subscribe(filter_type&& filter, - detail::sink_driver_ptr sink) { + const detail::sink_driver_ptr& sink) { BROKER_ASSERT(sink != nullptr); using caf::async::make_spsc_buffer_resource; // Get a pair of connected resources. @@ -913,7 +913,8 @@ class data_message_source { } // namespace -worker endpoint::do_publish_all(std::shared_ptr driver) { +worker +endpoint::do_publish_all(const std::shared_ptr& driver) { BROKER_ASSERT(driver != nullptr); using caf::async::make_spsc_buffer_resource; // Get a pair of connected resources. @@ -933,7 +934,7 @@ worker endpoint::do_publish_all(std::shared_ptr driver) { caf::anon_send(native(core_), internal::data_consumer_res{std::move(con_res)}); // Store background worker and return. - workers_.emplace_back(facade(std::move(worker))); + workers_.emplace_back(facade(worker)); return workers_.back(); } @@ -1030,7 +1031,7 @@ void endpoint::await_peer(endpoint_id whom, std::function callback, return; } auto f = [whom, cb{std::move(callback)}](caf::event_based_actor* self, - caf::actor core, timespan t) { + const caf::actor& core, timespan t) { self->request(core, t, atom::await_v, whom) .then( [&]([[maybe_unused]] endpoint_id& discovered) { @@ -1042,7 +1043,7 @@ void endpoint::await_peer(endpoint_id whom, std::function callback, ctx_->sys.spawn(f, native(core_), timeout); } -bool endpoint::await_filter_entry(topic what, timespan timeout) { +bool endpoint::await_filter_entry(const topic& what, timespan timeout) { using namespace std::literals; BROKER_TRACE(BROKER_ARG(what) << BROKER_ARG(timeout)); auto abs_timeout = broker::now() + timeout; diff --git a/src/internal/clone_actor.cc b/src/internal/clone_actor.cc index 4a685261..3a2765bf 100644 --- a/src/internal/clone_actor.cc +++ b/src/internal/clone_actor.cc @@ -48,7 +48,7 @@ clone_state::clone_state(caf::event_based_actor* ptr, endpoint_id this_endpoint, caf::async::consumer_resource in_res, caf::async::producer_resource out_res) : input(this), max_sync_interval(master_timeout) { - super::init(ptr, move(this_endpoint), ep_clock, move(nm), move(parent), + super::init(ptr, this_endpoint, ep_clock, move(nm), move(parent), move(in_res), move(out_res)); master_topic = store_name / topic::master_suffix(); super::init(input); @@ -89,7 +89,7 @@ void clone_state::dispatch(const command_message& msg) { switch (tag) { case command_tag::action: { // Action messages from the master (broadcasted). - input.handle_event(seq, move(msg)); + input.handle_event(seq, msg); break; } case command_tag::producer_control: { @@ -117,7 +117,7 @@ void clone_state::dispatch(const command_message& msg) { BROKER_DEBUG("received ack_clone from" << cmd.sender); if (!master_id) master_id = cmd.sender; - set_store(move(inner.state)); + set_store(inner.state); start_output(); } else { BROKER_DEBUG("drop repeated ack_clone from" << cmd.sender); @@ -262,7 +262,8 @@ error clone_state::consume_nil(consumer_type* src) { return ec::broken_clone; } -void clone_state::close(consumer_type* src, [[maybe_unused]] error reason) { +void clone_state::close(consumer_type* src, + [[maybe_unused]] const error& reason) { BROKER_ERROR(BROKER_ARG(reason)); // TODO: send some 'bye, bye' message to enable the master to remove this // clone early, rather than waiting for timeout, see: @@ -384,7 +385,7 @@ void clone_state::set_store(std::unordered_map x) { // Short-circuit messages with an empty state. if (x.empty()) { if (!store.empty()) { - clear_command cmd{move(publisher)}; + clear_command cmd{publisher}; consume(cmd); } } else if (store.empty()) { diff --git a/src/internal/core_actor.cc b/src/internal/core_actor.cc index 779d48b0..178b8998 100644 --- a/src/internal/core_actor.cc +++ b/src/internal/core_actor.cc @@ -56,8 +56,8 @@ core_actor_state::core_actor_state(caf::event_based_actor* self, if (conn) { auto on_peering = [this](endpoint_id remote_id, const network_info& addr, const filter_type& filter, - pending_connection_ptr conn) { - std::ignore = init_new_peer(remote_id, addr, filter, std::move(conn)); + const pending_connection_ptr& conn) { + std::ignore = init_new_peer(remote_id, addr, filter, conn); }; auto on_peer_unavailable = [this](const network_info& addr) { peer_unavailable(addr); @@ -184,7 +184,8 @@ caf::behavior core_actor_state::make_behavior() { [this](atom::peer, endpoint_id peer, const network_info& addr, const filter_type& filter, node_consumer_res in_res, node_producer_res out_res) -> caf::result { - if (auto err = init_new_peer(peer, addr, filter, in_res, out_res)) + if (auto err = init_new_peer(peer, addr, filter, std::move(in_res), + std::move(out_res))) return err; else return caf::unit; @@ -242,7 +243,7 @@ caf::behavior core_actor_state::make_behavior() { dispatch(dst, pack(msg)); }, // -- interface for subscribers -------------------------------------------- - [this](atom::subscribe, filter_type filter) { + [this](atom::subscribe, const filter_type& filter) { // Subscribe to topics without actually caring about the events. This // makes sure that this node receives events on the topics, which in means // we can forward them. @@ -289,7 +290,8 @@ caf::behavior core_actor_state::make_behavior() { }, // -- interface for publishers --------------------------------------------- [this](data_consumer_res src) { - auto sub = data_inputs->add(self->make_observable().from_resource(src)); + auto sub = + data_inputs->add(self->make_observable().from_resource(std::move(src))); subscriptions.emplace_back(std::move(sub)); }, // -- data store management ------------------------------------------------ @@ -302,7 +304,7 @@ caf::behavior core_actor_state::make_behavior() { [this](atom::data_store, atom::master, atom::attach, const std::string& name, backend backend_type, backend_options opts) { - return attach_master(name, backend_type, opts); + return attach_master(name, backend_type, std::move(opts)); }, [this](atom::data_store, atom::master, atom::get, const std::string& name) -> caf::result { @@ -585,8 +587,8 @@ void core_actor_state::client_added(endpoint_id client_id, emit(endpoint_info{client_id, std::nullopt, type}, sc_constant(), "found a new client in the network"); - emit(endpoint_info{client_id, addr, std::move(type)}, - sc_constant(), "handshake successful"); + emit(endpoint_info{client_id, addr, type}, sc_constant(), + "handshake successful"); } void core_actor_state::client_removed(endpoint_id client_id, @@ -595,7 +597,7 @@ void core_actor_state::client_removed(endpoint_id client_id, BROKER_TRACE(BROKER_ARG(client_id) << BROKER_ARG(addr) << BROKER_ARG(type)); emit(endpoint_info{client_id, addr, type}, sc_constant(), "lost connection to client"); - emit(endpoint_info{client_id, std::nullopt, std::move(type)}, + emit(endpoint_info{client_id, std::nullopt, type}, sc_constant(), "lost the last path"); } @@ -611,9 +613,10 @@ void core_actor_state::try_connect(const network_info& addr, adapter->async_connect( addr, [this, rp](endpoint_id peer, const network_info& addr, - const filter_type& filter, pending_connection_ptr conn) mutable { + const filter_type& filter, + const pending_connection_ptr& conn) mutable { BROKER_TRACE(BROKER_ARG(peer) << BROKER_ARG(addr) << BROKER_ARG(filter)); - if (auto err = init_new_peer(peer, addr, filter, std::move(conn)); + if (auto err = init_new_peer(peer, addr, filter, conn); err && err != ec::repeated_peering_handshake_request) rp.deliver(std::move(err)); else @@ -649,7 +652,7 @@ void core_actor_state::try_connect(const network_info& addr, }, [this, rp, addr](const caf::error& what) mutable { BROKER_TRACE(BROKER_ARG(what)); - rp.deliver(std::move(what)); + rp.deliver(what); peer_unavailable(addr); }); } @@ -807,7 +810,7 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id, BROKER_DEBUG("close output flow to" << peer_id); // }) // Emit values to the producer resource. - .subscribe(out_res); + .subscribe(std::move(out_res)); // Increase the logical time for this connection. This timestamp is crucial // for our do_finally handler, because this handler must do nothing if we have // already removed the connection manually, e.g., as a result of unpeering. @@ -815,7 +818,7 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id, auto ts = (i == peers.end()) ? lamport_timestamp{} : ++i->second.ts; // Read messages from the peer. auto in = self->make_observable() - .from_resource(in_res) + .from_resource(std::move(in_res)) // If the peer closes this buffer, we assume a disconnect. .do_finally([this, peer_id, ts] { // BROKER_DEBUG("close input flow from" << peer_id); @@ -852,7 +855,7 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id, caf::error core_actor_state::init_new_peer(endpoint_id peer, const network_info& addr, const filter_type& filter, - pending_connection_ptr ptr) { + const pending_connection_ptr& ptr) { // Spin up a background worker that takes care of socket I/O. We communicate // to this worker via producer/consumer buffer resources. The [rd_1, wr_1] // pair is the direction core actor -> network. The other pair is for the @@ -902,11 +905,12 @@ caf::error core_actor_state::init_new_client(const network_info& addr, auto sub = central_merge ->as_observable() // Select by subscription. - .filter([this, filter, client_id](const node_message& msg) { + .filter([this, filt = std::move(filter), + client_id](const node_message& msg) { if (get_sender(msg) == client_id) return false; detail::prefix_matcher f; - return f(filter, get_topic(msg)); + return f(filt, get_topic(msg)); }) // Deserialize payload and wrap it into an data message. .flat_map_optional([this](const node_message& msg) { @@ -915,12 +919,12 @@ caf::error core_actor_state::init_new_client(const network_info& addr, return unpack(get_packed_message(msg)); }) // Emit values to the producer resource. - .subscribe(out_res); + .subscribe(std::move(out_res)); subscriptions.emplace_back(sub); } // Push messages received from the client into the central merge point. auto in = self->make_observable() - .from_resource(in_res) + .from_resource(std::move(in_res)) // If the client closes this buffer, we assume a disconnect. .do_finally([this, client_id, addr, type] { BROKER_DEBUG("client" << addr << "disconnected"); @@ -1066,7 +1070,7 @@ void core_actor_state::shutdown_stores() { // -- dispatching of messages to peers regardless of subscriptions ------------ void core_actor_state::dispatch(endpoint_id receiver, packed_message msg) { - central_merge->append_to_buf(make_node_message(id, receiver, msg)); + central_merge->append_to_buf(make_node_message(id, receiver, std::move(msg))); central_merge->try_push(); } diff --git a/src/internal/json_client.cc b/src/internal/json_client.cc index cc0fc8a5..6afc13ff 100644 --- a/src/internal/json_client.cc +++ b/src/internal/json_client.cc @@ -70,8 +70,7 @@ struct handshake_step { } else { initialized = true; // Ok, set up the actual pipeline and connect to the core. - state->init(std::move(filter), std::move(push_to_ws), - std::move(pull_from_core)); + state->init(filter, push_to_ws, std::move(pull_from_core)); return true; } } @@ -93,7 +92,10 @@ struct handshake_step { json_client_state::json_client_state(caf::event_based_actor* selfptr, endpoint_id this_node, caf::actor core_hdl, network_info ws_addr, in_t in, out_t out) - : self(selfptr), id(this_node), core(core_hdl), addr(ws_addr) { + : self(selfptr), + id(this_node), + core(std::move(core_hdl)), + addr(std::move(ws_addr)) { reader.mapper(&mapper); writer.mapper(&mapper); writer.skip_object_type_annotation(true); @@ -114,8 +116,8 @@ json_client_state::json_client_state(caf::event_based_actor* selfptr, // Read from the WebSocket, push to core (core_push). self // ->make_observable() - .from_resource(in) // Read all input text messages. - .transform(handshake_step{this, out, core_pull}) // Calls init(). + .from_resource(std::move(in)) // Read all input text messages. + .transform(handshake_step{this, std::move(out), core_pull}) // Calls init(). .do_finally([this] { ctrl_msgs->dispose(); }) // Parse all JSON coming in and forward them to the core. .flat_map_optional([this, n = 0](const caf::cow_string& str) mutable { @@ -206,7 +208,7 @@ bool inspect(Inspector& f, const_data_message_decorator& x) { } void json_client_state::init( - filter_type filter, out_t out, + const filter_type& filter, const out_t& out, caf::async::consumer_resource core_pull1) { using caf::async::make_spsc_buffer_resource; // Pull data from the core and forward as JSON. @@ -237,12 +239,12 @@ void json_client_state::init( .subscribe(out); subscriptions.push_back(std::move(sub)); caf::anon_send(core, atom::attach_client_v, addr, "web-socket"s, filter, - core_pull1, core_push2); + std::move(core_pull1), std::move(core_push2)); } else { auto sub = ctrl_msgs->as_observable().subscribe(out); subscriptions.push_back(std::move(sub)); caf::anon_send(core, atom::attach_client_v, addr, "web-socket"s, - filter_type{}, core_pull1, + filter_type{}, std::move(core_pull1), caf::async::producer_resource{}); } // Setup complete. Send ACK to the client. diff --git a/src/internal/master_actor.cc b/src/internal/master_actor.cc index 1d125d59..f3751ee7 100644 --- a/src/internal/master_actor.cc +++ b/src/internal/master_actor.cc @@ -49,8 +49,8 @@ master_state::master_state( caf::async::consumer_resource in_res, caf::async::producer_resource out_res) : output(this) { - super::init(ptr, std::move(this_endpoint), ep_clock, std::move(nm), - std::move(parent), std::move(in_res), std::move(out_res)); + super::init(ptr, this_endpoint, ep_clock, std::move(nm), std::move(parent), + std::move(in_res), std::move(out_res)); super::init(output); clones_topic = store_name / topic::clone_suffix(); backend = std::move(bp); @@ -77,7 +77,7 @@ void master_state::dispatch(const command_message& msg) { case command_tag::action: { // Action messages from writers. if (auto i = inputs.find(cmd.sender); i != inputs.end()) - i->second.handle_event(seq, std::move(msg)); + i->second.handle_event(seq, msg); else BROKER_DEBUG("received action from unknown sender:" << cmd.sender); break; @@ -150,7 +150,7 @@ void master_state::tick() { auto t = clock->now(); for (auto i = expirations.begin(); i != expirations.end();) { if (t > i->second) { - auto& key = i->first; + const auto& key = i->first; BROKER_INFO("EXPIRE" << key); if (auto result = backend->expire(key, t); !result) { BROKER_ERROR("EXPIRE" << key << "(FAILED)" @@ -158,7 +158,7 @@ void master_state::tick() { } else if (!*result) { BROKER_INFO("EXPIRE" << key << "(IGNORE/STALE)"); } else { - expire_command cmd{std::move(key), id}; + expire_command cmd{key, id}; emit_expire_event(cmd); broadcast(std::move(cmd)); } @@ -232,8 +232,8 @@ void master_state::consume(put_unique_command& x) { emit_insert_event(x); // Broadcast a regular "put" command (clones don't have to do their own // existence check) followed by the (positive) result message. - broadcast(put_command{std::move(x.key), std::move(x.value), x.expiry, - std::move(x.publisher)}); + broadcast( + put_command{std::move(x.key), std::move(x.value), x.expiry, x.publisher}); broadcast_result(true); } @@ -267,7 +267,7 @@ void master_state::consume(add_command& x) { // Broadcast a regular "put" command. Clones don't have to repeat the same // processing again. put_command cmd{std::move(x.key), std::move(*val), std::nullopt, - std::move(x.publisher)}; + x.publisher}; if (old_value) emit_update_event(cmd, *old_value); else @@ -300,7 +300,7 @@ void master_state::consume(subtract_command& x) { // Broadcast a regular "put" command. Clones don't have to repeat the same // processing again. put_command cmd{std::move(x.key), std::move(*val), std::nullopt, - std::move(x.publisher)}; + x.publisher}; emit_update_event(cmd, *old_value); broadcast(std::move(cmd)); } @@ -325,7 +325,7 @@ void master_state::consume(clear_command& x) { } if (auto res = backend->clear(); !res) detail::die("failed to clear master"); - broadcast(std::move(x)); + broadcast(x); } error master_state::consume_nil(consumer_type* src) { @@ -338,7 +338,7 @@ error master_state::consume_nil(consumer_type* src) { return {}; } -void master_state::close(consumer_type* src, [[maybe_unused]] error reason) { +void master_state::close(consumer_type* src, const error& reason) { BROKER_TRACE(BROKER_ARG(reason)); if (auto i = inputs.find(src->producer()); i != inputs.end()) { if (reason) @@ -511,17 +511,17 @@ caf::behavior master_state::make_behavior() { if (x) return caf::make_message(std::move(*x), id); else - return caf::make_message(native(std::move(x.error())), id); + return caf::make_message(native(x.error()), id); }, [this](atom::exists, const data& key) -> caf::result { auto x = backend->exists(key); BROKER_INFO("EXISTS" << key << "->" << x); - return {data{std::move(*x)}}; + return {data{*x}}; }, [this](atom::exists, const data& key, request_id id) { auto x = backend->exists(key); BROKER_INFO("EXISTS" << key << "with id:" << id << "->" << x); - return caf::make_message(data{std::move(*x)}, id); + return caf::make_message(data{*x}, id); }, [this](atom::get, const data& key) -> caf::result { auto x = backend->get(key); diff --git a/src/internal/prometheus.cc b/src/internal/prometheus.cc index e87bd503..deec7781 100644 --- a/src/internal/prometheus.cc +++ b/src/internal/prometheus.cc @@ -135,7 +135,7 @@ caf::behavior prometheus_actor::make_behavior() { if (num_connections() + num_doormen() == 0) quit(); }, - [this](data_message msg) { + [this](const data_message& msg) { BROKER_TRACE(BROKER_ARG(msg)); collector_.insert_or_update(get_data(msg)); }, diff --git a/src/internal/store_actor.cc b/src/internal/store_actor.cc index f6f6fc67..28a8d29f 100644 --- a/src/internal/store_actor.cc +++ b/src/internal/store_actor.cc @@ -80,12 +80,12 @@ void store_actor_state::init(caf::event_based_actor* selfptr, defaults::store::tick_interval); self // ->make_observable() - .from_resource(in_res) + .from_resource(std::move(in_res)) .for_each([this](const command_message& msg) { dispatch(msg); }, [this](const caf::error& what) { self->quit(what); }, [this] { self->quit(); }); out.emplace(self); - out->as_observable().subscribe(out_res); + out->as_observable().subscribe(std::move(out_res)); } // -- event signaling ---------------------------------------------------------- @@ -147,7 +147,7 @@ void store_actor_state::on_down_msg(const caf::actor_addr& source, // -- convenience functions ---------------------------------------------------- -void store_actor_state::send_later(caf::actor hdl, timespan delay, +void store_actor_state::send_later(const caf::actor& hdl, timespan delay, caf::message msg) { clock->send_later(facade(hdl), tick_interval, &msg); } diff --git a/src/internal/web_socket.cc b/src/internal/web_socket.cc index 44bebc76..d49398c5 100644 --- a/src/internal/web_socket.cc +++ b/src/internal/web_socket.cc @@ -109,8 +109,9 @@ void ssl_accept(caf::net::multiplexer& mpx, Socket fd, mpx.init(ptr); } -expected launch(caf::actor_system& sys, openssl_options_ptr ssl_cfg, - std::string addr, uint16_t port, bool reuse_addr, +expected launch(caf::actor_system& sys, + const openssl_options_ptr& ssl_cfg, std::string addr, + uint16_t port, bool reuse_addr, const std::string& allowed_path, on_connect_t on_connect) { using namespace std::literals; diff --git a/src/store.cc b/src/store.cc index e886a95d..b87272fe 100644 --- a/src/store.cc +++ b/src/store.cc @@ -194,7 +194,7 @@ store::store(endpoint_id this_peer, worker frontend, std::string name) { caf::anon_send(hdl, atom::increment_v, std::move(ptr)); } -store& store::operator=(store&& other) { +store& store::operator=(store&& other) noexcept { with_state_ptr([this](detail::shared_store_state_ptr& st) { auto frontend = dref(st).frontend; caf::anon_send(frontend, atom::decrement_v, std::move(st)); @@ -463,7 +463,7 @@ void store::await_idle(std::function callback, timespan timeout) { with_state_or( [&](state_impl& st) { auto await_actor = [cb{std::move(callback)}](caf::event_based_actor* self, - caf::actor frontend, + const caf::actor& frontend, timespan t) { self->request(frontend, t, atom::await_v, atom::idle_v) .then([cb](atom::ok) { cb(true); }, diff --git a/src/subnet.cc b/src/subnet.cc index 59742449..9665d6dd 100644 --- a/src/subnet.cc +++ b/src/subnet.cc @@ -13,8 +13,7 @@ namespace broker { subnet::subnet() : len_(0) {} -subnet::subnet(address addr, uint8_t length) - : net_(std::move(addr)), len_(length) { +subnet::subnet(const address& addr, uint8_t length) : net_(addr), len_(length) { if (init()) return; net_ = {}; diff --git a/src/telemetry/metric_registry.cc b/src/telemetry/metric_registry.cc index 38748eb2..3a44656d 100644 --- a/src/telemetry/metric_registry.cc +++ b/src/telemetry/metric_registry.cc @@ -260,7 +260,7 @@ class post_init_impl : public impl_base { using super = impl_base; post_init_impl(internal::endpoint_context_ptr ctx) - : super(std::addressof(ctx->sys.metrics())), ctx_(ctx) { + : super(std::addressof(ctx->sys.metrics())), ctx_(std::move(ctx)) { // nop }