Skip to content

Commit

Permalink
Fix/kusama grid size fixes (#2180)
Browse files Browse the repository at this point in the history
* fix recovery (validators <= discovery_keys)

Signed-off-by: turuslan <[email protected]>

* update our view in topology

Signed-off-by: iceseer <[email protected]>

* not to doublicate sends

Signed-off-by: iceseer <[email protected]>

* grid view from authorities

Signed-off-by: iceseer <[email protected]>

* force authority discovery for the given peer set

Signed-off-by: iceseer <[email protected]>

* fixup fork wakeups

Signed-off-by: iceseer <[email protected]>

* unify_with_peer

Signed-off-by: iceseer <[email protected]>

* Fix kusama parachain validator crash

* Fix minideb-release.Dockerfile + tini

Signed-off-by: Igor Egorov <[email protected]>

Signed-off-by: iceseer <[email protected]>

---------

Signed-off-by: turuslan <[email protected]>
Signed-off-by: iceseer <[email protected]>
Signed-off-by: Dmitriy Khaustov aka xDimon <[email protected]>
Signed-off-by: Igor Egorov <[email protected]>
Co-authored-by: turuslan <[email protected]>
Co-authored-by: Dmitriy Khaustov aka xDimon <[email protected]>
Co-authored-by: Igor Egorov <[email protected]>
  • Loading branch information
4 people authored Oct 2, 2024
1 parent 8f241db commit dfa97aa
Show file tree
Hide file tree
Showing 12 changed files with 1,705 additions and 166 deletions.
24 changes: 12 additions & 12 deletions core/network/impl/protocols/fetch_attested_candidate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp)
: RequestResponseProtocolImpl<
vstaging::AttestedCandidateRequest,
vstaging::AttestedCandidateResponse,
ScaleMessageReadWriter>{kFetchAttestedCandidateProtocolName,
host,
make_protocols(
kFetchAttestedCandidateProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kFetchAttestedCandidateProtocolName,
"req_attested_candidate_protocol")},
vstaging::AttestedCandidateRequest,
vstaging::AttestedCandidateResponse,
ScaleMessageReadWriter>{kFetchAttestedCandidateProtocolName,
host,
make_protocols(
kFetchAttestedCandidateProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kFetchAttestedCandidateProtocolName,
"req_attested_candidate_protocol")},
pp_{std::move(pp)} {
BOOST_ASSERT(pp_);
}
Expand All @@ -64,7 +64,7 @@ namespace kagome::network {
"Fetching attested candidate response failed.(error={})",
res.error());
} else {
base().logger()->trace("Fetching attested candidate response.");
SL_TRACE(base().logger(), "Fetching attested candidate response.");
}
return res;
}
Expand Down
52 changes: 29 additions & 23 deletions core/network/impl/protocols/protocol_fetch_available_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,26 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::AvailabilityStore> av_store)
: RequestResponseProtocolImpl<
FetchAvailableDataRequest,
FetchAvailableDataResponse,
ScaleMessageReadWriter>{kName,
host,
make_protocols(kFetchAvailableDataProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_available_data_protocol")},
FetchAvailableDataRequest,
FetchAvailableDataResponse,
ScaleMessageReadWriter>{kName,
host,
make_protocols(
kFetchAvailableDataProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName,
"req_available_data_protocol")},
av_store_{std::move(av_store)} {}

private:
std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType candidate_hash, std::shared_ptr<Stream>) override {
SL_TRACE(base().logger(),
"Fetch available data .(candidate hash={})",
candidate_hash);

if (auto r = av_store_->getPovAndData(candidate_hash)) {
return std::move(*r);
}
Expand All @@ -73,25 +79,25 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::BackingStore> backing_store)
: RequestResponseProtocolImpl<
FetchStatementRequest,
FetchStatementResponse,
ScaleMessageReadWriter>{kName,
host,
make_protocols(kFetchStatementProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_statement_protocol")},
FetchStatementRequest,
FetchStatementResponse,
ScaleMessageReadWriter>{kName,
host,
make_protocols(kFetchStatementProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_statement_protocol")},
backing_store_{std::move(backing_store)} {}

private:
std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType req, std::shared_ptr<Stream>) override {
base().logger()->trace(
"Statement fetch request received.(relay parent={}, candidate "
"hash={})",
req.relay_parent,
req.candidate_hash);
SL_TRACE(base().logger(),
"Statement fetch request received.(relay parent={}, candidate "
"hash={})",
req.relay_parent,
req.candidate_hash);

if (auto res = backing_store_->getCadidateInfo(req.relay_parent,
req.candidate_hash)) {
Expand Down
24 changes: 12 additions & 12 deletions core/network/impl/protocols/protocol_req_pov.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer)
: RequestResponseProtocolImpl<
RequestPov,
ResponsePov,
ScaleMessageReadWriter>{kReqPovProtocolName,
host,
make_protocols(kReqPovProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kReqPovProtocolName,
"req_pov_protocol")},
RequestPov,
ResponsePov,
ScaleMessageReadWriter>{kReqPovProtocolName,
host,
make_protocols(kReqPovProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kReqPovProtocolName,
"req_pov_protocol")},
observer_{std::move(observer)} {}

protected:
Expand All @@ -57,8 +57,8 @@ namespace kagome::network {
}

void onTxRequest(const RequestPov &request) override {
base().logger()->trace("Transmit PoV request(candidate hash={})",
request);
SL_TRACE(
base().logger(), "Transmit PoV request(candidate hash={})", request);
}

private:
Expand All @@ -72,7 +72,7 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer)
: impl_{std::make_shared<ReqPovProtocolImpl>(
host, chain_spec, genesis_hash, std::move(observer))} {}
host, chain_spec, genesis_hash, std::move(observer))} {}

const Protocol &ReqPovProtocol::protocolName() const {
BOOST_ASSERT(impl_ && !!"ReqPovProtocolImpl must be initialized!");
Expand Down
93 changes: 43 additions & 50 deletions core/parachain/approval/approval_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ namespace kagome::parachain {
common::MainThreadPool &main_thread_pool,
LazySPtr<dispute::DisputeCoordinator> dispute_coordinator)
: approval_thread_handler_{poolHandlerReadyMake(
this, app_state_manager, approval_thread_pool, logger_)},
this, app_state_manager, approval_thread_pool, logger_)},
worker_pool_handler_{worker_thread_pool.handler(*app_state_manager)},
parachain_host_(std::move(parachain_host)),
slots_util_(slots_util),
Expand Down Expand Up @@ -1717,17 +1717,19 @@ namespace kagome::parachain {
auto cb = [weak_self{wself},
hashed_candidate,
session_index,
validator_index,
relay_block_hash](outcome::result<Pvf::Result> outcome) {
validator_index](outcome::result<Pvf::Result> outcome) {
auto self = weak_self.lock();
if (not self) {
return;
}
const auto &candidate_receipt = hashed_candidate.get();

std::vector<Hash> advence_hashes;
self->approvals_cache_.exclusiveAccess([&](auto &approvals_cache) {
if (auto it = approvals_cache.find(hashed_candidate.getHash());
it != approvals_cache.end()) {
ApprovalCache &ac = it->second;
advence_hashes.assign(ac.blocks_.begin(), ac.blocks_.end());
ac.approval_result = outcome.has_error()
? ApprovalOutcome::Failed
: ApprovalOutcome::Approved;
Expand All @@ -1746,9 +1748,10 @@ namespace kagome::parachain {
candidate_receipt,
false);
} else {
self->issue_approval(hashed_candidate.getHash(),
validator_index,
relay_block_hash);
for (const auto &b : advence_hashes) {
self->issue_approval(
hashed_candidate.getHash(), validator_index, b);
}
}
};
self->pvf_->pvfValidate(available_data.validation_data,
Expand Down Expand Up @@ -2084,12 +2087,12 @@ namespace kagome::parachain {
return;
}
} else {
SL_WARN(logger_,
"Assignment from a peer is out of view. (peer id={}, "
"block_hash={}, validator index={})",
peer_id,
std::get<0>(message_subject),
std::get<2>(message_subject));
SL_TRACE(logger_,
"Assignment from a peer is out of view. (peer id={}, "
"block_hash={}, validator index={})",
peer_id,
std::get<0>(message_subject),
std::get<2>(message_subject));
}

/// if the assignment is known to be valid, reward the peer
Expand Down Expand Up @@ -2586,24 +2589,17 @@ namespace kagome::parachain {
auto se = pm_->getStreamEngine();
BOOST_ASSERT(se);

auto msg = std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Assignments{
.assignments = {network::vstaging::Assignment{
.indirect_assignment_cert = indirect_cert,
.candidate_bitfield = candidate_indices,
}}}});

for (const auto &peer : peers) {
parachain_processor_->tryOpenOutgoingValidationStream(
peer,
network::CollationVersion::VStaging,
[WEAK_SELF, peer{peer}, se, msg]() {
WEAK_LOCK(self);
se->send(peer, self->router_->getValidationProtocolVStaging(), msg);
});
}
se->broadcast(
router_->getValidationProtocolVStaging(),
std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Assignments{
.assignments = {network::vstaging::Assignment{
.indirect_assignment_cert = indirect_cert,
.candidate_bitfield = candidate_indices,
}}}}),
[&](const libp2p::peer::PeerId &p) { return peers.contains(p); });
}

void ApprovalDistribution::send_assignments_batched(
Expand Down Expand Up @@ -2714,22 +2710,15 @@ namespace kagome::parachain {
auto se = pm_->getStreamEngine();
BOOST_ASSERT(se);

auto msg = std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Approvals{
.approvals = {vote},
}});

for (const auto &peer : peers) {
parachain_processor_->tryOpenOutgoingValidationStream(
peer,
network::CollationVersion::VStaging,
[WEAK_SELF, peer{peer}, se, msg]() {
WEAK_LOCK(self);
se->send(peer, self->router_->getValidationProtocolVStaging(), msg);
});
}
se->broadcast(
router_->getValidationProtocolVStaging(),
std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Approvals{
.approvals = {vote},
}}),
[&](const libp2p::peer::PeerId &p) { return peers.contains(p); });
}

void ApprovalDistribution::issue_approval(const CandidateHash &candidate_hash,
Expand Down Expand Up @@ -2879,7 +2868,10 @@ namespace kagome::parachain {
GroupIndex backing_group,
std::optional<CoreIndex> core,
bool distribute_assignment) {
/// TODO(iceseer): don't launch approval work if the node is syncing.
if (!parachain_processor_->canProcessParachains()) {
return;
}

const auto &block_hash = indirect_cert.block_hash;
const auto validator_index = indirect_cert.validator;

Expand Down Expand Up @@ -2940,9 +2932,10 @@ namespace kagome::parachain {
};
return approval::min_or_some(
e.next_no_show,
(e.last_assignment_tick ? filter(
*e.last_assignment_tick + kApprovalDelay, tick_now)
: std::optional<Tick>{}));
(e.last_assignment_tick
? filter(*e.last_assignment_tick + kApprovalDelay,
tick_now)
: std::optional<Tick>{}));
},
[&](const approval::PendingRequiredTranche &e) {
std::optional<DelayTranche> next_announced{};
Expand Down
14 changes: 0 additions & 14 deletions core/parachain/availability/bitfield/signer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,6 @@ namespace kagome::parachain {
for (auto &candidate : candidates) {
bitfield.bits.push_back(
candidate && store_->hasChunk(*candidate, signer.validatorIndex()));
if (candidate) {
SL_TRACE(logger_,
"Signing bitfields.(relay_parent={}, validator index={}, has "
"chunk={})",
relay_parent,
signer.validatorIndex(),
bitfield.bits.back() ? 1 : 0);
} else {
SL_TRACE(logger_,
"Signing bitfields.(relay_parent={}, validator index={}, NOT "
"OCCUPIED)",
relay_parent,
signer.validatorIndex());
}
}

OUTCOME_TRY(signed_bitfield, signer.sign(bitfield));
Expand Down
11 changes: 4 additions & 7 deletions core/parachain/backing/grid.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ namespace kagome::parachain::grid {

/// View for one group
struct View {
std::unordered_set<ValidatorIndex> receiving, sending;
std::unordered_set<ValidatorIndex> sending, receiving;

bool operator==(const View &r) const = default;

bool canReceive(bool full, ValidatorIndex from) const {
return (full ? receiving : sending).contains(from);
Expand Down Expand Up @@ -237,12 +239,7 @@ namespace kagome::parachain::grid {
}

inline std::vector<ValidatorIndex> shuffle(
const std::vector<std::vector<ValidatorIndex>> &groups,
std::span<const uint8_t, 32> babe_randomness) {
size_t n = 0;
for (auto &group : groups) {
n += group.size();
}
size_t n, std::span<const uint8_t, 32> babe_randomness) {
std::vector<ValidatorIndex> validators;
validators.resize(n);
std::iota(validators.begin(), validators.end(), 0);
Expand Down
Loading

0 comments on commit dfa97aa

Please sign in to comment.