Skip to content

Commit

Permalink
feature: offchain disabled validators
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>
  • Loading branch information
xDimon committed Oct 7, 2024
1 parent 6df4d6e commit 6e46905
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 62 deletions.
12 changes: 5 additions & 7 deletions core/dispute_coordinator/impl/candidate_vote_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@
*/

#include "dispute_coordinator/impl/candidate_vote_state.hpp"

#include <set>

namespace kagome::dispute {
CandidateVoteState CandidateVoteState::create(
CandidateVotes votes,
CandidateEnvironment &env,
std::vector<ValidatorIndex> &disabled_validators,
Timestamp now) {
CandidateVoteState CandidateVoteState::create(CandidateVotes votes,
CandidateEnvironment &env,
Timestamp now) {
CandidateVoteState res{.votes = std::move(votes),
.own_vote = CannotVote{},
.dispute_status = std::nullopt};
Expand Down Expand Up @@ -54,8 +53,7 @@ namespace kagome::dispute {

auto has_vote_of_active = [&](auto &votes) {
auto is_not_disabled = [&](auto &vote) {
return not std::binary_search(
disabled_validators.begin(), disabled_validators.end(), vote.first);
return not std::ranges::binary_search(env.disabled_indices, vote.first);
};
return std::ranges::find_if(votes.begin(), votes.end(), is_not_disabled)
!= votes.end();
Expand Down
1 change: 0 additions & 1 deletion core/dispute_coordinator/impl/candidate_vote_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace kagome::dispute {
*/
static CandidateVoteState create(CandidateVotes votes,
CandidateEnvironment &env,
std::vector<ValidatorIndex> &disabled,
Timestamp now);

/// Votes already existing for the candidate + receipt.
Expand Down
142 changes: 91 additions & 51 deletions core/dispute_coordinator/impl/dispute_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,11 @@ namespace kagome::dispute {

// https://github.com/paritytech/polkadot/blob/40974fb99c86f5c341105b7db53c7aa0df707d66/node/core/dispute-coordinator/src/lib.rs#L298
for (auto &[session, candidate_hash, status] : active_disputes) {
auto env_opt =
makeCandidateEnvironment(*session_keys_, session, first_leaf.hash);
auto env_opt = makeCandidateEnvironment(
*session_keys_,
session,
first_leaf.hash,
offchain_disabled_validators_.asVector(session));
if (not env_opt.has_value()) {
continue;
}
Expand All @@ -348,20 +351,8 @@ namespace kagome::dispute {
}
auto &candidate_votes = votes_res.value().value();

auto &relay_parent =
candidate_votes.candidate_receipt.descriptor.relay_parent;

auto disabled_validators_res = api_->disabled_validators(relay_parent);
if (disabled_validators_res.has_error()) {
SL_WARN(log_,
"Cannot import votes, without getting disabled validators: {}",
disabled_validators_res.error());
continue;
}
auto &disabled_validators = disabled_validators_res.value();

auto vote_state = CandidateVoteState::create(
candidate_votes, env, disabled_validators, system_clock_.nowUint64());
candidate_votes, env, system_clock_.nowUint64());

auto is_included = scraper_->is_candidate_included(candidate_hash);
auto is_backed = scraper_->is_candidate_backed(candidate_hash);
Expand Down Expand Up @@ -799,8 +790,13 @@ namespace kagome::dispute {
// If the latest session was updated, then prune spam slots
if (highest_session_ < session_index) {
highest_session_ = session_index;
static size_t kWindowSize = 6;
spam_slots_->prune_old(highest_session_ - kWindowSize);

auto prune_up_to = (highest_session_ > kDisputeWindow)
? highest_session_ - (kDisputeWindow - 1)
: 1;

spam_slots_->prune_old(prune_up_to);
offchain_disabled_validators_.prune_old(prune_up_to);
}

// The `runtime-api` subsystem has an internal queue which serializes the
Expand Down Expand Up @@ -943,10 +939,10 @@ namespace kagome::dispute {
sending_dispute->refresh_sends(*runtime_info_, active_sessions_);
//.await?;

// Only rate limit if we actually sent something out _and_ it was not just
// Only rate limit if we actually sent something out _and_ it wasn't just
// because of errors on previous sends.
//
// Reasoning: It would not be acceptable to slow down the whole subsystem,

// Reasoning: It wouldn't be acceptable to slow down the whole subsystem,
// just because of a few bad peers having problems. It is actually better
// to risk running into their rate limit in that case and accept a minor
// reputation change.
Expand Down Expand Up @@ -979,7 +975,17 @@ namespace kagome::dispute {
DisputeCoordinatorImpl::makeCandidateEnvironment(
crypto::SessionKeys &session_keys,
SessionIndex session,
primitives::BlockHash relay_parent) {
primitives::BlockHash relay_parent,
std::vector<ValidatorIndex> &&disabled_offchain) {
auto disabled_onchain_res = api_->disabled_validators(relay_parent);
if (disabled_onchain_res.has_error()) {
SL_WARN(log_,
"Failed to get disabled validators: {}",
disabled_onchain_res.error());
return std::nullopt;
}
auto &disabled_onchain = disabled_onchain_res.value();

auto session_info_opt_res = api_->session_info(relay_parent, session);
if (session_info_opt_res.has_error()) {
SL_WARN(log_,
Expand All @@ -999,9 +1005,29 @@ namespace kagome::dispute {
controlled_indices.emplace(keypair->second);
}

return CandidateEnvironment{.session_index = session,
.session = std::move(session_info),
.controlled_indices = controlled_indices};
// combine on-chain with off-chain disabled validators
// process disabled validators in the following order:
// - on-chain disabled validators
// - prioritized order of off-chain disabled validators
// deduplicate the list and take at most `byzantine_threshold` validators
auto byzantine_threshold =
(std::max<size_t>(session_info.validators.size(), 1) - 1) / 3;
std::unordered_set<ValidatorIndex> disabled_indices;
for (auto &container : {disabled_onchain, disabled_offchain}) {
for (auto validator_index : container) {
disabled_indices.emplace(validator_index);
if (disabled_indices.size() >= byzantine_threshold) {
break;
}
}
}

return CandidateEnvironment{
.session_index = session,
.session = std::move(session_info),
.controlled_indices = std::move(controlled_indices),
.disabled_indices = std::move(disabled_indices),
};
}

outcome::result<bool> DisputeCoordinatorImpl::handle_import_statements(
Expand All @@ -1028,7 +1054,6 @@ namespace kagome::dispute {
// blocks, and hence we do not have a `CandidateReceipt` available.

CandidateVoteState old_state;
std::vector<ValidatorIndex> disabled_validators;

OUTCOME_TRY(old_state_opt,
storage_->load_candidate_votes(session, candidate_hash));
Expand All @@ -1055,8 +1080,11 @@ namespace kagome::dispute {
relay_parent = old_state_opt->candidate_receipt.descriptor.relay_parent;
}

auto env_opt =
makeCandidateEnvironment(*session_keys_, session, relay_parent);
auto env_opt = makeCandidateEnvironment(
*session_keys_,
session,
relay_parent,
offchain_disabled_validators_.asVector(session));
if (not env_opt.has_value()) {
SL_DEBUG(log_,
"We are lacking a `SessionInfo` for handling import of "
Expand All @@ -1065,24 +1093,8 @@ namespace kagome::dispute {
}
auto &env = env_opt.value();

auto disabled_validators_res = api_->disabled_validators(relay_parent);
if (disabled_validators_res.has_error()) {
SL_WARN(log_,
"Cannot import votes, without getting disabled validators: {}",
disabled_validators_res.error());
return outcome::success(false);
}
disabled_validators = std::move(disabled_validators_res.value());

auto is_disabled = [&disabled_validators =
disabled_validators_res.value()](auto index) {
return std::binary_search(
disabled_validators.begin(), disabled_validators.end(), index);
};

if (old_state_opt.has_value()) {
old_state = CandidateVoteState::create(
old_state_opt.value(), env, disabled_validators, now);
old_state = CandidateVoteState::create(old_state_opt.value(), env, now);
}

SL_TRACE(log_, "Loaded votes");
Expand Down Expand Up @@ -1133,10 +1145,10 @@ namespace kagome::dispute {
continue;
}

auto is_disabled_validator = is_disabled(val_index);
auto is_disabled_validator = env.disabled_indices.contains(val_index);

// Postpone votes of disabled validators while any votes for candidate are
// not exist
// Postpone votes of disabled validators while any votes for candidate
// don't exist
if (is_disabled_validator and votes.valid.empty()
and votes.invalid.empty()) {
postponed_statements.emplace_back(std::move(vote));
Expand Down Expand Up @@ -1176,8 +1188,7 @@ namespace kagome::dispute {

ImportResult intermediate_result{
.old_state = std::move(old_state),
.new_state = CandidateVoteState::create(
votes, env, disabled_validators, now), // new_state
.new_state = CandidateVoteState::create(votes, env, now),
.imported_invalid_votes = imported_invalid_votes,
.imported_valid_votes = imported_valid_votes,
.imported_approval_votes = 0,
Expand Down Expand Up @@ -1335,8 +1346,8 @@ namespace kagome::dispute {
}
}

import_result.new_state = CandidateVoteState::create(
std::move(_votes), env, disabled_validators, now);
import_result.new_state =
CandidateVoteState::create(std::move(_votes), env, now);
}
} else {
SL_TRACE(log_, "Not requested approval signatures");
Expand Down Expand Up @@ -1514,6 +1525,20 @@ namespace kagome::dispute {
}
}

if (is_freshly_concluded_for) {
for (const auto &[validator_index, vote] : new_state.votes.invalid) {
SL_DEBUG(
log_,
"Disabled offchain for voting invalid against a valid candidate."
"(candidate={}, validator_index={})",
candidate_hash,
validator_index);
// https://github.com/paritytech/polkadot-sdk/blob/b16237ad6f019667a59b0e3e726f6ac20e2d0a1c/polkadot/node/core/dispute-coordinator/src/initialized.rs#L1381
offchain_disabled_validators_.insert_against_valid(session,
validator_index);
}
}

// Notify ChainSelection if a dispute has concluded against a candidate.
// ChainSelection will need to mark the candidate's relay parent as
// reverted.
Expand All @@ -1539,6 +1564,21 @@ namespace kagome::dispute {
"Could not find an including block for candidate against "
"which a dispute has concluded");
}

for (const auto &[validator_index, vote] : new_state.votes.valid) {
const auto &[kind, sig] = vote;
bool is_backer =
is_type<BackingValid>(kind) or is_type<BackingSeconded>(kind);
SL_DEBUG(log_,
"Disabled offchain for voting valid for an invalid candidate."
"(candidate={}, validator_index={}, is_backer={})",
candidate_hash,
validator_index,
is_backer);
// https://github.com/paritytech/polkadot-sdk/blob/b16237ad6f019667a59b0e3e726f6ac20e2d0a1c/polkadot/node/core/dispute-coordinator/src/initialized.rs#L1412
offchain_disabled_validators_.insert_for_invalid(
session, validator_index, is_backer);
}
}

SL_TRACE(log_,
Expand Down
76 changes: 73 additions & 3 deletions core/dispute_coordinator/impl/dispute_coordinator_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
#include "network/dispute_request_observer.hpp"
#include "network/types/dispute_messages.hpp"

#include <list>

#include <libp2p/basic/scheduler.hpp>

#include "clock/impl/basic_waitable_timer.hpp"
Expand Down Expand Up @@ -104,6 +102,12 @@ namespace kagome::dispute {
/// peer to enforce that limit.
static constexpr auto kReceiveRateLimit = std::chrono::milliseconds(100);

/// It would be nice to draw this from the chain state, but we have no tools
/// for it right now. On Polkadot this is 1 day, and on Kusama it's 6 hours.
///
/// Number of sessions we want to consider in disputes.
static constexpr auto kDisputeWindow = SessionIndex(6);

DisputeCoordinatorImpl(
std::shared_ptr<application::ChainSpec> chain_spec,
std::shared_ptr<application::AppStateManager> app_state_manager,
Expand Down Expand Up @@ -194,7 +198,8 @@ namespace kagome::dispute {
std::optional<CandidateEnvironment> makeCandidateEnvironment(
crypto::SessionKeys &session_keys,
SessionIndex session,
primitives::BlockHash relay_parent);
primitives::BlockHash relay_parent,
std::vector<ValidatorIndex> &&offchain_disabled);

outcome::result<void> process_on_chain_votes(
const ScrapedOnChainVotes &votes);
Expand Down Expand Up @@ -368,6 +373,71 @@ namespace kagome::dispute {
std::list<std::tuple<CandidateHash, std::shared_ptr<SendingDispute>>>
sending_disputes_;

// Ideally, we want to use the top `byzantine_threshold` offenders here
// based on the amount of stake slashed. However, given that slashing might
// be applied with a delay, we want to have some list of offenders as soon
// as disputes conclude offchain. This list only approximates the top
// offenders and only accounts for lost disputes. But that should be good
// enough to prevent spam attacks.
struct LostSessionDisputes {
std::deque<ValidatorIndex> backers_for_invalid;
std::deque<ValidatorIndex> for_invalid;
std::deque<ValidatorIndex> against_valid;
};

// We separate lost disputes to prioritize "for invalid" offenders. And
// among those, we prioritize backing votes the most. There's no need to
// limit the size of these sets, as they are already limited by the number
// of validators in the session. We use deque to ensure the iteration order
// prioritizes most recently disputes lost over older ones in case we reach
// the limit.
struct OffchainDisabledValidators {
std::map<SessionIndex, LostSessionDisputes> per_session;

void prune_old(SessionIndex up_to_excluding) {
std::erase_if(per_session,
[&](const auto &p) { return p.first < up_to_excluding; });
}

void insert_for_invalid(SessionIndex session_index,
ValidatorIndex validator_index,
bool is_backer) {
auto &entry = per_session[session_index];
if (is_backer) {
entry.backers_for_invalid.emplace_front(validator_index);
} else {
entry.for_invalid.emplace_front(validator_index);
}
}

void insert_against_valid(SessionIndex session_index,
ValidatorIndex validator_index) {
per_session[session_index].against_valid.emplace_front(validator_index);
}

std::vector<ValidatorIndex> asVector(SessionIndex session_index) {
std::vector<ValidatorIndex> res;
auto it = per_session.find(session_index);
if (it == per_session.end()) {
return res;
}
const auto &entry = it->second;

res.reserve(entry.backers_for_invalid.size() //
+ entry.for_invalid.size() //
+ entry.against_valid.size());

for (const auto &c : {entry.backers_for_invalid,
entry.for_invalid,
entry.against_valid}) {
res.insert(res.end(), c.begin(), c.end());
}
return res;
}
};

OffchainDisabledValidators offchain_disabled_validators_;

// Metrics
metrics::RegistryPtr metrics_registry_ = metrics::createRegistry();
metrics::Counter *metric_disputes_total_;
Expand Down
3 changes: 3 additions & 0 deletions core/dispute_coordinator/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ namespace kagome::dispute {
SessionInfo session;
/// Validator indices controlled by this node.
std::unordered_set<ValidatorIndex> controlled_indices{};
/// Indices of on-chain disabled validators at the `relay_parent` combined
/// with the off-chain state.
std::unordered_set<ValidatorIndex> disabled_indices{};
};

/// The status of an activated leaf.
Expand Down

0 comments on commit 6e46905

Please sign in to comment.