Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pvf precheck #1705

Merged
merged 3 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/parachain/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ add_library(validator_parachain
availability/recovery/recovery_impl.cpp
availability/store/store_impl.cpp
backing/store_impl.cpp
pvf/precheck.cpp
pvf/pvf_impl.cpp
validator/impl/parachain_observer_impl.cpp
validator/impl/parachain_processor.cpp
Expand Down
12 changes: 3 additions & 9 deletions core/parachain/availability/bitfield/signer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@
namespace kagome::parachain {
constexpr std::chrono::milliseconds kDelay{1500};

namespace {
inline auto log() {
return log::createLogger("BitfieldSigner");
}
} // namespace

BitfieldSigner::BitfieldSigner(
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<ValidatorSignerFactory> signer_factory,
Expand Down Expand Up @@ -52,7 +46,7 @@ namespace kagome::parachain {
boost::get<primitives::events::HeadsEventParams>(event))
.value()));
if (r.has_error()) {
SL_DEBUG(log(), "onBlock error {}", r.error());
SL_DEBUG(self->logger_, "onBlock error {}", r.error());
}
}
});
Expand All @@ -66,7 +60,7 @@ namespace kagome::parachain {

outcome::result<void> BitfieldSigner::sign(const ValidatorSigner &signer,
const Candidates &candidates) {
BlockHash const &relay_parent = signer.relayParent();
const BlockHash &relay_parent = signer.relayParent();
scale::BitVec bitfield;
bitfield.bits.reserve(candidates.size());
for (auto &candidate : candidates) {
Expand Down Expand Up @@ -125,7 +119,7 @@ namespace kagome::parachain {
if (auto self = weak.lock()) {
auto r = self->sign(signer, candidates);
if (r.has_error()) {
SL_WARN(log(), "sign error {}", r.error());
SL_WARN(self->logger_, "sign error {}", r.error());
}
}
},
Expand Down
107 changes: 107 additions & 0 deletions core/parachain/pvf/precheck.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "parachain/pvf/precheck.hpp"

#include "offchain/offchain_worker_factory.hpp"
#include "offchain/offchain_worker_pool.hpp"
#include "runtime/common/uncompress_code_if_needed.hpp"
#include "runtime/module.hpp"
#include "runtime/module_factory.hpp"

namespace kagome::parachain {
PvfPrecheck::PvfPrecheck(
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<ValidatorSignerFactory> signer_factory,
std::shared_ptr<runtime::ParachainHost> parachain_api,
std::shared_ptr<runtime::ModuleFactory> module_factory,
std::shared_ptr<runtime::Executor> executor,
std::shared_ptr<offchain::OffchainWorkerFactory> offchain_worker_factory,
std::shared_ptr<offchain::OffchainWorkerPool> offchain_worker_pool)
: hasher_{std::move(hasher)},
signer_factory_{std::move(signer_factory)},
parachain_api_{std::move(parachain_api)},
module_factory_{std::move(module_factory)},
executor_{std::move(executor)},
offchain_worker_factory_{std::move(offchain_worker_factory)},
offchain_worker_pool_{std::move(offchain_worker_pool)} {}

void PvfPrecheck::start(
std::shared_ptr<primitives::events::ChainSubscriptionEngine>
chain_sub_engine) {
chain_sub_ = std::make_shared<primitives::events::ChainEventSubscriber>(
chain_sub_engine);
chain_sub_->subscribe(chain_sub_->generateSubscriptionSetId(),
primitives::events::ChainEventType::kNewHeads);
chain_sub_->setCallback(
[weak = weak_from_this()](
subscription::SubscriptionSetId,
auto &&,
primitives::events::ChainEventType,
const primitives::events::ChainEventParams &event) {
if (auto self = weak.lock()) {
self->thread_.io_context()->post(
[weak,
header{boost::get<primitives::events::HeadsEventParams>(event)
.get()}] {
if (auto self = weak.lock()) {
auto block_hash = self->hasher_->blake2b_256(
scale::encode(header).value());
auto r = self->onBlock(block_hash, header);
if (r.has_error()) {
SL_DEBUG(self->logger_, "onBlock error {}", r.error());
}
}
});
}
});
}

outcome::result<void> PvfPrecheck::onBlock(
const BlockHash &block_hash, const primitives::BlockHeader &header) {
OUTCOME_TRY(signer, signer_factory_->at(block_hash));
if (not signer.has_value()) {
return outcome::success();
}
OUTCOME_TRY(need, parachain_api_->pvfs_require_precheck(block_hash));
for (auto &code_hash : need) {
if (not seen_.emplace(code_hash).second) {
continue;
}
auto code_zstd_res =
parachain_api_->validation_code_by_hash(block_hash, code_hash);
if (not code_zstd_res or not code_zstd_res.value()) {
seen_.erase(code_hash);
continue;
}
auto &code_zstd = *code_zstd_res.value();
ParachainRuntime code;
auto res = [&]() -> outcome::result<void> {
turuslan marked this conversation as resolved.
Show resolved Hide resolved
OUTCOME_TRY(runtime::uncompressCodeIfNeeded(code_zstd, code));
OUTCOME_TRY(module_factory_->make(code));
return outcome::success();
}();
if (res) {
SL_VERBOSE(logger_, "approve {}", code_hash);
} else {
SL_WARN(logger_, "reject {}: {}", code_hash, res.error());
}
PvfCheckStatement statement{
res.has_value(),
code_hash,
signer->getSessionIndex(),
signer->validatorIndex(),
};
OUTCOME_TRY(signature, signer->signRaw(statement.signable()));
offchain_worker_pool_->addWorker(
offchain_worker_factory_->make(executor_, header));
auto remove =
gsl::finally([&] { offchain_worker_pool_->removeWorker(); });
OUTCOME_TRY(parachain_api_->submit_pvf_check_statement(
block_hash, statement, signature));
}
return outcome::success();
}
} // namespace kagome::parachain
70 changes: 70 additions & 0 deletions core/parachain/pvf/precheck.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef KAGOME_PARACHAIN_PVF_PRECHECK_HPP
#define KAGOME_PARACHAIN_PVF_PRECHECK_HPP

#include <unordered_set>

#include "crypto/hasher.hpp"
#include "log/logger.hpp"
#include "parachain/validator/signer.hpp"
#include "primitives/event_types.hpp"
#include "runtime/runtime_api/parachain_host.hpp"
#include "utils/thread_pool.hpp"

namespace kagome::offchain {
class OffchainWorkerFactory;
class OffchainWorkerPool;
} // namespace kagome::offchain

namespace kagome::runtime {
class Executor;
class ModuleFactory;
} // namespace kagome::runtime

namespace kagome::parachain {
/// Signs pvf check statement for every new head.
class PvfPrecheck : public std::enable_shared_from_this<PvfPrecheck> {
public:
using BroadcastCallback = std::function<void(
const primitives::BlockHash &, const network::SignedBitfield &)>;
using Candidates = std::vector<std::optional<network::CandidateHash>>;

PvfPrecheck(
std::shared_ptr<crypto::Hasher> hasher,
std::shared_ptr<ValidatorSignerFactory> signer_factory,
std::shared_ptr<runtime::ParachainHost> parachain_api,
std::shared_ptr<runtime::ModuleFactory> module_factory,
std::shared_ptr<runtime::Executor> executor,
std::shared_ptr<offchain::OffchainWorkerFactory>
offchain_worker_factory,
std::shared_ptr<offchain::OffchainWorkerPool> offchain_worker_pool);

/// Subscribes to new heads.
void start(std::shared_ptr<primitives::events::ChainSubscriptionEngine>
chain_sub_engine);

private:
using BlockHash = primitives::BlockHash;

outcome::result<void> onBlock(const BlockHash &block_hash,
const primitives::BlockHeader &header);

std::shared_ptr<crypto::Hasher> hasher_;
std::shared_ptr<ValidatorSignerFactory> signer_factory_;
std::shared_ptr<runtime::ParachainHost> parachain_api_;
std::shared_ptr<runtime::ModuleFactory> module_factory_;
std::shared_ptr<runtime::Executor> executor_;
std::shared_ptr<offchain::OffchainWorkerFactory> offchain_worker_factory_;
std::shared_ptr<offchain::OffchainWorkerPool> offchain_worker_pool_;
std::shared_ptr<primitives::events::ChainEventSubscriber> chain_sub_;
std::unordered_set<ValidationCodeHash> seen_;
ThreadPool thread_{"PvfPrecheck", 1};
log::Logger logger_ = log::createLogger("PvfPrecheck", "parachain");
};
} // namespace kagome::parachain

#endif // KAGOME_PARACHAIN_PVF_PRECHECK_HPP
15 changes: 14 additions & 1 deletion core/parachain/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace kagome::parachain {
using IndexedAndSigned = kagome::crypto::Sr25519Signed<Indexed<T>>;

template <typename T>
[[maybe_unused]] inline T const &getPayload(IndexedAndSigned<T> const &t) {
[[maybe_unused]] inline const T &getPayload(const IndexedAndSigned<T> &t) {
return t.payload.payload;
}

Expand All @@ -78,6 +78,19 @@ namespace kagome::parachain {
return t.payload.payload;
}

struct PvfCheckStatement {
SCALE_TIE(4);

bool accept;
ValidationCodeHash subject;
SessionIndex session_index;
ValidatorIndex validator_index;

auto signable() {
constexpr std::array<uint8_t, 4> kMagic{'V', 'C', 'P', 'C'};
return scale::encode(std::make_tuple(kMagic, *this)).value();
}
};
} // namespace kagome::parachain

#endif // KAGOME_PARACHAIN_PRIMITIVES_HPP
4 changes: 4 additions & 0 deletions core/parachain/validator/impl/parachain_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ namespace kagome::parachain {
std::shared_ptr<network::PeerView> peer_view,
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<parachain::BitfieldSigner> bitfield_signer,
std::shared_ptr<parachain::PvfPrecheck> pvf_precheck,
std::shared_ptr<parachain::BitfieldStore> bitfield_store,
std::shared_ptr<parachain::BackingStore> backing_store,
std::shared_ptr<parachain::Pvf> pvf,
Expand All @@ -87,6 +88,7 @@ namespace kagome::parachain {
pvf_(std::move(pvf)),
signer_factory_(std::move(signer_factory)),
bitfield_signer_(std::move(bitfield_signer)),
pvf_precheck_(std::move(pvf_precheck)),
bitfield_store_(std::move(bitfield_store)),
backing_store_(std::move(backing_store)),
av_store_(std::move(av_store)),
Expand Down Expand Up @@ -147,6 +149,8 @@ namespace kagome::parachain {
if (not was_synchronized) {
self->bitfield_signer_->start(
self->peer_view_->intoChainEventsEngine());
self->pvf_precheck_->start(
self->peer_view_->intoChainEventsEngine());
was_synchronized = true;
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/parachain/validator/parachain_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "parachain/availability/bitfield/signer.hpp"
#include "parachain/availability/store/store.hpp"
#include "parachain/backing/store.hpp"
#include "parachain/pvf/precheck.hpp"
#include "parachain/pvf/pvf.hpp"
#include "parachain/validator/signer.hpp"
#include "primitives/common.hpp"
Expand Down Expand Up @@ -84,6 +85,7 @@ namespace kagome::parachain {
std::shared_ptr<network::PeerView> peer_view,
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<parachain::BitfieldSigner> bitfield_signer,
std::shared_ptr<parachain::PvfPrecheck> pvf_precheck,
std::shared_ptr<parachain::BitfieldStore> bitfield_store,
std::shared_ptr<parachain::BackingStore> backing_store,
std::shared_ptr<parachain::Pvf> pvf,
Expand Down Expand Up @@ -417,6 +419,7 @@ namespace kagome::parachain {
std::shared_ptr<parachain::Pvf> pvf_;
std::shared_ptr<parachain::ValidatorSignerFactory> signer_factory_;
std::shared_ptr<parachain::BitfieldSigner> bitfield_signer_;
std::shared_ptr<parachain::PvfPrecheck> pvf_precheck_;
std::shared_ptr<parachain::BitfieldStore> bitfield_store_;
std::shared_ptr<parachain::BackingStore> backing_store_;
std::shared_ptr<parachain::AvailabilityStore> av_store_;
Expand Down
4 changes: 4 additions & 0 deletions core/parachain/validator/signer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ namespace kagome::parachain {
};
}

outcome::result<Signature> signRaw(common::BufferView data) const {
return sr25519_provider_->sign(*keypair_, data);
}

SessionIndex getSessionIndex() const;

/// Get validator index.
Expand Down
15 changes: 15 additions & 0 deletions core/runtime/runtime_api/impl/parachain_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,19 @@ namespace kagome::runtime {
block, "ParachainHost_inbound_hrmp_channels_contents", id);
}

outcome::result<std::vector<ValidationCodeHash>>
ParachainHostImpl::pvfs_require_precheck(const primitives::BlockHash &block) {
return executor_->callAt<std::vector<ValidationCodeHash>>(
block, "ParachainHost_pvfs_require_precheck");
}

outcome::result<void> ParachainHostImpl::submit_pvf_check_statement(
const primitives::BlockHash &block,
const parachain::PvfCheckStatement &statement,
const parachain::Signature &signature) {
return executor_->callAt<void>(block,
"ParachainHost_submit_pvf_check_statement",
statement,
signature);
}
} // namespace kagome::runtime
8 changes: 8 additions & 0 deletions core/runtime/runtime_api/impl/parachain_host.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ namespace kagome::runtime {
inbound_hrmp_channels_contents(const primitives::BlockHash &block,
ParachainId id) override;

outcome::result<std::vector<ValidationCodeHash>> pvfs_require_precheck(
turuslan marked this conversation as resolved.
Show resolved Hide resolved
const primitives::BlockHash &block) override;

outcome::result<void> submit_pvf_check_statement(
const primitives::BlockHash &block,
const parachain::PvfCheckStatement &statement,
const parachain::Signature &signature) override;

private:
std::shared_ptr<Executor> executor_;
};
Expand Down
14 changes: 14 additions & 0 deletions core/runtime/runtime_api/parachain_host.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,20 @@ namespace kagome::runtime {
std::map<ParachainId, std::vector<InboundHrmpMessage>>>
inbound_hrmp_channels_contents(const primitives::BlockHash &block,
ParachainId id) = 0;

/**
* @return list of pvf requiring precheck
*/
virtual outcome::result<std::vector<ValidationCodeHash>>
pvfs_require_precheck(const primitives::BlockHash &block) = 0;

/**
* @return submit pvf check statement
*/
virtual outcome::result<void> submit_pvf_check_statement(
const primitives::BlockHash &block,
const parachain::PvfCheckStatement &statement,
const parachain::Signature &signature) = 0;
};

} // namespace kagome::runtime
Expand Down