Skip to content

Commit

Permalink
revert
Browse files Browse the repository at this point in the history
Signed-off-by: iceseer <[email protected]>
  • Loading branch information
iceseer committed Sep 28, 2024
1 parent cd60736 commit 529a475
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 234 deletions.
4 changes: 1 addition & 3 deletions core/authority_discovery/query/query_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ namespace kagome::authority_discovery {
}
auto r = add(*id, value);
if (not r) {
SL_WARN(log_, "Can't add: {}", r.error());
} else {
SL_INFO(log_, "Added or updated success: {}", *id);
SL_DEBUG(log_, "Can't add: {}", r.error());
}
return r;
}
Expand Down
24 changes: 12 additions & 12 deletions core/network/impl/protocols/fetch_attested_candidate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::ParachainProcessorImpl> pp)
: RequestResponseProtocolImpl<
vstaging::AttestedCandidateRequest,
vstaging::AttestedCandidateResponse,
ScaleMessageReadWriter>{kFetchAttestedCandidateProtocolName,
host,
make_protocols(
kFetchAttestedCandidateProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kFetchAttestedCandidateProtocolName,
"req_attested_candidate_protocol")},
vstaging::AttestedCandidateRequest,
vstaging::AttestedCandidateResponse,
ScaleMessageReadWriter>{kFetchAttestedCandidateProtocolName,
host,
make_protocols(
kFetchAttestedCandidateProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kFetchAttestedCandidateProtocolName,
"req_attested_candidate_protocol")},
pp_{std::move(pp)} {
BOOST_ASSERT(pp_);
}
Expand All @@ -64,7 +64,7 @@ namespace kagome::network {
"Fetching attested candidate response failed.(error={})",
res.error());
} else {
base().logger()->trace("Fetching attested candidate response.");
SL_TRACE(base().logger(), "Fetching attested candidate response.");
}
return res;
}
Expand Down
53 changes: 28 additions & 25 deletions core/network/impl/protocols/protocol_fetch_available_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,25 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::AvailabilityStore> av_store)
: RequestResponseProtocolImpl<
FetchAvailableDataRequest,
FetchAvailableDataResponse,
ScaleMessageReadWriter>{kName,
host,
make_protocols(kFetchAvailableDataProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_available_data_protocol")},
FetchAvailableDataRequest,
FetchAvailableDataResponse,
ScaleMessageReadWriter>{kName,
host,
make_protocols(
kFetchAvailableDataProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName,
"req_available_data_protocol")},
av_store_{std::move(av_store)} {}

private:
std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType candidate_hash, std::shared_ptr<Stream>) override {
base().logger()->trace("Fetch available data .(candidate hash={})",
candidate_hash);
SL_TRACE(base().logger(),
"Fetch available data .(candidate hash={})",
candidate_hash);

if (auto r = av_store_->getPovAndData(candidate_hash)) {
return std::move(*r);
Expand All @@ -76,25 +79,25 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<parachain::BackingStore> backing_store)
: RequestResponseProtocolImpl<
FetchStatementRequest,
FetchStatementResponse,
ScaleMessageReadWriter>{kName,
host,
make_protocols(kFetchStatementProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_statement_protocol")},
FetchStatementRequest,
FetchStatementResponse,
ScaleMessageReadWriter>{kName,
host,
make_protocols(kFetchStatementProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(
kName, "req_statement_protocol")},
backing_store_{std::move(backing_store)} {}

private:
std::optional<outcome::result<ResponseType>> onRxRequest(
RequestType req, std::shared_ptr<Stream>) override {
base().logger()->trace(
"Statement fetch request received.(relay parent={}, candidate "
"hash={})",
req.relay_parent,
req.candidate_hash);
SL_TRACE(base().logger(),
"Statement fetch request received.(relay parent={}, candidate "
"hash={})",
req.relay_parent,
req.candidate_hash);

if (auto res = backing_store_->getCadidateInfo(req.relay_parent,
req.candidate_hash)) {
Expand Down
24 changes: 12 additions & 12 deletions core/network/impl/protocols/protocol_req_pov.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer)
: RequestResponseProtocolImpl<
RequestPov,
ResponsePov,
ScaleMessageReadWriter>{kReqPovProtocolName,
host,
make_protocols(kReqPovProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kReqPovProtocolName,
"req_pov_protocol")},
RequestPov,
ResponsePov,
ScaleMessageReadWriter>{kReqPovProtocolName,
host,
make_protocols(kReqPovProtocol,
genesis_hash,
kProtocolPrefixPolkadot),
log::createLogger(kReqPovProtocolName,
"req_pov_protocol")},
observer_{std::move(observer)} {}

protected:
Expand All @@ -57,8 +57,8 @@ namespace kagome::network {
}

void onTxRequest(const RequestPov &request) override {
base().logger()->trace("Transmit PoV request(candidate hash={})",
request);
SL_TRACE(
base().logger(), "Transmit PoV request(candidate hash={})", request);
}

private:
Expand All @@ -72,7 +72,7 @@ namespace kagome::network {
const blockchain::GenesisBlockHash &genesis_hash,
std::shared_ptr<ReqPovObserver> observer)
: impl_{std::make_shared<ReqPovProtocolImpl>(
host, chain_spec, genesis_hash, std::move(observer))} {}
host, chain_spec, genesis_hash, std::move(observer))} {}

const Protocol &ReqPovProtocol::protocolName() const {
BOOST_ASSERT(impl_ && !!"ReqPovProtocolImpl must be initialized!");
Expand Down
89 changes: 22 additions & 67 deletions core/parachain/approval/approval_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2572,57 +2572,6 @@ namespace kagome::parachain {
[&](const auto &) { UNREACHABLE; });
}

void ApprovalDistribution::broadcastToAllPeers(
const Hash &block_hash,
const std::shared_ptr<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>
&msg) {
auto se = pm_->getStreamEngine();
BOOST_ASSERT(se);

const auto header = block_tree_->getBlockHeader(block_hash);
if (header.has_value()) {
const auto session_index =
parachain_host_->session_index_for_child(header.value().parent_hash);
if (session_index.has_value()) {
const auto session = parachain_host_->session_info(
header.value().parent_hash, session_index.value());
if (session.has_value() && session.value()) {
for (const auto &id : session.value()->discovery_keys) {
if (auto peer = parachain_processor_->get_audi()->get(id)) {
parachain_processor_->tryOpenOutgoingValidationStream(
peer->id,
network::CollationVersion::VStaging,
[WEAK_SELF, peer{peer->id}, se, msg]() {
WEAK_LOCK(self);
se->send(peer,
self->router_->getValidationProtocolVStaging(),
msg);
});
} else {
SL_TRACE(logger_, "No audi for {}.", id);
}
}
} else {
SL_TRACE(logger_,
"No session. (block hash={}, error={})",
block_hash,
session.error());
}
} else {
SL_TRACE(logger_,
"No session index. (block hash={}, error={})",
block_hash,
session_index.error());
}
} else {
SL_TRACE(logger_,
"No block header in block tree. (block hash={}, error={})",
block_hash,
header.error());
}
}

void ApprovalDistribution::runDistributeAssignment(
const approval::IndirectAssignmentCertV2 &indirect_cert,
const scale::BitVec &candidate_indices,
Expand All @@ -2637,15 +2586,17 @@ namespace kagome::parachain {
"Distributing assignment on candidate (block hash={})",
indirect_cert.block_hash);

auto msg = std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Assignments{
.assignments = {network::vstaging::Assignment{
.indirect_assignment_cert = indirect_cert,
.candidate_bitfield = candidate_indices,
}}}});
broadcastToAllPeers(indirect_cert.block_hash, msg);
se->broadcast(
router_->getValidationProtocolVStaging(),
std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Assignments{
.assignments = {network::vstaging::Assignment{
.indirect_assignment_cert = indirect_cert,
.candidate_bitfield = candidate_indices,
}}}}),
[&](const libp2p::peer::PeerId &p) { return peers.contains(p); });
}

void ApprovalDistribution::send_assignments_batched(
Expand Down Expand Up @@ -2753,13 +2704,17 @@ namespace kagome::parachain {
vote.payload.payload.block_hash,
peers.size());

auto msg = std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Approvals{
.approvals = {vote},
}});
broadcastToAllPeers(vote.payload.payload.block_hash, msg);
se->broadcast(
router_->getValidationProtocolVStaging(),
std::make_shared<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>(
network::vstaging::ApprovalDistributionMessage{
network::vstaging::Assignments{
.assignments = {network::vstaging::Assignment{
.indirect_assignment_cert = indirect_cert,
.candidate_bitfield = candidate_indices,
}}}}),
[&](const libp2p::peer::PeerId &p) { return peers.contains(p); });
}

void ApprovalDistribution::issue_approval(const CandidateHash &candidate_hash,
Expand Down
6 changes: 0 additions & 6 deletions core/parachain/approval/approval_distribution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,12 +801,6 @@ namespace kagome::parachain {
const scale::BitVec &candidate_indices,
std::unordered_set<libp2p::peer::PeerId> &&peers);

void broadcastToAllPeers(
const Hash &block_hash,
const std::shared_ptr<
network::WireMessage<network::vstaging::ValidatorProtocolMessage>>
&msg);

void send_assignments_batched(
std::deque<network::vstaging::Assignment> &&assignments,
const libp2p::peer::PeerId &peer_id);
Expand Down
9 changes: 0 additions & 9 deletions core/parachain/availability/store/store_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ namespace kagome::parachain {
std::optional<AvailabilityStore::AvailableData>
AvailabilityStoreImpl::getPovAndData(
const CandidateHash &candidate_hash) const {
SL_DEBUG(logger, "{}. candidate: {})", __PRETTY_FUNCTION__, candidate_hash);

return state_.sharedAccess(
[&](const auto &state)
-> std::optional<AvailabilityStore::AvailableData> {
Expand Down Expand Up @@ -120,8 +118,6 @@ namespace kagome::parachain {
std::vector<ErasureChunk> &&chunks,
const ParachainBlock &pov,
const PersistedValidationData &data) {
SL_DEBUG(logger, "{}. candidate: {})", __PRETTY_FUNCTION__, candidate_hash);

state_.exclusiveAccess([&](auto &state) {
state.candidates_[relay_parent].insert(candidate_hash);
auto &candidate_data = state.per_candidate_[candidate_hash];
Expand All @@ -136,9 +132,6 @@ namespace kagome::parachain {
void AvailabilityStoreImpl::putChunk(const network::RelayHash &relay_parent,
const CandidateHash &candidate_hash,
ErasureChunk &&chunk) {
// SL_DEBUG(logger, "{}. candidate: {})", __PRETTY_FUNCTION__,
// candidate_hash);

state_.exclusiveAccess([&](auto &state) {
state.candidates_[relay_parent].insert(candidate_hash);
state.per_candidate_[candidate_hash].chunks[chunk.index] =
Expand All @@ -151,8 +144,6 @@ namespace kagome::parachain {
if (auto it = state.candidates_.find(relay_parent);
it != state.candidates_.end()) {
for (auto const &l : it->second) {
SL_DEBUG(logger, "{}. candidate: {})", __PRETTY_FUNCTION__, l);

state.per_candidate_.erase(l);
}
state.candidates_.erase(it);
Expand Down
11 changes: 3 additions & 8 deletions core/parachain/validator/impl/parachain_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -674,8 +674,6 @@ namespace kagome::parachain {
av_store_->remove(lost);
our_current_state_.per_leaf.erase(lost);
our_current_state_.state_by_relay_parent.erase(lost);

av_store_->remove(lost);
}
our_current_state_.active_leaves[relay_parent] =
prospective_parachains_->prospectiveParachainsMode(relay_parent);
Expand Down Expand Up @@ -1143,12 +1141,7 @@ namespace kagome::parachain {

/// update peers in grid view
const auto &grid_view = *per_session_state->value().grid_view;
SL_TRACE(logger_, "++>>> groups={}", grid_view.size());
for (const auto &view : grid_view) {
SL_TRACE(logger_,
"++>>> group_size(s)={}, group_size(r)={}",
view.sending.size(),
view.receiving.size());
for (const auto vi : view.sending) {
spawn_and_update_peer(peers_sent, session_info->discovery_keys[vi]);
}
Expand Down Expand Up @@ -4490,7 +4483,9 @@ namespace kagome::parachain {
const libp2p::peer::PeerId &peer_id,
const std::shared_ptr<network::ProtocolBase> &protocol) {
BOOST_ASSERT(protocol);
logger_->info("Send my view.(peer={}, protocol={})",
CHECK_OR_RET(canProcessParachains().has_value());

SL_INFO(logger_, "Send my view.(peer={}, protocol={})",
peer_id,
protocol->protocolName());
pm_->getStreamEngine()->send(
Expand Down
Loading

0 comments on commit 529a475

Please sign in to comment.