diff --git a/Cargo.lock b/Cargo.lock index 33365dfeff..9a519d1a17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9394,7 +9394,6 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", - "log", "lru 0.11.1", "parity-scale-codec", "parking_lot 0.12.1", @@ -9425,6 +9424,7 @@ dependencies = [ "subspace-verification", "thiserror", "tokio", + "tracing", ] [[package]] diff --git a/crates/pallet-subspace/src/lib.rs b/crates/pallet-subspace/src/lib.rs index 0f89eb45cb..09341dc893 100644 --- a/crates/pallet-subspace/src/lib.rs +++ b/crates/pallet-subspace/src/lib.rs @@ -49,8 +49,8 @@ use sp_consensus_subspace::consensus::{is_proof_of_time_valid, verify_solution}; use sp_consensus_subspace::digests::CompatibleDigestItem; use sp_consensus_subspace::offence::{OffenceDetails, OffenceError, OnOffenceHandler}; use sp_consensus_subspace::{ - ChainConstants, EquivocationProof, FarmerPublicKey, FarmerSignature, PotParameters, - PotParametersChange, SignedVote, Vote, WrappedPotOutput, + EquivocationProof, FarmerPublicKey, FarmerSignature, PotParameters, PotParametersChange, + SignedVote, Vote, WrappedPotOutput, }; use sp_runtime::generic::DigestItem; use sp_runtime::traits::{BlockNumberProvider, CheckedSub, Hash, One, Zero}; @@ -1142,25 +1142,6 @@ impl Pallet { u64::from(archived_segments) * ArchivedHistorySegment::SIZE as u64 } - - pub fn chain_constants() -> ChainConstants { - ChainConstants::V0 { - confirmation_depth_k: T::ConfirmationDepthK::get() - .try_into() - .unwrap_or_else(|_| panic!("Block number always fits in BlockNumber; qed")), - block_authoring_delay: T::BlockAuthoringDelay::get(), - era_duration: T::EraDuration::get() - .try_into() - .unwrap_or_else(|_| panic!("Block number always fits in BlockNumber; qed")), - slot_probability: T::SlotProbability::get(), - recent_segments: T::RecentSegments::get(), - recent_history_fraction: ( - T::RecentHistoryFraction::get().0, - T::RecentHistoryFraction::get().1, - ), - min_sector_lifetime: T::MinSectorLifetime::get(), - } - } } impl Pallet diff --git a/crates/sc-consensus-subspace-rpc/src/lib.rs b/crates/sc-consensus-subspace-rpc/src/lib.rs index d5af835e31..bdd8716547 100644 --- a/crates/sc-consensus-subspace-rpc/src/lib.rs +++ b/crates/sc-consensus-subspace-rpc/src/lib.rs @@ -29,10 +29,12 @@ use lru::LruCache; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; use sc_client_api::{AuxStore, BlockBackend}; -use sc_consensus_subspace::archiver::{recreate_genesis_segment, SegmentHeadersStore}; +use sc_consensus_subspace::archiver::{ + recreate_genesis_segment, ArchivedSegmentNotification, SegmentHeadersStore, +}; use sc_consensus_subspace::notification::SubspaceNotificationStream; -use sc_consensus_subspace::{ - ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle, +use sc_consensus_subspace::slot_worker::{ + NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle, }; use sc_rpc::{DenyUnsafe, SubscriptionTaskExecutor}; use sc_utils::mpsc::TracingUnboundedSender; @@ -301,7 +303,6 @@ where })?; let farmer_app_info: Result = try { - let slot_duration = runtime_api.slot_duration(best_hash)?; let chain_constants = runtime_api.chain_constants(best_hash)?; let protocol_info = FarmerProtocolInfo { history_size: runtime_api.history_size(best_hash)?, @@ -314,7 +315,8 @@ where FarmerAppInfo { genesis_hash, dsn_bootstrap_nodes: self.dsn_bootstrap_nodes.clone(), - farming_timeout: slot_duration + farming_timeout: chain_constants + .slot_duration() .as_duration() 
.mul_f64(SlotNumber::from(chain_constants.block_authoring_delay()) as f64), protocol_info, diff --git a/crates/sc-consensus-subspace/Cargo.toml b/crates/sc-consensus-subspace/Cargo.toml index 79f9468c71..d50a1a710c 100644 --- a/crates/sc-consensus-subspace/Cargo.toml +++ b/crates/sc-consensus-subspace/Cargo.toml @@ -17,7 +17,6 @@ targets = ["x86_64-unknown-linux-gnu"] async-trait = "0.1.73" codec = { package = "parity-scale-codec", version = "3.6.5", features = ["derive"] } futures = "0.3.29" -log = "0.4.20" lru = "0.11.0" parking_lot = "0.12.1" rand = "0.8.5" @@ -47,6 +46,7 @@ subspace-proof-of-space = { version = "0.1.0", path = "../subspace-proof-of-spac subspace-verification = { version = "0.1.0", path = "../subspace-verification" } thiserror = "1.0.48" tokio = { version = "1.34.0", features = ["sync"] } +tracing = "0.1.37" [dev-dependencies] # TODO: Restore in the future, currently tests are mostly broken and useless diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index 8624e512f2..952551f94f 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -19,13 +19,11 @@ //! Contains implementation of archiving process in Subspace blockchain that converts blockchain //! history (blocks) into archived history (pieces). -use crate::{ - ArchivedSegmentNotification, BlockImportingNotification, SubspaceLink, - SubspaceNotificationSender, SubspaceSyncOracle, -}; +use crate::block_import::BlockImportingNotification; +use crate::slot_worker::SubspaceSyncOracle; +use crate::{SubspaceLink, SubspaceNotificationSender}; use codec::{Decode, Encode}; use futures::StreamExt; -use log::{debug, info, warn}; use parking_lot::Mutex; use rand::prelude::*; use rand_chacha::ChaCha8Rng; @@ -33,7 +31,7 @@ use rayon::prelude::*; use rayon::ThreadPoolBuilder; use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO}; -use sc_utils::mpsc::tracing_unbounded; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; use sp_consensus::SyncOracle; @@ -44,6 +42,7 @@ use sp_runtime::traits::{Block as BlockT, CheckedSub, Header, NumberFor, One, Ze use sp_runtime::{Justifications, Saturating}; use std::error::Error; use std::future::Future; +use std::num::NonZeroUsize; use std::slice; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; @@ -51,10 +50,22 @@ use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::objects::BlockObjectMapping; use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex}; +use tracing::{debug, info, warn}; /// This corresponds to default value of `--max-runtime-instances` in Substrate const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 8; +/// How deep (in segments) should block be in order to be finalized. +/// +/// This is required for full nodes to not prune recent history such that keep-up sync in Substrate +/// works even without archival nodes (initial sync will be done from DSN). +/// +/// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in +/// Substrate and is not worth it right now. 
+/// https://github.com/paritytech/substrate/discussions/14359 +pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: NonZeroUsize = + NonZeroUsize::new(5).expect("Not zero; qed"); + #[derive(Debug)] struct SegmentHeadersStoreInner { aux_store: Arc, @@ -89,10 +100,7 @@ where let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY); let mut next_key_index = 0; - debug!( - target: "subspace", - "Started loading segment headers into cache" - ); + debug!("Started loading segment headers into cache"); while let Some(segment_headers) = aux_store .get_aux(&Self::key(next_key_index))? @@ -104,10 +112,7 @@ where cache.extend(segment_headers); next_key_index += 1; } - debug!( - target: "subspace", - "Finished loading segment headers into cache" - ); + debug!("Finished loading segment headers into cache"); Ok(Self { inner: Arc::new(SegmentHeadersStoreInner { @@ -198,15 +203,16 @@ where } } -/// How deep (in segments) should block be in order to be finalized. -/// -/// This is required for full nodes to not prune recent history such that keep-up sync in Substrate -/// works even without archival nodes (initial sync will be done from DSN). -/// -/// Ideally, we'd decouple pruning from finalization, but it may require invasive changes in -/// Substrate and is not worth it right now. -/// https://github.com/paritytech/substrate/discussions/14359 -pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: usize = 5; +/// Notification with block header hash that needs to be signed and sender for signature. +#[derive(Debug, Clone)] +pub struct ArchivedSegmentNotification { + /// Archived segment. + pub archived_segment: Arc, + /// Sender that signified the fact of receiving archived segment by farmer. + /// + /// This must be used to send a message or else block import pipeline will get stuck. 
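+ ///
+ /// A minimal, hedged usage sketch: a consumer of the archived segment notification stream
+ /// (e.g. obtained from `SubspaceLink::archived_segment_notification_stream()`) is expected to
+ /// acknowledge roughly like this; the `notification` binding here is hypothetical:
+ ///
+ /// ```ignore
+ /// // Unblock archiving/block import once the segment has been processed.
+ /// let _ = notification.acknowledgement_sender.unbounded_send(());
+ /// ```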
+ pub acknowledgement_sender: TracingUnboundedSender<()>, +} fn find_last_archived_block( client: &Client, @@ -423,9 +429,8 @@ where // Continuing from existing initial state let last_archived_block_number = last_segment_header.last_archived_block().number; info!( - target: "subspace", - "Last archived block {}", - last_archived_block_number, + %last_archived_block_number, + "Resuming archiver from last archived block", ); // Set initial value, this is needed in case only genesis block was archived and there @@ -464,7 +469,7 @@ where archiver } else { - info!(target: "subspace", "Starting archiving from genesis"); + info!("Starting archiving from genesis"); Archiver::new(subspace_link.kzg().clone()).expect("Incorrect parameters for archiver") }; @@ -497,10 +502,8 @@ where if let Some(blocks_to_archive_to) = blocks_to_archive_to { info!( - target: "subspace", "Archiving already produced blocks {}..={}", - blocks_to_archive_from, - blocks_to_archive_to, + blocks_to_archive_from, blocks_to_archive_to, ); let thread_pool = ThreadPoolBuilder::new() @@ -556,7 +559,6 @@ where let encoded_block = encode_block(block); debug!( - target: "subspace", "Encoded block {} has size of {:.2} kiB", block_number_to_archive, encoded_block.len() as f32 / 1024.0 @@ -621,11 +623,15 @@ fn finalize_block( client .apply_finality(import_op, hash, None, true) .map_err(|error| { - warn!(target: "subspace", "Error applying finality to block {:?}: {}", (hash, number), error); + warn!( + "Error applying finality to block {:?}: {}", + (hash, number), + error + ); error })?; - debug!(target: "subspace", "Finalizing blocks up to ({:?}, {})", number, hash); + debug!("Finalizing blocks up to ({:?}, {})", number, hash); telemetry!( telemetry; @@ -725,10 +731,8 @@ where let block_hash_to_archive = block.block.hash(); debug!( - target: "subspace", "Archiving block {:?} ({})", - block_number_to_archive, - block_hash_to_archive + block_number_to_archive, block_hash_to_archive ); if parent_block_hash != best_archived_block_hash { @@ -762,7 +766,6 @@ where let encoded_block = encode_block(block); debug!( - target: "subspace", "Encoded block {} has size of {:.2} kiB", block_number_to_archive, encoded_block.len() as f32 / 1024.0 @@ -796,7 +799,7 @@ where segment_headers .iter() .flat_map(|(_k, v)| v.iter().rev()) - .nth(FINALIZATION_DEPTH_IN_SEGMENTS) + .nth(FINALIZATION_DEPTH_IN_SEGMENTS.get()) .map(|segment_header| segment_header.last_archived_block().number) }; diff --git a/crates/sc-consensus-subspace/src/block_import.rs b/crates/sc-consensus-subspace/src/block_import.rs new file mode 100644 index 0000000000..b3c66d7d6a --- /dev/null +++ b/crates/sc-consensus-subspace/src/block_import.rs @@ -0,0 +1,725 @@ +// Copyright (C) 2021 Subspace Labs, Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Block import module. +//! +//! 
Contains implementation of block import with corresponding checks and notifications. + +use crate::archiver::SegmentHeadersStore; +use crate::verifier::VerificationError; +use crate::{aux_schema, slot_worker, SubspaceLink}; +use futures::channel::mpsc; +use futures::StreamExt; +use sc_client_api::backend::AuxStore; +use sc_client_api::BlockBackend; +use sc_consensus::block_import::{ + BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult, +}; +use sc_proof_of_time::verifier::PotVerifier; +use sp_api::{ApiError, ApiExt, BlockT, HeaderT, ProvideRuntimeApi}; +use sp_block_builder::BlockBuilder as BlockBuilderApi; +use sp_blockchain::HeaderBackend; +use sp_consensus_slots::Slot; +use sp_consensus_subspace::digests::{ + extract_pre_digest, extract_subspace_digest_items, SubspaceDigestItems, +}; +use sp_consensus_subspace::{ + FarmerPublicKey, FarmerSignature, PotNextSlotInput, SubspaceApi, SubspaceJustification, +}; +use sp_inherents::{CreateInherentDataProviders, InherentDataProvider}; +use sp_runtime::traits::{NumberFor, One}; +use sp_runtime::Justifications; +use std::marker::PhantomData; +use std::sync::Arc; +use subspace_core_primitives::{ + BlockNumber, HistorySize, PublicKey, SectorId, SegmentHeader, SegmentIndex, SolutionRange, +}; +use subspace_proof_of_space::Table; +use subspace_verification::{calculate_block_weight, PieceCheckParams, VerifySolutionParams}; +use tracing::warn; + +/// Notification with number of the block that is about to be imported and acknowledgement sender +/// that can be used to pause block production if desired. +#[derive(Debug, Clone)] +pub struct BlockImportingNotification +where + Block: BlockT, +{ + /// Block number + pub block_number: NumberFor, + /// Sender for pausing the block import when operator is not fast enough to process + /// the consensus block. + pub acknowledgement_sender: mpsc::Sender<()>, +} +use subspace_verification::Error as VerificationPrimitiveError; + +/// Errors encountered by the Subspace authorship task. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Inner block import error + #[error("Inner block import error: {0}")] + InnerBlockImportError(#[from] sp_consensus::Error), + /// Error during digest item extraction + #[error("Digest item error: {0}")] + DigestItemError(#[from] sp_consensus_subspace::digests::Error), + /// Parent unavailable. Cannot import + #[error("Parent ({0}) of {1} unavailable. Cannot import")] + ParentUnavailable(Header::Hash, Header::Hash), + /// Genesis block unavailable. Cannot import + #[error("Genesis block unavailable. 
Cannot import")] + GenesisUnavailable, + /// Slot number must increase + #[error("Slot number must increase: parent slot: {0}, this slot: {1}")] + SlotMustIncrease(Slot, Slot), + /// Header has a bad seal + #[error("Header {0:?} has a bad seal")] + HeaderBadSeal(Header::Hash), + /// Header is unsealed + #[error("Header {0:?} is unsealed")] + HeaderUnsealed(Header::Hash), + /// Bad reward signature + #[error("Bad reward signature on {0:?}")] + BadRewardSignature(Header::Hash), + /// Missing Subspace justification + #[error("Missing Subspace justification")] + MissingSubspaceJustification, + /// Invalid Subspace justification + #[error("Invalid Subspace justification: {0}")] + InvalidSubspaceJustification(codec::Error), + /// Invalid Subspace justification contents + #[error("Invalid Subspace justification contents")] + InvalidSubspaceJustificationContents, + /// Invalid proof of time + #[error("Invalid proof of time")] + InvalidProofOfTime, + /// Solution is outside of solution range + #[error( + "Solution distance {solution_distance} is outside of solution range \ + {half_solution_range} (half of actual solution range) for slot {slot}" + )] + OutsideOfSolutionRange { + /// Time slot + slot: Slot, + /// Half of solution range + half_solution_range: SolutionRange, + /// Solution distance + solution_distance: SolutionRange, + }, + /// Invalid proof of space + #[error("Invalid proof of space")] + InvalidProofOfSpace, + /// Invalid audit chunk offset + #[error("Invalid audit chunk offset")] + InvalidAuditChunkOffset, + /// Invalid chunk witness + #[error("Invalid chunk witness")] + InvalidChunkWitness, + /// Piece verification failed + #[error("Piece verification failed")] + InvalidPieceOffset { + /// Time slot + slot: Slot, + /// Index of the piece that failed verification + piece_offset: u16, + /// How many pieces one sector is supposed to contain (max) + max_pieces_in_sector: u16, + }, + /// Piece verification failed + #[error("Piece verification failed for slot {0}")] + InvalidPiece(Slot), + /// Parent block has no associated weight + #[error("Parent block of {0} has no associated weight")] + ParentBlockNoAssociatedWeight(Header::Hash), + /// Block has invalid associated solution range + #[error("Invalid solution range for block {0}")] + InvalidSolutionRange(Header::Hash), + /// Invalid set of segment headers + #[error("Invalid set of segment headers")] + InvalidSetOfSegmentHeaders, + /// Stored segment header extrinsic was not found + #[error("Stored segment header extrinsic was not found: {0:?}")] + SegmentHeadersExtrinsicNotFound(Vec), + /// Segment header not found + #[error("Segment header for index {0} not found")] + SegmentHeaderNotFound(SegmentIndex), + /// Different segment commitment found + #[error( + "Different segment commitment for segment index {0} was found in storage, likely fork \ + below archiving point" + )] + DifferentSegmentCommitment(SegmentIndex), + /// Farmer in block list + #[error("Farmer {0} is in block list")] + FarmerInBlockList(FarmerPublicKey), + /// No block weight for parent header + #[error("No block weight for parent header {0}")] + NoBlockWeight(Header::Hash), + /// Segment commitment not found + #[error("Segment commitment for segment index {0} not found")] + SegmentCommitmentNotFound(SegmentIndex), + /// Sector expired + #[error("Sector expired")] + SectorExpired { + /// Expiration history size + expiration_history_size: HistorySize, + /// Current history size + current_history_size: HistorySize, + }, + /// Invalid history size + #[error("Invalid 
history size")] + InvalidHistorySize, + /// Only root plot public key is allowed + #[error("Only root plot public key is allowed")] + OnlyRootPlotPublicKeyAllowed, + /// Check inherents error + #[error("Checking inherents failed: {0}")] + CheckInherents(sp_inherents::Error), + /// Unhandled check inherents error + #[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))] + CheckInherentsUnhandled(sp_inherents::InherentIdentifier), + /// Create inherents error. + #[error("Creating inherents failed: {0}")] + CreateInherents(sp_inherents::Error), + /// Client error + #[error(transparent)] + Client(#[from] sp_blockchain::Error), + /// Runtime Api error. + #[error(transparent)] + RuntimeApi(#[from] ApiError), +} + +impl
<Header> From<VerificationError<Header>> for Error<Header>
+where + Header: HeaderT, +{ + #[inline] + fn from(error: VerificationError<Header>
) -> Self { + match error { + VerificationError::HeaderBadSeal(block_hash) => Error::HeaderBadSeal(block_hash), + VerificationError::HeaderUnsealed(block_hash) => Error::HeaderUnsealed(block_hash), + VerificationError::BadRewardSignature(block_hash) => { + Error::BadRewardSignature(block_hash) + } + VerificationError::MissingSubspaceJustification => Error::MissingSubspaceJustification, + VerificationError::InvalidSubspaceJustification(error) => { + Error::InvalidSubspaceJustification(error) + } + VerificationError::InvalidSubspaceJustificationContents => { + Error::InvalidSubspaceJustificationContents + } + VerificationError::InvalidProofOfTime => Error::InvalidProofOfTime, + VerificationError::VerificationError(slot, error) => match error { + VerificationPrimitiveError::InvalidPieceOffset { + piece_offset, + max_pieces_in_sector, + } => Error::InvalidPieceOffset { + slot, + piece_offset, + max_pieces_in_sector, + }, + VerificationPrimitiveError::InvalidPiece => Error::InvalidPiece(slot), + VerificationPrimitiveError::OutsideSolutionRange { + half_solution_range, + solution_distance, + } => Error::OutsideOfSolutionRange { + slot, + half_solution_range, + solution_distance, + }, + VerificationPrimitiveError::InvalidProofOfSpace => Error::InvalidProofOfSpace, + VerificationPrimitiveError::InvalidAuditChunkOffset => { + Error::InvalidAuditChunkOffset + } + VerificationPrimitiveError::InvalidChunkWitness => Error::InvalidChunkWitness, + VerificationPrimitiveError::SectorExpired { + expiration_history_size, + current_history_size, + } => Error::SectorExpired { + expiration_history_size, + current_history_size, + }, + VerificationPrimitiveError::InvalidHistorySize => Error::InvalidHistorySize, + }, + } + } +} + +impl
<Header> From<Error<Header>> for String +where + Header: HeaderT, +{ + #[inline] + fn from(error: Error<Header>
) -> String { + error.to_string() + } +} + +/// A block-import handler for Subspace. +pub struct SubspaceBlockImport +where + Block: BlockT, +{ + inner: I, + client: Arc, + subspace_link: SubspaceLink, + create_inherent_data_providers: CIDP, + segment_headers_store: SegmentHeadersStore, + pot_verifier: PotVerifier, + _pos_table: PhantomData, +} + +impl Clone + for SubspaceBlockImport +where + Block: BlockT, + I: Clone, + CIDP: Clone, +{ + fn clone(&self) -> Self { + SubspaceBlockImport { + inner: self.inner.clone(), + client: self.client.clone(), + subspace_link: self.subspace_link.clone(), + create_inherent_data_providers: self.create_inherent_data_providers.clone(), + segment_headers_store: self.segment_headers_store.clone(), + pot_verifier: self.pot_verifier.clone(), + _pos_table: PhantomData, + } + } +} + +impl SubspaceBlockImport +where + PosTable: Table, + Block: BlockT, + Client: ProvideRuntimeApi + BlockBackend + HeaderBackend + AuxStore, + Client::Api: BlockBuilderApi + SubspaceApi + ApiExt, + CIDP: CreateInherentDataProviders> + Send + Sync + 'static, + AS: AuxStore + Send + Sync + 'static, + BlockNumber: From<<::Header as HeaderT>::Number>, +{ + /// Produce a Subspace block-import object to be used later on in the construction of an import-queue. + pub fn new( + client: Arc, + block_import: I, + subspace_link: SubspaceLink, + create_inherent_data_providers: CIDP, + segment_headers_store: SegmentHeadersStore, + pot_verifier: PotVerifier, + ) -> Self { + Self { + client, + inner: block_import, + subspace_link, + create_inherent_data_providers, + segment_headers_store, + pot_verifier, + _pos_table: PhantomData, + } + } + + #[allow(clippy::too_many_arguments)] + async fn block_import_verification( + &self, + block_hash: Block::Hash, + header: Block::Header, + extrinsics: Option>, + root_plot_public_key: &Option, + subspace_digest_items: &SubspaceDigestItems< + FarmerPublicKey, + FarmerPublicKey, + FarmerSignature, + >, + justifications: &Option, + skip_runtime_access: bool, + ) -> Result<(), Error> { + let block_number = *header.number(); + let parent_hash = *header.parent_hash(); + + let pre_digest = &subspace_digest_items.pre_digest; + if let Some(root_plot_public_key) = root_plot_public_key { + if &pre_digest.solution().public_key != root_plot_public_key { + // Only root plot public key is allowed. + return Err(Error::OnlyRootPlotPublicKeyAllowed); + } + } + + // Check if farmer's plot is burned. + if self + .client + .runtime_api() + .is_in_block_list(parent_hash, &pre_digest.solution().public_key) + .or_else(|error| { + if skip_runtime_access { + Ok(false) + } else { + Err(Error::RuntimeApi(error)) + } + })? + { + warn!( + public_key = %pre_digest.solution().public_key, + "Ignoring block with solution provided by farmer in block list", + ); + + return Err(Error::FarmerInBlockList( + pre_digest.solution().public_key.clone(), + )); + } + + let parent_header = self + .client + .header(parent_hash)? + .ok_or(Error::ParentUnavailable(parent_hash, block_hash))?; + + let parent_slot = extract_pre_digest(&parent_header).map(|d| d.slot())?; + + // Make sure that slot number is strictly increasing + if pre_digest.slot() <= parent_slot { + return Err(Error::SlotMustIncrease(parent_slot, pre_digest.slot())); + } + + let parent_subspace_digest_items = if block_number.is_one() { + None + } else { + Some(extract_subspace_digest_items::< + _, + FarmerPublicKey, + FarmerPublicKey, + FarmerSignature, + >(&parent_header)?) 
+ }; + + let correct_solution_range = if block_number.is_one() { + slot_worker::extract_solution_ranges_for_block(self.client.as_ref(), parent_hash)?.0 + } else { + let parent_subspace_digest_items = parent_subspace_digest_items + .as_ref() + .expect("Always Some for non-first block; qed"); + + match parent_subspace_digest_items.next_solution_range { + Some(solution_range) => solution_range, + None => parent_subspace_digest_items.solution_range, + } + }; + + if subspace_digest_items.solution_range != correct_solution_range { + return Err(Error::InvalidSolutionRange(block_hash)); + } + + let chain_constants = self.subspace_link.chain_constants(); + + // For PoT justifications we only need to check the seed and number of checkpoints, the rest + // was already checked during stateless block verification. + { + let Some(subspace_justification) = justifications + .as_ref() + .and_then(|justifications| { + justifications + .iter() + .find_map(SubspaceJustification::try_from_justification) + }) + .transpose() + .map_err(Error::InvalidSubspaceJustification)? + else { + return Err(Error::MissingSubspaceJustification); + }; + + let SubspaceJustification::PotCheckpoints { seed, checkpoints } = + subspace_justification; + + let future_slot = pre_digest.slot() + chain_constants.block_authoring_delay(); + + if block_number.is_one() { + // In case of first block seed must match genesis seed + if seed != self.pot_verifier.genesis_seed() { + return Err(Error::InvalidSubspaceJustificationContents); + } + + // Number of checkpoints must match future slot number + if checkpoints.len() as u64 != *future_slot { + return Err(Error::InvalidSubspaceJustificationContents); + } + } else { + let parent_subspace_digest_items = parent_subspace_digest_items + .as_ref() + .expect("Always Some for non-first block; qed"); + + let parent_future_slot = parent_slot + chain_constants.block_authoring_delay(); + + let correct_input_parameters = PotNextSlotInput::derive( + subspace_digest_items.pot_slot_iterations, + parent_future_slot, + parent_subspace_digest_items + .pre_digest + .pot_info() + .future_proof_of_time(), + &subspace_digest_items.pot_parameters_change, + ); + + if seed != correct_input_parameters.seed { + return Err(Error::InvalidSubspaceJustificationContents); + } + + // Number of checkpoints must match number of proofs that were not yet seen on chain + if checkpoints.len() as u64 != (*future_slot - *parent_future_slot) { + return Err(Error::InvalidSubspaceJustificationContents); + } + } + } + + let sector_id = SectorId::new( + PublicKey::from(&pre_digest.solution().public_key).hash(), + pre_digest.solution().sector_index, + ); + + // TODO: Below `skip_runtime_access` has no impact on this, but ideally it + // should (though we don't support fast sync yet, so doesn't matter in + // practice) + let max_pieces_in_sector = self + .client + .runtime_api() + .max_pieces_in_sector(parent_hash)?; + let piece_index = sector_id.derive_piece_index( + pre_digest.solution().piece_offset, + pre_digest.solution().history_size, + max_pieces_in_sector, + chain_constants.recent_segments(), + chain_constants.recent_history_fraction(), + ); + let segment_index = piece_index.segment_index(); + + let segment_commitment = self + .segment_headers_store + .get_segment_header(segment_index) + .map(|segment_header| segment_header.segment_commitment()) + .ok_or(Error::SegmentCommitmentNotFound(segment_index))?; + + let sector_expiration_check_segment_commitment = self + .segment_headers_store + .get_segment_header( + subspace_digest_items 
+ .pre_digest + .solution() + .history_size + .sector_expiration_check(chain_constants.min_sector_lifetime()) + .ok_or(Error::InvalidHistorySize)? + .segment_index(), + ) + .map(|segment_header| segment_header.segment_commitment()); + + // Piece is not checked during initial block verification because it requires access to + // segment header and runtime, check it now. + subspace_verification::verify_solution::( + pre_digest.solution(), + // Slot was already checked during initial block verification + pre_digest.slot().into(), + &VerifySolutionParams { + proof_of_time: subspace_digest_items.pre_digest.pot_info().proof_of_time(), + solution_range: subspace_digest_items.solution_range, + piece_check_params: Some(PieceCheckParams { + max_pieces_in_sector, + segment_commitment, + recent_segments: chain_constants.recent_segments(), + recent_history_fraction: chain_constants.recent_history_fraction(), + min_sector_lifetime: chain_constants.min_sector_lifetime(), + // TODO: Below `skip_runtime_access` has no impact on this, but ideally it + // should (though we don't support fast sync yet, so doesn't matter in + // practice) + current_history_size: self.client.runtime_api().history_size(parent_hash)?, + sector_expiration_check_segment_commitment, + }), + }, + &self.subspace_link.kzg, + ) + .map_err(|error| VerificationError::VerificationError(pre_digest.slot(), error))?; + + if !skip_runtime_access { + // If the body is passed through, we need to use the runtime to check that the + // internally-set timestamp in the inherents actually matches the slot set in the seal + // and segment headers in the inherents are set correctly. + if let Some(extrinsics) = extrinsics { + let create_inherent_data_providers = self + .create_inherent_data_providers + .create_inherent_data_providers(parent_hash, self.subspace_link.clone()) + .await + .map_err(|error| Error::Client(sp_blockchain::Error::from(error)))?; + + let inherent_data = create_inherent_data_providers + .create_inherent_data() + .await + .map_err(Error::CreateInherents)?; + + let inherent_res = self.client.runtime_api().check_inherents( + parent_hash, + Block::new(header, extrinsics), + inherent_data, + )?; + + if !inherent_res.ok() { + for (i, e) in inherent_res.into_errors() { + match create_inherent_data_providers + .try_handle_error(&i, &e) + .await + { + Some(res) => res.map_err(Error::CheckInherents)?, + None => return Err(Error::CheckInherentsUnhandled(i)), + } + } + } + } + } + + Ok(()) + } +} + +#[async_trait::async_trait] +impl BlockImport + for SubspaceBlockImport +where + PosTable: Table, + Block: BlockT, + Inner: BlockImport + Send + Sync, + Client: ProvideRuntimeApi + + BlockBackend + + HeaderBackend + + AuxStore + + Send + + Sync, + Client::Api: BlockBuilderApi + SubspaceApi + ApiExt, + CIDP: CreateInherentDataProviders> + Send + Sync + 'static, + AS: AuxStore + Send + Sync + 'static, + BlockNumber: From<<::Header as HeaderT>::Number>, +{ + type Error = Error; + + async fn import_block( + &mut self, + mut block: BlockImportParams, + ) -> Result { + let block_hash = block.post_hash(); + let block_number = *block.header.number(); + + // Early exit if block already in chain + match self.client.status(block_hash)? 
{ + sp_blockchain::BlockStatus::InChain => { + block.fork_choice = Some(ForkChoiceStrategy::Custom(false)); + return self + .inner + .import_block(block) + .await + .map_err(Error::InnerBlockImportError); + } + sp_blockchain::BlockStatus::Unknown => {} + } + + let subspace_digest_items = extract_subspace_digest_items(&block.header)?; + let skip_execution_checks = block.state_action.skip_execution_checks(); + + let root_plot_public_key = self + .client + .runtime_api() + .root_plot_public_key(*block.header.parent_hash())?; + + self.block_import_verification( + block_hash, + block.header.clone(), + block.body.clone(), + &root_plot_public_key, + &subspace_digest_items, + &block.justifications, + skip_execution_checks, + ) + .await?; + + let parent_weight = if block_number.is_one() { + 0 + } else { + aux_schema::load_block_weight(self.client.as_ref(), block.header.parent_hash())? + .ok_or_else(|| Error::ParentBlockNoAssociatedWeight(block_hash))? + }; + + let added_weight = calculate_block_weight(subspace_digest_items.solution_range); + let total_weight = parent_weight + added_weight; + + aux_schema::write_block_weight(block_hash, total_weight, |values| { + block + .auxiliary + .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec())))) + }); + + for (&segment_index, segment_commitment) in &subspace_digest_items.segment_commitments { + let found_segment_commitment = self + .segment_headers_store + .get_segment_header(segment_index) + .ok_or_else(|| Error::SegmentHeaderNotFound(segment_index))? + .segment_commitment(); + + if &found_segment_commitment != segment_commitment { + warn!( + "Different segment commitment for segment index {} was found in storage, \ + likely fork below archiving point. expected {:?}, found {:?}", + segment_index, segment_commitment, found_segment_commitment + ); + return Err(Error::DifferentSegmentCommitment(segment_index)); + } + } + + // The fork choice rule is that we pick the heaviest chain (i.e. smallest solution range), + // if there's a tie we go with the longest chain + let fork_choice = { + let info = self.client.info(); + + let last_best_weight = if &info.best_hash == block.header.parent_hash() { + // the parent=genesis case is already covered for loading parent weight, so we don't + // need to cover again here + parent_weight + } else { + aux_schema::load_block_weight(&*self.client, info.best_hash)? + .ok_or_else(|| Error::NoBlockWeight(info.best_hash))? + }; + + ForkChoiceStrategy::Custom(total_weight > last_best_weight) + }; + block.fork_choice = Some(fork_choice); + + let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0); + + self.subspace_link + .block_importing_notification_sender + .notify(move || BlockImportingNotification { + block_number, + acknowledgement_sender, + }); + + while acknowledgement_receiver.next().await.is_some() { + // Wait for all the acknowledgements to finish. + } + + self.inner + .import_block(block) + .await + .map_err(Error::InnerBlockImportError) + } + + async fn check_block( + &self, + block: BlockCheckParams, + ) -> Result { + self.inner.check_block(block).await.map_err(Into::into) + } +} diff --git a/crates/sc-consensus-subspace/src/lib.rs b/crates/sc-consensus-subspace/src/lib.rs index 51e8873f07..b66360076d 100644 --- a/crates/sc-consensus-subspace/src/lib.rs +++ b/crates/sc-consensus-subspace/src/lib.rs @@ -1,4 +1,3 @@ -// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd. // Copyright (C) 2021 Subspace Labs, Inc. 
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 @@ -16,518 +15,34 @@ // along with this program. If not, see . #![doc = include_str!("../README.md")] -#![feature(let_chains, try_blocks)] +#![feature(const_option, let_chains, try_blocks)] #![forbid(unsafe_code)] #![warn(missing_docs)] pub mod archiver; pub mod aux_schema; +pub mod block_import; pub mod notification; -mod slot_worker; +pub mod slot_worker; #[cfg(test)] mod tests; pub mod verifier; -use crate::archiver::{SegmentHeadersStore, FINALIZATION_DEPTH_IN_SEGMENTS}; +use crate::archiver::{ArchivedSegmentNotification, FINALIZATION_DEPTH_IN_SEGMENTS}; +use crate::block_import::BlockImportingNotification; use crate::notification::{SubspaceNotificationSender, SubspaceNotificationStream}; -use crate::slot_worker::SubspaceSlotWorker; -pub use crate::slot_worker::SubspaceSyncOracle; -use crate::verifier::VerificationError; -use futures::channel::mpsc; -use futures::StreamExt; -use log::{debug, info, warn}; +use crate::slot_worker::{NewSlotNotification, RewardSigningNotification}; use lru::LruCache; use parking_lot::Mutex; -use sc_client_api::backend::AuxStore; -use sc_client_api::{BlockBackend, BlockchainEvents, ProvideUncles, UsageProvider}; -use sc_consensus::block_import::{ - BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult, -}; -use sc_consensus::{JustificationSyncLink, SharedBlockImport}; -use sc_consensus_slots::{BackoffAuthoringBlocksStrategy, InherentDataProviderExt, SlotProportion}; -use sc_proof_of_time::source::PotSlotInfoStream; -use sc_proof_of_time::verifier::PotVerifier; -use sc_telemetry::TelemetryHandle; -use sc_transaction_pool_api::OffchainTransactionPoolFactory; -use sc_utils::mpsc::TracingUnboundedSender; -use sp_api::{ApiError, ApiExt, BlockT, HeaderT, NumberFor, ProvideRuntimeApi}; -use sp_block_builder::BlockBuilder as BlockBuilderApi; -use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult}; -use sp_consensus::{Environment, Error as ConsensusError, Proposer, SelectChain, SyncOracle}; -use sp_consensus_slots::{Slot, SlotDuration}; -use sp_consensus_subspace::digests::{ - extract_pre_digest, extract_subspace_digest_items, Error as DigestError, SubspaceDigestItems, -}; -use sp_consensus_subspace::{ - ChainConstants, FarmerPublicKey, FarmerSignature, PotNextSlotInput, SubspaceApi, - SubspaceJustification, -}; -use sp_core::H256; -use sp_inherents::{CreateInherentDataProviders, InherentDataProvider}; -use sp_runtime::traits::One; -use sp_runtime::Justifications; -use std::future::Future; -use std::marker::PhantomData; -use std::num::NonZeroUsize; -use std::pin::Pin; +use sp_api::{BlockT, NumberFor}; +use sp_consensus_subspace::ChainConstants; use std::sync::Arc; -use subspace_archiving::archiver::NewArchivedSegment; use subspace_core_primitives::crypto::kzg::Kzg; -use subspace_core_primitives::{ - BlockNumber, HistorySize, PublicKey, Randomness, SectorId, SegmentHeader, SegmentIndex, - Solution, SolutionRange, REWARD_SIGNING_CONTEXT, -}; -use subspace_proof_of_space::Table; -use subspace_verification::{ - calculate_block_weight, Error as VerificationPrimitiveError, PieceCheckParams, - VerifySolutionParams, -}; - -/// Information about new slot that just arrived -#[derive(Debug, Copy, Clone)] -pub struct NewSlotInfo { - /// Slot - pub slot: Slot, - /// Global randomness - pub global_randomness: Randomness, - /// Acceptable solution range for block authoring - pub solution_range: SolutionRange, - /// Acceptable solution 
range for voting - pub voting_solution_range: SolutionRange, -} - -/// New slot notification with slot information and sender for solution for the slot. -#[derive(Debug, Clone)] -pub struct NewSlotNotification { - /// New slot information. - pub new_slot_info: NewSlotInfo, - /// Sender that can be used to send solutions for the slot. - pub solution_sender: mpsc::Sender>, -} - -/// Notification with a hash that needs to be signed to receive reward and sender for signature. -#[derive(Debug, Clone)] -pub struct RewardSigningNotification { - /// Hash to be signed. - pub hash: H256, - /// Public key of the plot identity that should create signature. - pub public_key: FarmerPublicKey, - /// Sender that can be used to send signature for the header. - pub signature_sender: TracingUnboundedSender, -} - -/// Notification with block header hash that needs to be signed and sender for signature. -#[derive(Debug, Clone)] -pub struct ArchivedSegmentNotification { - /// Archived segment. - pub archived_segment: Arc, - /// Sender that signified the fact of receiving archived segment by farmer. - /// - /// This must be used to send a message or else block import pipeline will get stuck. - pub acknowledgement_sender: TracingUnboundedSender<()>, -} - -/// Notification with number of the block that is about to be imported and acknowledgement sender -/// that can be used to pause block production if desired. -/// -/// NOTE: Block is not fully imported yet! -#[derive(Debug, Clone)] -pub struct BlockImportingNotification -where - Block: BlockT, -{ - /// Block number - pub block_number: NumberFor, - /// Sender for pausing the block import when operator is not fast enough to process - /// the consensus block. - pub acknowledgement_sender: mpsc::Sender<()>, -} - -/// Errors encountered by the Subspace authorship task. -#[derive(Debug, thiserror::Error)] -pub enum Error { - /// Error during digest item extraction - #[error("Digest item error: {0}")] - DigestItemError(#[from] DigestError), - /// Parent unavailable. Cannot import - #[error("Parent ({0}) of {1} unavailable. Cannot import")] - ParentUnavailable(Header::Hash, Header::Hash), - /// Genesis block unavailable. Cannot import - #[error("Genesis block unavailable. 
Cannot import")] - GenesisUnavailable, - /// Slot number must increase - #[error("Slot number must increase: parent slot: {0}, this slot: {1}")] - SlotMustIncrease(Slot, Slot), - /// Header has a bad seal - #[error("Header {0:?} has a bad seal")] - HeaderBadSeal(Header::Hash), - /// Header is unsealed - #[error("Header {0:?} is unsealed")] - HeaderUnsealed(Header::Hash), - /// Bad reward signature - #[error("Bad reward signature on {0:?}")] - BadRewardSignature(Header::Hash), - /// Missing Subspace justification - #[error("Missing Subspace justification")] - MissingSubspaceJustification, - /// Invalid Subspace justification - #[error("Invalid Subspace justification: {0}")] - InvalidSubspaceJustification(codec::Error), - /// Invalid Subspace justification contents - #[error("Invalid Subspace justification contents")] - InvalidSubspaceJustificationContents, - /// Invalid proof of time - #[error("Invalid proof of time")] - InvalidProofOfTime, - /// Solution is outside of solution range - #[error( - "Solution distance {solution_distance} is outside of solution range \ - {half_solution_range} (half of actual solution range) for slot {slot}" - )] - OutsideOfSolutionRange { - /// Time slot - slot: Slot, - /// Half of solution range - half_solution_range: SolutionRange, - /// Solution distance - solution_distance: SolutionRange, - }, - /// Invalid proof of space - #[error("Invalid proof of space")] - InvalidProofOfSpace, - /// Invalid audit chunk offset - #[error("Invalid audit chunk offset")] - InvalidAuditChunkOffset, - /// Invalid chunk witness - #[error("Invalid chunk witness")] - InvalidChunkWitness, - /// Piece verification failed - #[error("Piece verification failed")] - InvalidPieceOffset { - /// Time slot - slot: Slot, - /// Index of the piece that failed verification - piece_offset: u16, - /// How many pieces one sector is supposed to contain (max) - max_pieces_in_sector: u16, - }, - /// Piece verification failed - #[error("Piece verification failed for slot {0}")] - InvalidPiece(Slot), - /// Parent block has no associated weight - #[error("Parent block of {0} has no associated weight")] - ParentBlockNoAssociatedWeight(Header::Hash), - /// Block has invalid associated solution range - #[error("Invalid solution range for block {0}")] - InvalidSolutionRange(Header::Hash), - /// Invalid set of segment headers - #[error("Invalid set of segment headers")] - InvalidSetOfSegmentHeaders, - /// Stored segment header extrinsic was not found - #[error("Stored segment header extrinsic was not found: {0:?}")] - SegmentHeadersExtrinsicNotFound(Vec), - /// Different segment commitment found - #[error( - "Different segment commitment for segment index {0} was found in storage, likely fork \ - below archiving point" - )] - DifferentSegmentCommitment(SegmentIndex), - /// Farmer in block list - #[error("Farmer {0} is in block list")] - FarmerInBlockList(FarmerPublicKey), - /// Segment commitment not found - #[error("Segment commitment for segment index {0} not found")] - SegmentCommitmentNotFound(SegmentIndex), - /// Sector expired - #[error("Sector expired")] - SectorExpired { - /// Expiration history size - expiration_history_size: HistorySize, - /// Current history size - current_history_size: HistorySize, - }, - /// Invalid history size - #[error("Invalid history size")] - InvalidHistorySize, - /// Only root plot public key is allowed - #[error("Only root plot public key is allowed")] - OnlyRootPlotPublicKeyAllowed, - /// Check inherents error - #[error("Checking inherents failed: {0}")] - 
CheckInherents(sp_inherents::Error), - /// Unhandled check inherents error - #[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))] - CheckInherentsUnhandled(sp_inherents::InherentIdentifier), - /// Create inherents error. - #[error("Creating inherents failed: {0}")] - CreateInherents(sp_inherents::Error), - /// Client error - #[error(transparent)] - Client(#[from] sp_blockchain::Error), - /// Runtime Api error. - #[error(transparent)] - RuntimeApi(#[from] ApiError), -} - -impl
<Header> From<VerificationError<Header>> for Error<Header>
-where - Header: HeaderT, -{ - #[inline] - fn from(error: VerificationError<Header>
) -> Self { - match error { - VerificationError::HeaderBadSeal(block_hash) => Error::HeaderBadSeal(block_hash), - VerificationError::HeaderUnsealed(block_hash) => Error::HeaderUnsealed(block_hash), - VerificationError::BadRewardSignature(block_hash) => { - Error::BadRewardSignature(block_hash) - } - VerificationError::MissingSubspaceJustification => Error::MissingSubspaceJustification, - VerificationError::InvalidSubspaceJustification(error) => { - Error::InvalidSubspaceJustification(error) - } - VerificationError::InvalidSubspaceJustificationContents => { - Error::InvalidSubspaceJustificationContents - } - VerificationError::InvalidProofOfTime => Error::InvalidProofOfTime, - VerificationError::VerificationError(slot, error) => match error { - VerificationPrimitiveError::InvalidPieceOffset { - piece_offset, - max_pieces_in_sector, - } => Error::InvalidPieceOffset { - slot, - piece_offset, - max_pieces_in_sector, - }, - VerificationPrimitiveError::InvalidPiece => Error::InvalidPiece(slot), - VerificationPrimitiveError::OutsideSolutionRange { - half_solution_range, - solution_distance, - } => Error::OutsideOfSolutionRange { - slot, - half_solution_range, - solution_distance, - }, - VerificationPrimitiveError::InvalidProofOfSpace => Error::InvalidProofOfSpace, - VerificationPrimitiveError::InvalidAuditChunkOffset => { - Error::InvalidAuditChunkOffset - } - VerificationPrimitiveError::InvalidChunkWitness => Error::InvalidChunkWitness, - VerificationPrimitiveError::SectorExpired { - expiration_history_size, - current_history_size, - } => Error::SectorExpired { - expiration_history_size, - current_history_size, - }, - VerificationPrimitiveError::InvalidHistorySize => Error::InvalidHistorySize, - }, - } - } -} - -impl
<Header> From<Error<Header>> for String -where - Header: HeaderT, -{ - #[inline] - fn from(error: Error<Header>
) -> String { - error.to_string() - } -} - -/// Read configuration from the runtime state at current best block. -pub fn slot_duration(client: &Client) -> ClientResult -where - Block: BlockT, - Client: AuxStore + ProvideRuntimeApi + UsageProvider, - Client::Api: SubspaceApi, -{ - let block_hash = if client.usage_info().chain.finalized_state.is_some() { - client.usage_info().chain.best_hash - } else { - debug!(target: "subspace", "No finalized state is available. Reading config from genesis"); - client.usage_info().chain.genesis_hash - }; - - Ok(client.runtime_api().slot_duration(block_hash)?) -} - -/// Parameters for Subspace. -pub struct SubspaceParams -where - Block: BlockT, - SO: SyncOracle + Send + Sync, -{ - /// The client to use - pub client: Arc, - - /// The SelectChain Strategy - pub select_chain: SC, - - /// The environment we are producing blocks for. - pub env: E, - - /// The underlying block-import object to supply our produced blocks to. - /// This must be a `SubspaceBlockImport` or a wrapper of it, otherwise - /// critical consensus logic will be omitted. - pub block_import: SharedBlockImport, - - /// A sync oracle - pub sync_oracle: SubspaceSyncOracle, - - /// Hook into the sync module to control the justification sync process. - pub justification_sync_link: L, - - /// Something that can create the inherent data providers. - pub create_inherent_data_providers: CIDP, - - /// Force authoring of blocks even if we are offline - pub force_authoring: bool, - - /// Strategy and parameters for backing off block production. - pub backoff_authoring_blocks: Option, - - /// The source of timestamps for relative slots - pub subspace_link: SubspaceLink, - - /// Persistent storage of segment headers - pub segment_headers_store: SegmentHeadersStore, - - /// The proportion of the slot dedicated to proposing. - /// - /// The block proposing will be limited to this proportion of the slot from the starting of the - /// slot. However, the proposing can still take longer when there is some lenience factor applied, - /// because there were no blocks produced for some slots. - pub block_proposal_slot_portion: SlotProportion, - - /// The maximum proportion of the slot dedicated to proposing with any lenience factor applied - /// due to no blocks being produced. - pub max_block_proposal_slot_portion: Option, - - /// Handle use to report telemetries. - pub telemetry: Option, - - /// The offchain transaction pool factory. - /// - /// Will be used when sending equivocation reports and votes. - pub offchain_tx_pool_factory: OffchainTransactionPoolFactory, - - /// Proof of time verifier - pub pot_verifier: PotVerifier, - - /// Stream with proof of time slots. - pub pot_slot_info_stream: PotSlotInfoStream, -} - -/// Start the Subspace worker. 
-pub fn start_subspace( - SubspaceParams { - client, - select_chain, - env, - block_import, - sync_oracle, - justification_sync_link, - create_inherent_data_providers, - force_authoring, - backoff_authoring_blocks, - subspace_link, - segment_headers_store, - block_proposal_slot_portion, - max_block_proposal_slot_portion, - telemetry, - offchain_tx_pool_factory, - pot_verifier, - pot_slot_info_stream, - }: SubspaceParams, -) -> Result -where - PosTable: Table, - Block: BlockT, - Client: ProvideRuntimeApi - + ProvideUncles - + BlockchainEvents - + HeaderBackend - + HeaderMetadata - + AuxStore - + Send - + Sync - + 'static, - Client::Api: SubspaceApi, - SC: SelectChain + 'static, - E: Environment + Send + Sync + 'static, - E::Proposer: Proposer, - SO: SyncOracle + Send + Sync + Clone + 'static, - L: JustificationSyncLink + 'static, - CIDP: CreateInherentDataProviders + Send + Sync + 'static, - CIDP::InherentDataProviders: InherentDataProviderExt + Send, - BS: BackoffAuthoringBlocksStrategy> + Send + Sync + 'static, - AS: AuxStore + Send + Sync + 'static, - Error: std::error::Error + Send + From + 'static, - BlockNumber: From<<::Header as HeaderT>::Number>, -{ - let worker = SubspaceSlotWorker { - client: client.clone(), - block_import, - env, - sync_oracle: sync_oracle.clone(), - justification_sync_link, - force_authoring, - backoff_authoring_blocks, - subspace_link: subspace_link.clone(), - reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT), - block_proposal_slot_portion, - max_block_proposal_slot_portion, - telemetry, - offchain_tx_pool_factory, - chain_constants: client - .runtime_api() - .chain_constants(client.info().best_hash) - .map_err(|error| sp_consensus::Error::ChainLookup(error.to_string()))?, - segment_headers_store, - pending_solutions: Default::default(), - pot_checkpoints: Default::default(), - pot_verifier, - _pos_table: PhantomData::, - }; - - info!(target: "subspace", "🧑‍🌾 Starting Subspace Authorship worker"); - let inner = sc_proof_of_time::start_slot_worker( - subspace_link.slot_duration(), - client, - select_chain, - worker, - sync_oracle, - create_inherent_data_providers, - pot_slot_info_stream, - ); - - Ok(SubspaceWorker { - inner: Box::pin(inner), - }) -} - -/// Worker for Subspace which implements `Future`. This must be polled. -#[must_use] -pub struct SubspaceWorker { - inner: Pin + Send + 'static>>, -} - -impl Future for SubspaceWorker { - type Output = (); - - fn poll( - mut self: Pin<&mut Self>, - cx: &mut futures::task::Context, - ) -> futures::task::Poll { - self.inner.as_mut().poll(cx) - } -} +use subspace_core_primitives::SegmentHeader; /// State that must be shared between the import queue and the authoring logic. #[derive(Clone)] pub struct SubspaceLink { - slot_duration: SlotDuration, new_slot_notification_sender: SubspaceNotificationSender, new_slot_notification_stream: SubspaceNotificationStream, reward_signing_notification_sender: SubspaceNotificationSender, @@ -541,13 +56,37 @@ pub struct SubspaceLink { /// Segment headers that are expected to appear in the corresponding blocks, used for block /// production and validation segment_headers: Arc, Vec>>>, + chain_constants: ChainConstants, kzg: Kzg, } impl SubspaceLink { - /// Get the slot duration from this link. - pub fn slot_duration(&self) -> SlotDuration { - self.slot_duration + /// Create new instance. 
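+ ///
+ /// A hedged construction sketch (assumes a `client` whose runtime exposes the Subspace API and
+ /// hypothetical `best_hash`/`kzg` values), mirroring how chain constants are now obtained from
+ /// the runtime up front rather than queried later:
+ ///
+ /// ```ignore
+ /// let chain_constants = client.runtime_api().chain_constants(best_hash)?;
+ /// let subspace_link = SubspaceLink::new(chain_constants, kzg.clone());
+ /// ```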
+ pub fn new(chain_constants: ChainConstants, kzg: Kzg) -> Self { + let (new_slot_notification_sender, new_slot_notification_stream) = + notification::channel("subspace_new_slot_notification_stream"); + let (reward_signing_notification_sender, reward_signing_notification_stream) = + notification::channel("subspace_reward_signing_notification_stream"); + let (archived_segment_notification_sender, archived_segment_notification_stream) = + notification::channel("subspace_archived_segment_notification_stream"); + let (block_importing_notification_sender, block_importing_notification_stream) = + notification::channel("subspace_block_importing_notification_stream"); + + Self { + new_slot_notification_sender, + new_slot_notification_stream, + reward_signing_notification_sender, + reward_signing_notification_stream, + archived_segment_notification_sender, + archived_segment_notification_stream, + block_importing_notification_sender, + block_importing_notification_stream, + segment_headers: Arc::new(Mutex::new(LruCache::new( + FINALIZATION_DEPTH_IN_SEGMENTS.saturating_add(1), + ))), + chain_constants, + kzg, + } } /// Get stream with notifications about new slot arrival with ability to send solution back. @@ -590,562 +129,13 @@ impl SubspaceLink { .unwrap_or_default() } + /// Subspace chain constants. + pub fn chain_constants(&self) -> &ChainConstants { + &self.chain_constants + } + /// Access KZG instance pub fn kzg(&self) -> &Kzg { &self.kzg } } - -/// A block-import handler for Subspace. -pub struct SubspaceBlockImport -where - Block: BlockT, -{ - inner: I, - client: Arc, - subspace_link: SubspaceLink, - create_inherent_data_providers: CIDP, - chain_constants: ChainConstants, - segment_headers_store: SegmentHeadersStore, - pot_verifier: PotVerifier, - _pos_table: PhantomData, -} - -impl Clone - for SubspaceBlockImport -where - Block: BlockT, - I: Clone, - CIDP: Clone, -{ - fn clone(&self) -> Self { - SubspaceBlockImport { - inner: self.inner.clone(), - client: self.client.clone(), - subspace_link: self.subspace_link.clone(), - create_inherent_data_providers: self.create_inherent_data_providers.clone(), - chain_constants: self.chain_constants, - segment_headers_store: self.segment_headers_store.clone(), - pot_verifier: self.pot_verifier.clone(), - _pos_table: PhantomData, - } - } -} - -impl SubspaceBlockImport -where - PosTable: Table, - Block: BlockT, - Client: ProvideRuntimeApi + BlockBackend + HeaderBackend + AuxStore, - Client::Api: BlockBuilderApi + SubspaceApi + ApiExt, - CIDP: CreateInherentDataProviders> + Send + Sync + 'static, - AS: AuxStore + Send + Sync + 'static, - BlockNumber: From<<::Header as HeaderT>::Number>, -{ - fn new( - client: Arc, - block_import: I, - subspace_link: SubspaceLink, - create_inherent_data_providers: CIDP, - chain_constants: ChainConstants, - segment_headers_store: SegmentHeadersStore, - pot_verifier: PotVerifier, - ) -> Self { - Self { - client, - inner: block_import, - subspace_link, - create_inherent_data_providers, - chain_constants, - segment_headers_store, - pot_verifier, - _pos_table: PhantomData, - } - } - - #[allow(clippy::too_many_arguments)] - async fn block_import_verification( - &self, - block_hash: Block::Hash, - header: Block::Header, - extrinsics: Option>, - root_plot_public_key: &Option, - subspace_digest_items: &SubspaceDigestItems< - FarmerPublicKey, - FarmerPublicKey, - FarmerSignature, - >, - justifications: &Option, - skip_runtime_access: bool, - ) -> Result<(), Error> { - let block_number = *header.number(); - let parent_hash = 
*header.parent_hash(); - - let pre_digest = &subspace_digest_items.pre_digest; - if let Some(root_plot_public_key) = root_plot_public_key { - if &pre_digest.solution().public_key != root_plot_public_key { - // Only root plot public key is allowed. - return Err(Error::OnlyRootPlotPublicKeyAllowed); - } - } - - // Check if farmer's plot is burned. - if self - .client - .runtime_api() - .is_in_block_list(parent_hash, &pre_digest.solution().public_key) - .or_else(|error| { - if skip_runtime_access { - Ok(false) - } else { - Err(Error::::RuntimeApi(error)) - } - })? - { - warn!( - target: "subspace", - "Ignoring block with solution provided by farmer in block list: {}", - pre_digest.solution().public_key - ); - - return Err(Error::FarmerInBlockList( - pre_digest.solution().public_key.clone(), - )); - } - - let parent_header = self - .client - .header(parent_hash)? - .ok_or(Error::ParentUnavailable(parent_hash, block_hash))?; - - let parent_slot = extract_pre_digest(&parent_header).map(|d| d.slot())?; - - // Make sure that slot number is strictly increasing - if pre_digest.slot() <= parent_slot { - return Err(Error::SlotMustIncrease(parent_slot, pre_digest.slot())); - } - - let parent_subspace_digest_items = if block_number.is_one() { - None - } else { - Some(extract_subspace_digest_items::< - _, - FarmerPublicKey, - FarmerPublicKey, - FarmerSignature, - >(&parent_header)?) - }; - - let correct_solution_range = if block_number.is_one() { - slot_worker::extract_solution_ranges_for_block(self.client.as_ref(), parent_hash)?.0 - } else { - let parent_subspace_digest_items = parent_subspace_digest_items - .as_ref() - .expect("Always Some for non-first block; qed"); - - match parent_subspace_digest_items.next_solution_range { - Some(solution_range) => solution_range, - None => parent_subspace_digest_items.solution_range, - } - }; - - if subspace_digest_items.solution_range != correct_solution_range { - return Err(Error::InvalidSolutionRange(block_hash)); - } - - // For PoT justifications we only need to check the seed and number of checkpoints, the rest - // was already checked during stateless block verification. - { - let Some(subspace_justification) = justifications - .as_ref() - .and_then(|justifications| { - justifications - .iter() - .find_map(SubspaceJustification::try_from_justification) - }) - .transpose() - .map_err(Error::InvalidSubspaceJustification)? 
- else { - return Err(Error::MissingSubspaceJustification); - }; - - let SubspaceJustification::PotCheckpoints { seed, checkpoints } = - subspace_justification; - - let future_slot = pre_digest.slot() + self.chain_constants.block_authoring_delay(); - - if block_number.is_one() { - // In case of first block seed must match genesis seed - if seed != self.pot_verifier.genesis_seed() { - return Err(Error::InvalidSubspaceJustificationContents); - } - - // Number of checkpoints must match future slot number - if checkpoints.len() as u64 != *future_slot { - return Err(Error::InvalidSubspaceJustificationContents); - } - } else { - let parent_subspace_digest_items = parent_subspace_digest_items - .as_ref() - .expect("Always Some for non-first block; qed"); - - let parent_future_slot = parent_slot + self.chain_constants.block_authoring_delay(); - - let correct_input_parameters = PotNextSlotInput::derive( - subspace_digest_items.pot_slot_iterations, - parent_future_slot, - parent_subspace_digest_items - .pre_digest - .pot_info() - .future_proof_of_time(), - &subspace_digest_items.pot_parameters_change, - ); - - if seed != correct_input_parameters.seed { - return Err(Error::InvalidSubspaceJustificationContents); - } - - // Number of checkpoints must match number of proofs that were not yet seen on chain - if checkpoints.len() as u64 != (*future_slot - *parent_future_slot) { - return Err(Error::InvalidSubspaceJustificationContents); - } - } - } - - let sector_id = SectorId::new( - PublicKey::from(&pre_digest.solution().public_key).hash(), - pre_digest.solution().sector_index, - ); - - // TODO: Below `skip_runtime_access` has no impact on this, but ideally it - // should (though we don't support fast sync yet, so doesn't matter in - // practice) - let max_pieces_in_sector = self - .client - .runtime_api() - .max_pieces_in_sector(parent_hash)?; - let piece_index = sector_id.derive_piece_index( - pre_digest.solution().piece_offset, - pre_digest.solution().history_size, - max_pieces_in_sector, - self.chain_constants.recent_segments(), - self.chain_constants.recent_history_fraction(), - ); - let segment_index = piece_index.segment_index(); - - let segment_commitment = self - .segment_headers_store - .get_segment_header(segment_index) - .map(|segment_header| segment_header.segment_commitment()) - .ok_or(Error::SegmentCommitmentNotFound(segment_index))?; - - let sector_expiration_check_segment_commitment = self - .segment_headers_store - .get_segment_header( - subspace_digest_items - .pre_digest - .solution() - .history_size - .sector_expiration_check(self.chain_constants.min_sector_lifetime()) - .ok_or(Error::InvalidHistorySize)? - .segment_index(), - ) - .map(|segment_header| segment_header.segment_commitment()); - - // Piece is not checked during initial block verification because it requires access to - // segment header and runtime, check it now. 
- subspace_verification::verify_solution::( - pre_digest.solution(), - // Slot was already checked during initial block verification - pre_digest.slot().into(), - &VerifySolutionParams { - proof_of_time: subspace_digest_items.pre_digest.pot_info().proof_of_time(), - solution_range: subspace_digest_items.solution_range, - piece_check_params: Some(PieceCheckParams { - max_pieces_in_sector, - segment_commitment, - recent_segments: self.chain_constants.recent_segments(), - recent_history_fraction: self.chain_constants.recent_history_fraction(), - min_sector_lifetime: self.chain_constants.min_sector_lifetime(), - // TODO: Below `skip_runtime_access` has no impact on this, but ideally it - // should (though we don't support fast sync yet, so doesn't matter in - // practice) - current_history_size: self.client.runtime_api().history_size(parent_hash)?, - sector_expiration_check_segment_commitment, - }), - }, - &self.subspace_link.kzg, - ) - .map_err(|error| VerificationError::VerificationError(pre_digest.slot(), error))?; - - if !skip_runtime_access { - // If the body is passed through, we need to use the runtime to check that the - // internally-set timestamp in the inherents actually matches the slot set in the seal - // and segment headers in the inherents are set correctly. - if let Some(extrinsics) = extrinsics { - let create_inherent_data_providers = self - .create_inherent_data_providers - .create_inherent_data_providers(parent_hash, self.subspace_link.clone()) - .await - .map_err(|error| Error::Client(sp_blockchain::Error::from(error)))?; - - let inherent_data = create_inherent_data_providers - .create_inherent_data() - .await - .map_err(Error::CreateInherents)?; - - let inherent_res = self.client.runtime_api().check_inherents( - parent_hash, - Block::new(header, extrinsics), - inherent_data, - )?; - - if !inherent_res.ok() { - for (i, e) in inherent_res.into_errors() { - match create_inherent_data_providers - .try_handle_error(&i, &e) - .await - { - Some(res) => res.map_err(Error::CheckInherents)?, - None => return Err(Error::CheckInherentsUnhandled(i)), - } - } - } - } - } - - Ok(()) - } -} - -#[async_trait::async_trait] -impl BlockImport - for SubspaceBlockImport -where - PosTable: Table, - Block: BlockT, - Inner: BlockImport + Send + Sync, - Inner::Error: Into, - Client: ProvideRuntimeApi - + BlockBackend - + HeaderBackend - + AuxStore - + Send - + Sync, - Client::Api: BlockBuilderApi + SubspaceApi + ApiExt, - CIDP: CreateInherentDataProviders> + Send + Sync + 'static, - AS: AuxStore + Send + Sync + 'static, - BlockNumber: From<<::Header as HeaderT>::Number>, -{ - type Error = ConsensusError; - - async fn import_block( - &mut self, - mut block: BlockImportParams, - ) -> Result { - let block_hash = block.post_hash(); - let block_number = *block.header.number(); - - // Early exit if block already in chain - match self.client.status(block_hash) { - Ok(sp_blockchain::BlockStatus::InChain) => { - block.fork_choice = Some(ForkChoiceStrategy::Custom(false)); - return self.inner.import_block(block).await.map_err(Into::into); - } - Ok(sp_blockchain::BlockStatus::Unknown) => {} - Err(error) => return Err(ConsensusError::ClientImport(error.to_string())), - } - - let subspace_digest_items = extract_subspace_digest_items(&block.header) - .map_err(|error| ConsensusError::ClientImport(error.to_string()))?; - let skip_execution_checks = block.state_action.skip_execution_checks(); - - let root_plot_public_key = self - .client - .runtime_api() - .root_plot_public_key(*block.header.parent_hash()) - 
.map_err(Error::::RuntimeApi) - .map_err(|e| ConsensusError::ClientImport(e.to_string()))?; - - self.block_import_verification( - block_hash, - block.header.clone(), - block.body.clone(), - &root_plot_public_key, - &subspace_digest_items, - &block.justifications, - skip_execution_checks, - ) - .await - .map_err(|error| ConsensusError::ClientImport(error.to_string()))?; - - let parent_weight = if block_number.is_one() { - 0 - } else { - aux_schema::load_block_weight(self.client.as_ref(), block.header.parent_hash()) - .map_err(|e| ConsensusError::ClientImport(e.to_string()))? - .ok_or_else(|| { - ConsensusError::ClientImport( - Error::::ParentBlockNoAssociatedWeight(block_hash) - .to_string(), - ) - })? - }; - - let added_weight = calculate_block_weight(subspace_digest_items.solution_range); - let total_weight = parent_weight + added_weight; - - aux_schema::write_block_weight(block_hash, total_weight, |values| { - block - .auxiliary - .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec())))) - }); - - for (&segment_index, segment_commitment) in &subspace_digest_items.segment_commitments { - let found_segment_commitment = self - .segment_headers_store - .get_segment_header(segment_index) - .ok_or_else(|| { - ConsensusError::ClientImport(format!( - "Segment header for index {segment_index} not found" - )) - })? - .segment_commitment(); - - if &found_segment_commitment != segment_commitment { - warn!( - target: "subspace", - "Different segment commitment for segment index {} was found in storage, \ - likely fork below archiving point. expected {:?}, found {:?}", - segment_index, - segment_commitment, - found_segment_commitment - ); - return Err(ConsensusError::ClientImport( - Error::::DifferentSegmentCommitment(segment_index).to_string(), - )); - } - } - - // The fork choice rule is that we pick the heaviest chain (i.e. smallest solution range), - // if there's a tie we go with the longest chain - let fork_choice = { - let info = self.client.info(); - - let last_best_weight = if &info.best_hash == block.header.parent_hash() { - // the parent=genesis case is already covered for loading parent weight, so we don't - // need to cover again here - parent_weight - } else { - aux_schema::load_block_weight(&*self.client, info.best_hash) - .map_err(|e| ConsensusError::ChainLookup(e.to_string()))? - .ok_or_else(|| { - ConsensusError::ChainLookup( - "No block weight for parent header.".to_string(), - ) - })? - }; - - ForkChoiceStrategy::Custom(total_weight > last_best_weight) - }; - block.fork_choice = Some(fork_choice); - - let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0); - - self.subspace_link - .block_importing_notification_sender - .notify(move || BlockImportingNotification { - block_number, - acknowledgement_sender, - }); - - while acknowledgement_receiver.next().await.is_some() { - // Wait for all the acknowledgements to finish. - } - - self.inner.import_block(block).await - } - - async fn check_block( - &self, - block: BlockCheckParams, - ) -> Result { - self.inner.check_block(block).await.map_err(Into::into) - } -} - -/// Produce a Subspace block-import object to be used later on in the construction of an -/// import-queue. -/// -/// Also returns a link object used to correctly instantiate the import queue and background worker. 
-#[allow(clippy::type_complexity)]
-pub fn block_import(
-    slot_duration: SlotDuration,
-    block_import_inner: I,
-    client: Arc,
-    kzg: Kzg,
-    create_inherent_data_providers: CIDP,
-    segment_headers_store: SegmentHeadersStore,
-    pot_verifier: PotVerifier,
-) -> Result<
-    (
-        SubspaceBlockImport,
-        SubspaceLink,
-    ),
-    sp_blockchain::Error,
->
-where
-    PosTable: Table,
-    Block: BlockT,
-    Client: ProvideRuntimeApi + BlockBackend + HeaderBackend + AuxStore,
-    Client::Api: BlockBuilderApi + SubspaceApi,
-    CIDP: CreateInherentDataProviders> + Send + Sync + 'static,
-    AS: AuxStore + Send + Sync + 'static,
-    BlockNumber: From<<::Header as HeaderT>::Number>,
-{
-    let (new_slot_notification_sender, new_slot_notification_stream) =
-        notification::channel("subspace_new_slot_notification_stream");
-    let (reward_signing_notification_sender, reward_signing_notification_stream) =
-        notification::channel("subspace_reward_signing_notification_stream");
-    let (archived_segment_notification_sender, archived_segment_notification_stream) =
-        notification::channel("subspace_archived_segment_notification_stream");
-    let (block_importing_notification_sender, block_importing_notification_stream) =
-        notification::channel("subspace_block_importing_notification_stream");
-
-    let chain_constants = client
-        .runtime_api()
-        .chain_constants(client.info().best_hash)?;
-
-    let link = SubspaceLink {
-        slot_duration,
-        new_slot_notification_sender,
-        new_slot_notification_stream,
-        reward_signing_notification_sender,
-        reward_signing_notification_stream,
-        archived_segment_notification_sender,
-        archived_segment_notification_stream,
-        block_importing_notification_sender,
-        block_importing_notification_stream,
-        // TODO: Consider making `confirmation_depth_k` non-zero
-        segment_headers: Arc::new(Mutex::new(LruCache::new(
-            NonZeroUsize::new(
-                (FINALIZATION_DEPTH_IN_SEGMENTS + 1)
-                    .max(chain_constants.confirmation_depth_k() as usize),
-            )
-            .expect("Confirmation depth of zero is not supported"),
-        ))),
-        kzg,
-    };
-
-    let import = SubspaceBlockImport::new(
-        client,
-        block_import_inner,
-        link.clone(),
-        create_inherent_data_providers,
-        chain_constants,
-        segment_headers_store,
-        pot_verifier,
-    );
-
-    Ok((import, link))
-}
diff --git a/crates/sc-consensus-subspace/src/slot_worker.rs b/crates/sc-consensus-subspace/src/slot_worker.rs
index f4c134880e..aabc1cb7e4 100644
--- a/crates/sc-consensus-subspace/src/slot_worker.rs
+++ b/crates/sc-consensus-subspace/src/slot_worker.rs
@@ -1,4 +1,3 @@
-// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd.
 // Copyright (C) 2021 Subspace Labs, Inc.
 // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
@@ -15,11 +14,14 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see .
 
+//! Slot worker module.
+//!
+//! Contains the implementation of the Subspace slot worker that produces blocks and votes.
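+//!
+//! A minimal wiring sketch (assuming the caller already constructed the client, proposer
+//! factory, sync oracle, inherent data providers and proof-of-time pieces, as `subspace-service`
+//! does elsewhere in this change):
+//!
+//! ```ignore
+//! // Build the worker from its options struct.
+//! let slot_worker = SubspaceSlotWorker::new(SubspaceSlotWorkerOptions {
+//!     client: client.clone(),
+//!     env: proposer_factory,
+//!     block_import,
+//!     sync_oracle: sync_oracle.clone(),
+//!     justification_sync_link: sync_service.clone(),
+//!     force_authoring,
+//!     backoff_authoring_blocks,
+//!     subspace_link: subspace_link.clone(),
+//!     segment_headers_store: segment_headers_store.clone(),
+//!     block_proposal_slot_portion,
+//!     max_block_proposal_slot_portion: None,
+//!     telemetry,
+//!     offchain_tx_pool_factory,
+//!     pot_verifier,
+//! });
+//!
+//! // The worker is then driven by the proof-of-time slot stream; the slot duration now comes
+//! // from on-chain constants instead of a separate `slot_duration` argument.
+//! let slot_worker_task = sc_proof_of_time::start_slot_worker(
+//!     subspace_link.chain_constants().slot_duration(),
+//!     client,
+//!     select_chain,
+//!     slot_worker,
+//!     sync_oracle,
+//!     create_inherent_data_providers,
+//!     pot_slot_info_stream,
+//! );
+//! ```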
+ use crate::archiver::SegmentHeadersStore; -use crate::{NewSlotInfo, NewSlotNotification, RewardSigningNotification, SubspaceLink}; +use crate::SubspaceLink; use futures::channel::mpsc; use futures::{StreamExt, TryFutureExt}; -use log::{debug, error, info, warn}; use sc_client_api::AuxStore; use sc_consensus::block_import::{BlockImportParams, StateAction}; use sc_consensus::{JustificationSyncLink, SharedBlockImport, StorageChanges}; @@ -30,7 +32,7 @@ use sc_proof_of_time::verifier::PotVerifier; use sc_proof_of_time::PotSlotWorker; use sc_telemetry::TelemetryHandle; use sc_transaction_pool_api::OffchainTransactionPoolFactory; -use sc_utils::mpsc::tracing_unbounded; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; use schnorrkel::context::SigningContext; use sp_api::{ApiError, ApiExt, NumberFor, ProvideRuntimeApi}; use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata}; @@ -40,7 +42,7 @@ use sp_consensus_subspace::digests::{ extract_pre_digest, CompatibleDigestItem, PreDigest, PreDigestPotInfo, }; use sp_consensus_subspace::{ - ChainConstants, FarmerPublicKey, FarmerSignature, PotNextSlotInput, SignedVote, SubspaceApi, + FarmerPublicKey, FarmerSignature, PotNextSlotInput, SignedVote, SubspaceApi, SubspaceJustification, Vote, }; use sp_core::crypto::ByteArray; @@ -53,12 +55,14 @@ use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; use subspace_core_primitives::{ - BlockNumber, PotCheckpoints, PotOutput, PublicKey, RewardSignature, SectorId, Solution, + BlockNumber, PotCheckpoints, PotOutput, PublicKey, Randomness, RewardSignature, SectorId, + Solution, SolutionRange, REWARD_SIGNING_CONTEXT, }; use subspace_proof_of_space::Table; use subspace_verification::{ check_reward_signature, verify_solution, PieceCheckParams, VerifySolutionParams, }; +use tracing::{debug, error, info, warn}; /// Large enough size for any practical purposes, there shouldn't be even this many solutions. const PENDING_SOLUTIONS_CHANNEL_CAPACITY: usize = 10; @@ -104,33 +108,110 @@ where } } -pub(super) struct SubspaceSlotWorker +/// Information about new slot that just arrived +#[derive(Debug, Copy, Clone)] +pub struct NewSlotInfo { + /// Slot + pub slot: Slot, + /// Global randomness + pub global_randomness: Randomness, + /// Acceptable solution range for block authoring + pub solution_range: SolutionRange, + /// Acceptable solution range for voting + pub voting_solution_range: SolutionRange, +} + +/// New slot notification with slot information and sender for solution for the slot. +#[derive(Debug, Clone)] +pub struct NewSlotNotification { + /// New slot information. + pub new_slot_info: NewSlotInfo, + /// Sender that can be used to send solutions for the slot. + pub solution_sender: mpsc::Sender>, +} +/// Notification with a hash that needs to be signed to receive reward and sender for signature. +#[derive(Debug, Clone)] +pub struct RewardSigningNotification { + /// Hash to be signed. + pub hash: H256, + /// Public key of the plot identity that should create signature. + pub public_key: FarmerPublicKey, + /// Sender that can be used to send signature for the header. + pub signature_sender: TracingUnboundedSender, +} + +/// Parameters for [`SubspaceSlotWorker`] +pub struct SubspaceSlotWorkerOptions +where + Block: BlockT, + SO: SyncOracle + Send + Sync, +{ + /// The client to use + pub client: Arc, + /// The environment we are producing blocks for. + pub env: E, + /// The underlying block-import object to supply our produced blocks to. 
+ /// This must be a `SubspaceBlockImport` or a wrapper of it, otherwise + /// critical consensus logic will be omitted. + pub block_import: SharedBlockImport, + /// A sync oracle + pub sync_oracle: SubspaceSyncOracle, + /// Hook into the sync module to control the justification sync process. + pub justification_sync_link: L, + /// Force authoring of blocks even if we are offline + pub force_authoring: bool, + /// Strategy and parameters for backing off block production. + pub backoff_authoring_blocks: Option, + /// The source of timestamps for relative slots + pub subspace_link: SubspaceLink, + /// Persistent storage of segment headers + pub segment_headers_store: SegmentHeadersStore, + /// The proportion of the slot dedicated to proposing. + /// + /// The block proposing will be limited to this proportion of the slot from the starting of the + /// slot. However, the proposing can still take longer when there is some lenience factor applied, + /// because there were no blocks produced for some slots. + pub block_proposal_slot_portion: SlotProportion, + /// The maximum proportion of the slot dedicated to proposing with any lenience factor applied + /// due to no blocks being produced. + pub max_block_proposal_slot_portion: Option, + /// Handle use to report telemetries. + pub telemetry: Option, + /// The offchain transaction pool factory. + /// + /// Will be used when sending equivocation reports and votes. + pub offchain_tx_pool_factory: OffchainTransactionPoolFactory, + /// Proof of time verifier + pub pot_verifier: PotVerifier, +} + +/// Subspace slot worker responsible for block and vote production +pub struct SubspaceSlotWorker where Block: BlockT, + SO: SyncOracle + Send + Sync, { - pub(super) client: Arc, - pub(super) block_import: SharedBlockImport, - pub(super) env: E, - pub(super) sync_oracle: SO, - pub(super) justification_sync_link: L, - pub(super) force_authoring: bool, - pub(super) backoff_authoring_blocks: Option, - pub(super) subspace_link: SubspaceLink, - pub(super) reward_signing_context: SigningContext, - pub(super) block_proposal_slot_portion: SlotProportion, - pub(super) max_block_proposal_slot_portion: Option, - pub(super) telemetry: Option, - pub(crate) offchain_tx_pool_factory: OffchainTransactionPoolFactory, - pub(super) chain_constants: ChainConstants, - pub(super) segment_headers_store: SegmentHeadersStore, + client: Arc, + block_import: SharedBlockImport, + env: E, + sync_oracle: SubspaceSyncOracle, + justification_sync_link: L, + force_authoring: bool, + backoff_authoring_blocks: Option, + subspace_link: SubspaceLink, + reward_signing_context: SigningContext, + block_proposal_slot_portion: SlotProportion, + max_block_proposal_slot_portion: Option, + telemetry: Option, + offchain_tx_pool_factory: OffchainTransactionPoolFactory, + segment_headers_store: SegmentHeadersStore, /// Solution receivers for challenges that were sent to farmers and expected to be received /// eventually - pub(super) pending_solutions: - BTreeMap>>, + pending_solutions: BTreeMap>>, /// Collection of PoT slots that can be retrieved later if needed by block production - pub(super) pot_checkpoints: BTreeMap, - pub(super) pot_verifier: PotVerifier, - pub(super) _pos_table: PhantomData, + pot_checkpoints: BTreeMap, + pot_verifier: PotVerifier, + _pos_table: PhantomData, } impl PotSlotWorker @@ -149,10 +230,7 @@ where self.pot_checkpoints.insert(slot, checkpoints); if self.sync_oracle.is_major_syncing() { - debug!( - target: "subspace", - "Skipping farming slot {slot} due to sync" - ); + 
debug!("Skipping farming slot {slot} due to sync"); return; } @@ -167,8 +245,10 @@ where Ok(solution_ranges) => solution_ranges, Err(error) => { warn!( - target: "subspace", - "Failed to extract solution ranges for block at slot {slot}: {error}" + %slot, + %best_hash, + %error, + "Failed to extract solution ranges for block" ); return; } @@ -216,7 +296,7 @@ where BlockNumber: From<<::Header as Header>::Number>, { type BlockImport = SharedBlockImport; - type SyncOracle = SO; + type SyncOracle = SubspaceSyncOracle; type JustificationSyncLink = L; type CreateProposer = Pin> + Send + 'static>>; @@ -260,8 +340,8 @@ where Ok(pre_digest) => pre_digest, Err(error) => { error!( - target: "subspace", - "Failed to parse pre-digest out of parent header: {error}" + %error, + "Failed to parse pre-digest out of parent header" ); return None; @@ -271,15 +351,16 @@ where if slot <= parent_slot { debug!( - target: "subspace", "Skipping claiming slot {slot} it must be higher than parent slot {parent_slot}", ); return None; } else { - debug!(target: "subspace", "Attempting to claim slot {}", slot); + debug!(%slot, "Attempting to claim slot"); } + let chain_constants = self.subspace_link.chain_constants(); + let parent_hash = parent_header.hash(); let runtime_api = self.client.runtime_api(); @@ -292,7 +373,7 @@ where let parent_future_slot = if parent_header.number().is_zero() { parent_slot } else { - parent_slot + self.chain_constants.block_authoring_delay() + parent_slot + chain_constants.block_authoring_delay() }; let (proof_of_time, future_proof_of_time, pot_justification) = { @@ -303,7 +384,7 @@ where let proof_of_time = self.pot_checkpoints.get(&slot)?.output(); // Future slot for which proof must be available before authoring block at this slot - let future_slot = slot + self.chain_constants.block_authoring_delay(); + let future_slot = slot + chain_constants.block_authoring_delay(); let pot_input = if parent_header.number().is_zero() { PotNextSlotInput { @@ -328,8 +409,8 @@ where parent_pot_parameters.next_parameters_change(), ) { warn!( - target: "subspace", - "Proof of time is invalid, skipping block authoring at slot {slot:?}" + %slot, + "Proof of time is invalid, skipping block authoring at slot" ); return None; } @@ -414,10 +495,9 @@ where .ok()? 
{ warn!( - target: "subspace", - "Ignoring solution for slot {} provided by farmer in block list: {}", - slot, - solution.public_key, + %slot, + public_key = %solution.public_key, + "Ignoring solution provided by farmer in block list", ); continue; @@ -436,8 +516,8 @@ where solution.piece_offset, solution.history_size, max_pieces_in_sector, - self.chain_constants.recent_segments(), - self.chain_constants.recent_history_fraction(), + chain_constants.recent_segments(), + chain_constants.recent_history_fraction(), ) .segment_index(); let maybe_segment_commitment = self @@ -449,17 +529,16 @@ where Some(segment_commitment) => segment_commitment, None => { warn!( - target: "subspace", - "Segment commitment for segment index {} not found (slot {})", - segment_index, - slot, + %slot, + %segment_index, + "Segment commitment not found", ); continue; } }; let sector_expiration_check_segment_index = match solution .history_size - .sector_expiration_check(self.chain_constants.min_sector_lifetime()) + .sector_expiration_check(chain_constants.min_sector_lifetime()) { Some(sector_expiration_check) => sector_expiration_check.segment_index(), None => { @@ -479,9 +558,9 @@ where piece_check_params: Some(PieceCheckParams { max_pieces_in_sector, segment_commitment, - recent_segments: self.chain_constants.recent_segments(), - recent_history_fraction: self.chain_constants.recent_history_fraction(), - min_sector_lifetime: self.chain_constants.min_sector_lifetime(), + recent_segments: chain_constants.recent_segments(), + recent_history_fraction: chain_constants.recent_history_fraction(), + min_sector_lifetime: chain_constants.min_sector_lifetime(), current_history_size: history_size, sector_expiration_check_segment_commitment, }), @@ -495,7 +574,7 @@ where // block reward is claimed if solution_distance <= solution_range / 2 { if maybe_pre_digest.is_none() { - info!(target: "subspace", "🚜 Claimed block at slot {slot}"); + info!(%slot, "🚜 Claimed block at slot"); maybe_pre_digest.replace(PreDigest::V0 { slot, solution, @@ -506,15 +585,15 @@ where }); } else { info!( - target: "subspace", - "Skipping solution that has quality sufficient for block {slot} \ - because block pre-digest was already created", + %slot, + "Skipping solution that has quality sufficient for block because \ + block pre-digest was already created", ); } } else if !parent_header.number().is_zero() { // Not sending vote on top of genesis block since segment headers since piece // verification wouldn't be possible due to missing (for now) segment commitment - info!(target: "subspace", "🗳️ Claimed vote at slot {slot}"); + info!(%slot, "🗳️ Claimed vote at slot"); self.create_vote( parent_header, @@ -536,18 +615,24 @@ where .is_some() { debug!( - target: "subspace", - "Invalid solution received for slot {slot}: {error:?}", + %slot, + %error, + "Invalid solution received", ); } else { warn!( - target: "subspace", - "Invalid solution received for slot {slot}: {error:?}", + %slot, + %error, + "Invalid solution received", ); } } Err(error) => { - warn!(target: "subspace", "Invalid solution received for slot {slot}: {error:?}"); + warn!( + %slot, + %error, + "Invalid solution received", + ); } } } @@ -669,6 +754,47 @@ where AS: AuxStore + Send + Sync + 'static, BlockNumber: From<<::Header as Header>::Number>, { + /// Create new Subspace slot worker + pub fn new( + SubspaceSlotWorkerOptions { + client, + env, + block_import, + sync_oracle, + justification_sync_link, + force_authoring, + backoff_authoring_blocks, + subspace_link, + segment_headers_store, + 
block_proposal_slot_portion, + max_block_proposal_slot_portion, + telemetry, + offchain_tx_pool_factory, + pot_verifier, + }: SubspaceSlotWorkerOptions, + ) -> Self { + Self { + client: client.clone(), + block_import, + env, + sync_oracle, + justification_sync_link, + force_authoring, + backoff_authoring_blocks, + subspace_link, + reward_signing_context: schnorrkel::context::signing_context(REWARD_SIGNING_CONTEXT), + block_proposal_slot_portion, + max_block_proposal_slot_portion, + telemetry, + offchain_tx_pool_factory, + segment_headers_store, + pending_solutions: Default::default(), + pot_checkpoints: Default::default(), + pot_verifier, + _pos_table: PhantomData::, + } + } + async fn create_vote( &self, parent_header: &Block::Header, @@ -703,8 +829,9 @@ where Ok(signature) => signature, Err(error) => { error!( - target: "subspace", - "Failed to submit vote at slot {slot}: {error:?}", + %slot, + %error, + "Failed to submit vote", ); return; } @@ -714,8 +841,9 @@ where if let Err(error) = runtime_api.submit_vote_extrinsic(parent_hash, signed_vote) { error!( - target: "subspace", - "Failed to submit vote at slot {slot}: {error:?}", + %slot, + %error, + "Failed to submit vote", ); } } @@ -746,8 +874,8 @@ where .is_err() { warn!( - target: "subspace", - "Received invalid signature for reward hash {hash:?}" + %hash, + "Received invalid signature for reward" ); continue; } diff --git a/crates/sc-consensus-subspace/src/verifier.rs b/crates/sc-consensus-subspace/src/verifier.rs index 260ccbed27..1df226cdb3 100644 --- a/crates/sc-consensus-subspace/src/verifier.rs +++ b/crates/sc-consensus-subspace/src/verifier.rs @@ -1,8 +1,6 @@ //! Subspace block import implementation -use crate::Error; use futures::lock::Mutex; -use log::{debug, info, trace, warn}; use rand::prelude::*; use rayon::prelude::*; use sc_client_api::backend::AuxStore; @@ -38,6 +36,7 @@ use subspace_core_primitives::{BlockNumber, PublicKey, RewardSignature}; use subspace_proof_of_space::Table; use subspace_verification::{check_reward_signature, verify_solution, VerifySolutionParams}; use tokio::sync::Semaphore; +use tracing::{debug, info, trace, warn}; /// This corresponds to default value of `--max-runtime-instances` in Substrate const BLOCKS_LIST_CHECK_CONCURRENCY: usize = 8; @@ -102,6 +101,8 @@ where { /// Substrate client pub client: Arc, + /// Subspace chain constants + pub chain_constants: ChainConstants, /// Kzg instance pub kzg: Kzg, /// Chain selection rule @@ -153,11 +154,10 @@ where SelectChain: sp_consensus::SelectChain, { /// Create new instance - pub fn new( - options: SubspaceVerifierOptions, - ) -> sp_blockchain::Result { + pub fn new(options: SubspaceVerifierOptions) -> Self { let SubspaceVerifierOptions { client, + chain_constants, kzg, select_chain, telemetry, @@ -168,11 +168,7 @@ where pot_verifier, } = options; - let chain_constants = client - .runtime_api() - .chain_constants(client.info().best_hash)?; - - Ok(Self { + Self { client, kzg, select_chain, @@ -187,7 +183,7 @@ where block_list_verification_semaphore: Semaphore::new(BLOCKS_LIST_CHECK_CONCURRENCY), _pos_table: Default::default(), _block: Default::default(), - }) + } } /// Determine if full proof of time verification is needed for this block number @@ -390,7 +386,7 @@ where header: &Block::Header, author: &FarmerPublicKey, origin: &BlockOrigin, - ) -> Result<(), Error> { + ) -> Result<(), String> { // don't report any equivocations during initial sync // as they are most likely stale. 
if *origin == BlockOrigin::NetworkInitialSync { @@ -404,7 +400,7 @@ where // check if authorship of this header is an equivocation and return a proof if so. let equivocation_proof = match check_equivocation(&*self.client, slot_now, slot, header, author) - .map_err(Error::Client)? + .map_err(|error| error.to_string())? { Some(proof) => proof, None => return Ok(()), @@ -425,7 +421,7 @@ where .best_chain() .await .map(|h| h.hash()) - .map_err(|e| Error::Client(e.into()))?; + .map_err(|error| error.to_string())?; // submit equivocation report at best block. let mut runtime_api = self.client.runtime_api(); @@ -436,14 +432,11 @@ where ); runtime_api .submit_report_equivocation_extrinsic(best_hash, equivocation_proof) - .map_err(Error::RuntimeApi)?; + .map_err(|error| error.to_string())?; - info!(target: "subspace", "Submitted equivocation report for author {:?}", author); + info!(%author, "Submitted equivocation report for author"); } else { - info!( - target: "subspace", - "Not submitting equivocation report because node is not authoring blocks" - ); + info!("Not submitting equivocation report because node is not authoring blocks"); } Ok(()) @@ -470,25 +463,26 @@ where mut block: BlockImportParams, ) -> Result, String> { trace!( - target: "subspace", - "Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}", - block.origin, - block.header, - block.justifications, - block.body, + origin = ?block.origin, + header = ?block.header, + justifications = ?block.justifications, + body = ?block.body, + "Verifying", ); let hash = block.header.hash(); - debug!(target: "subspace", "We have {:?} logs in this header", block.header.digest().logs().len()); + debug!( + "We have {:?} logs in this header", + block.header.digest().logs().len() + ); let subspace_digest_items = extract_subspace_digest_items::< Block::Header, FarmerPublicKey, FarmerPublicKey, FarmerSignature, - >(&block.header) - .map_err(Error::::from)?; + >(&block.header)?; // Check if farmer's plot is burned, ignore runtime API errors since this check will happen // during block import anyway @@ -509,19 +503,18 @@ where .unwrap_or_default() { warn!( - target: "subspace", - "Verifying block with solution provided by farmer in block list: {}", - subspace_digest_items.pre_digest.solution().public_key + public_key = %subspace_digest_items.pre_digest.solution().public_key, + "Verifying block with solution provided by farmer in block list" ); - return Err(Error::::FarmerInBlockList( + return Err(format!( + "Farmer {} is in block list", subspace_digest_items .pre_digest .solution() .public_key .clone(), - ) - .into()); + )); } } @@ -549,7 +542,7 @@ where &block.justifications, ) .await - .map_err(Error::::from)?; + .map_err(|error| error.to_string())?; let CheckedHeader { pre_header, @@ -575,7 +568,7 @@ where // the header is valid but let's check if there was something else already proposed at the // same slot by the given author. if there was, we will report the equivocation to the // runtime. 
- if let Err(err) = self + if let Err(error) = self .check_and_report_equivocation( slot_now, slot, @@ -586,13 +579,12 @@ where .await { warn!( - target: "subspace", - "Error checking/reporting Subspace equivocation: {}", - err + %error, + "Error checking/reporting Subspace equivocation" ); } - trace!(target: "subspace", "Checked {:?}; importing.", pre_header); + trace!(?pre_header, "Checked header; importing"); telemetry!( self.telemetry; CONSENSUS_TRACE; diff --git a/crates/sp-consensus-subspace/src/inherents.rs b/crates/sp-consensus-subspace/src/inherents.rs index 19d4f4c8e6..6e6af6d467 100644 --- a/crates/sp-consensus-subspace/src/inherents.rs +++ b/crates/sp-consensus-subspace/src/inherents.rs @@ -51,7 +51,8 @@ impl IsFatalError for InherentError { #[derive(Debug, Encode, Decode)] pub struct InherentType { /// Slot at which block was created. - pub slot: Slot, + // TODO: Remove slot when breaking protocol and probably change the whole data structure to an enum + slot: Slot, /// Segment headers expected to be included in the block. pub segment_headers: Vec, } @@ -83,44 +84,23 @@ pub struct InherentDataProvider { #[cfg(feature = "std")] impl InherentDataProvider { - /// Create new inherent data provider from the given `data`. - pub fn new(slot: Slot, segment_headers: Vec) -> Self { + /// Create new inherent data provider from the given `segment_headers`. + pub fn new(segment_headers: Vec) -> Self { Self { data: InherentType { - slot, + // TODO: Remove slot when breaking protocol + slot: Default::default(), segment_headers, }, } } - /// Creates the inherent data provider by calculating the slot from the given - /// `timestamp` and `duration`. - pub fn from_timestamp_and_slot_duration( - timestamp: sp_timestamp::Timestamp, - slot_duration: sp_consensus_slots::SlotDuration, - segment_headers: Vec, - ) -> Self { - let slot = Slot::from_timestamp(timestamp, slot_duration); - - Self::new(slot, segment_headers) - } - /// Returns the `data` of this inherent data provider. pub fn data(&self) -> &InherentType { &self.data } } -#[cfg(feature = "std")] -impl sp_std::ops::Deref for InherentDataProvider { - type Target = Slot; - - #[inline] - fn deref(&self) -> &Self::Target { - &self.data.slot - } -} - #[cfg(feature = "std")] #[async_trait::async_trait] impl sp_inherents::InherentDataProvider for InherentDataProvider { diff --git a/crates/sp-consensus-subspace/src/lib.rs b/crates/sp-consensus-subspace/src/lib.rs index 7a5d1a3bad..d933a3ad3d 100644 --- a/crates/sp-consensus-subspace/src/lib.rs +++ b/crates/sp-consensus-subspace/src/lib.rs @@ -403,9 +403,9 @@ impl Default for SolutionRanges { } } -// TODO: Likely add more stuff here +// TODO: Remove V0 when we break the protocol /// Subspace blockchain constants. -#[derive(Debug, Encode, Decode, MaxEncodedLen, PartialEq, Eq, Clone, Copy, TypeInfo)] +#[derive(Debug, Encode, Decode, PartialEq, Eq, Clone, Copy, TypeInfo)] pub enum ChainConstants { /// V0 of the chain constants. #[codec(index = 0)] @@ -425,65 +425,125 @@ pub enum ChainConstants { /// Minimum lifetime of a plotted sector, measured in archived segment. min_sector_lifetime: HistorySize, }, + /// V0 of the chain constants. + #[codec(index = 1)] + V1 { + /// Depth `K` after which a block enters the recorded history. + confirmation_depth_k: BlockNumber, + /// Number of slots between slot arrival and when corresponding block can be produced. + block_authoring_delay: Slot, + /// Era duration in blocks. + era_duration: BlockNumber, + /// Slot probability. 
+ slot_probability: (u64, u64), + /// The slot duration in milliseconds. + slot_duration: SlotDuration, + /// Number of latest archived segments that are considered "recent history". + recent_segments: HistorySize, + /// Fraction of pieces from the "recent history" (`recent_segments`) in each sector. + recent_history_fraction: (HistorySize, HistorySize), + /// Minimum lifetime of a plotted sector, measured in archived segment. + min_sector_lifetime: HistorySize, + }, } impl ChainConstants { /// Depth `K` after which a block enters the recorded history. pub fn confirmation_depth_k(&self) -> BlockNumber { - let Self::V0 { - confirmation_depth_k, - .. - } = self; - *confirmation_depth_k + match self { + Self::V0 { + confirmation_depth_k, + .. + } + | Self::V1 { + confirmation_depth_k, + .. + } => *confirmation_depth_k, + } } /// Era duration in blocks. pub fn era_duration(&self) -> BlockNumber { - let Self::V0 { era_duration, .. } = self; - *era_duration + match self { + Self::V0 { era_duration, .. } | Self::V1 { era_duration, .. } => *era_duration, + } } /// Number of slots between slot arrival and when corresponding block can be produced. pub fn block_authoring_delay(&self) -> Slot { - let Self::V0 { - block_authoring_delay, - .. - } = self; - *block_authoring_delay + match self { + Self::V0 { + block_authoring_delay, + .. + } + | Self::V1 { + block_authoring_delay, + .. + } => *block_authoring_delay, + } } /// Slot probability. pub fn slot_probability(&self) -> (u64, u64) { - let Self::V0 { - slot_probability, .. - } = self; - *slot_probability + match self { + Self::V0 { + slot_probability, .. + } + | Self::V1 { + slot_probability, .. + } => *slot_probability, + } + } + + /// The slot duration in milliseconds. + pub fn slot_duration(&self) -> SlotDuration { + match self { + Self::V0 { .. } => { + // 1000ms is used on most networks, so it is a safe default + SlotDuration::from_millis(1000) + } + Self::V1 { slot_duration, .. } => *slot_duration, + } } /// Number of latest archived segments that are considered "recent history". pub fn recent_segments(&self) -> HistorySize { - let Self::V0 { - recent_segments, .. - } = self; - *recent_segments + match self { + Self::V0 { + recent_segments, .. + } + | Self::V1 { + recent_segments, .. + } => *recent_segments, + } } /// Fraction of pieces from the "recent history" (`recent_segments`) in each sector. pub fn recent_history_fraction(&self) -> (HistorySize, HistorySize) { - let Self::V0 { - recent_history_fraction, - .. - } = self; - *recent_history_fraction + match self { + Self::V0 { + recent_history_fraction, + .. + } + | Self::V1 { + recent_history_fraction, + .. + } => *recent_history_fraction, + } } /// Minimum lifetime of a plotted sector, measured in archived segment. pub fn min_sector_lifetime(&self) -> HistorySize { - let Self::V0 { - min_sector_lifetime, - .. - } = self; - *min_sector_lifetime + match self { + Self::V0 { + min_sector_lifetime, + .. + } + | Self::V1 { + min_sector_lifetime, + .. + } => *min_sector_lifetime, + } } } @@ -688,6 +748,7 @@ sp_api::decl_runtime_apis! { /// API necessary for block authorship with Subspace. pub trait SubspaceApi { /// The slot duration in milliseconds for Subspace. 
+ #[deprecated(note = "Use chain constants instead")] fn slot_duration() -> SlotDuration; /// Proof of time parameters diff --git a/crates/subspace-node/src/domain/domain_instance_starter.rs b/crates/subspace-node/src/domain/domain_instance_starter.rs index 4307e237ba..2d7e91a826 100644 --- a/crates/subspace-node/src/domain/domain_instance_starter.rs +++ b/crates/subspace-node/src/domain/domain_instance_starter.rs @@ -10,8 +10,9 @@ use domain_service::{FullBackend, FullClient}; use futures::StreamExt; use sc_chain_spec::ChainSpec; use sc_cli::{CliConfiguration, Database, DefaultConfigurationValues, SubstrateCli}; +use sc_consensus_subspace::block_import::BlockImportingNotification; use sc_consensus_subspace::notification::SubspaceNotificationStream; -use sc_consensus_subspace::{BlockImportingNotification, NewSlotNotification}; +use sc_consensus_subspace::slot_worker::NewSlotNotification; use sc_service::{BasePath, Configuration}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender}; diff --git a/crates/subspace-runtime/src/lib.rs b/crates/subspace-runtime/src/lib.rs index ca3b93d85b..d0abd3302a 100644 --- a/crates/subspace-runtime/src/lib.rs +++ b/crates/subspace-runtime/src/lib.rs @@ -49,7 +49,7 @@ pub use pallet_subspace::AllowAuthoringBy; use pallet_transporter::EndpointHandler; use scale_info::TypeInfo; use sp_api::{impl_runtime_apis, BlockT}; -use sp_consensus_slots::SlotDuration; +use sp_consensus_slots::{Slot, SlotDuration}; use sp_consensus_subspace::{ ChainConstants, EquivocationProof, FarmerPublicKey, PotParameters, SignedVote, SolutionRanges, Vote, @@ -313,6 +313,7 @@ parameter_types! { pub const PotEntropyInjectionInterval: BlockNumber = POT_ENTROPY_INJECTION_INTERVAL; pub const PotEntropyInjectionLookbackDepth: u8 = POT_ENTROPY_INJECTION_LOOKBACK_DEPTH; pub const PotEntropyInjectionDelay: SlotNumber = POT_ENTROPY_INJECTION_DELAY; + pub const EraDuration: u32 = ERA_DURATION_IN_BLOCKS; pub const SlotProbability: (u64, u64) = SLOT_PROBABILITY; pub const ExpectedVotesPerBlock: u32 = EXPECTED_VOTES_PER_BLOCK; pub const RecentSegments: HistorySize = RECENT_SEGMENTS; @@ -337,7 +338,7 @@ impl pallet_subspace::Config for Runtime { type PotEntropyInjectionInterval = PotEntropyInjectionInterval; type PotEntropyInjectionLookbackDepth = PotEntropyInjectionLookbackDepth; type PotEntropyInjectionDelay = PotEntropyInjectionDelay; - type EraDuration = ConstU32; + type EraDuration = EraDuration; type InitialSolutionRange = ConstU64; type SlotProbability = SlotProbability; type ConfirmationDepthK = ConfirmationDepthK; @@ -982,7 +983,16 @@ impl_runtime_apis! 
{
     }
 
     fn chain_constants() -> ChainConstants {
-        Subspace::chain_constants()
+        ChainConstants::V1 {
+            confirmation_depth_k: ConfirmationDepthK::get(),
+            block_authoring_delay: Slot::from(BlockAuthoringDelay::get()),
+            era_duration: EraDuration::get(),
+            slot_probability: SlotProbability::get(),
+            slot_duration: SlotDuration::from_millis(SLOT_DURATION),
+            recent_segments: RecentSegments::get(),
+            recent_history_fraction: RecentHistoryFraction::get(),
+            min_sector_lifetime: MinSectorLifetime::get(),
+        }
     }
 }
diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs
index 147a7e1041..4805ed44e1 100644
--- a/crates/subspace-service/src/lib.rs
+++ b/crates/subspace-service/src/lib.rs
@@ -49,15 +49,22 @@ use sc_client_api::execution_extensions::ExtensionsFactory;
 use sc_client_api::{
     AuxStore, Backend, BlockBackend, BlockchainEvents, ExecutorProvider, HeaderBackend,
 };
-use sc_consensus::{BasicQueue, DefaultImportQueue, ImportQueue, SharedBlockImport};
+use sc_consensus::{
+    BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, DefaultImportQueue, ImportQueue,
+    ImportResult, SharedBlockImport,
+};
 use sc_consensus_slots::SlotProportion;
-use sc_consensus_subspace::archiver::{create_subspace_archiver, SegmentHeadersStore};
+use sc_consensus_subspace::archiver::{
+    create_subspace_archiver, ArchivedSegmentNotification, SegmentHeadersStore,
+};
+use sc_consensus_subspace::block_import::{BlockImportingNotification, SubspaceBlockImport};
 use sc_consensus_subspace::notification::SubspaceNotificationStream;
-use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
-use sc_consensus_subspace::{
-    ArchivedSegmentNotification, BlockImportingNotification, NewSlotNotification,
-    RewardSigningNotification, SubspaceLink, SubspaceParams, SubspaceSyncOracle,
+use sc_consensus_subspace::slot_worker::{
+    NewSlotNotification, RewardSigningNotification, SubspaceSlotWorker, SubspaceSlotWorkerOptions,
+    SubspaceSyncOracle,
 };
+use sc_consensus_subspace::verifier::{SubspaceVerifier, SubspaceVerifierOptions};
+use sc_consensus_subspace::SubspaceLink;
 use sc_executor::{NativeElseWasmExecutor, NativeExecutionDispatch};
 use sc_network::NetworkService;
 use sc_proof_of_time::source::gossip::pot_gossip_peers_set_config;
@@ -154,6 +161,40 @@ pub enum Error {
     Other(Box),
 }
 
+// Simple wrapper whose only purpose is to convert the error type
+struct BlockImportWrapper(BI);
+
+#[async_trait::async_trait]
+impl BlockImport for BlockImportWrapper
+where
+    Block: BlockT,
+    BI: BlockImport>
+        + Send
+        + Sync,
+{
+    type Error = sp_consensus::Error;
+
+    async fn check_block(
+        &self,
+        block: BlockCheckParams,
+    ) -> Result {
+        self.0
+            .check_block(block)
+            .await
+            .map_err(|error| sp_consensus::Error::Other(error.into()))
+    }
+
+    async fn import_block(
+        &mut self,
+        block: BlockImportParams,
+    ) -> Result {
+        self.0
+            .import_block(block)
+            .await
+            .map_err(|error| sp_consensus::Error::Other(error.into()))
+    }
+}
+
 /// Subspace-like full client.
pub type FullClient = sc_service::TFullClient>; @@ -434,9 +475,10 @@ where let kzg = tokio::task::block_in_place(|| Kzg::new(embedded_kzg_settings())); let client = Arc::new(client); + let client_info = client.info(); let pot_verifier = PotVerifier::new( - PotSeed::from_genesis(client.info().genesis_hash.as_ref(), pot_external_entropy), + PotSeed::from_genesis(client_info.genesis_hash.as_ref(), pot_external_entropy), POT_VERIFIER_CACHE_SIZE, ); @@ -465,18 +507,16 @@ where tokio::task::block_in_place(|| SegmentHeadersStore::new(client.clone())) .map_err(|error| ServiceError::Application(error.into()))?; - let (block_import, subspace_link) = sc_consensus_subspace::block_import::< - PosTable, - _, - _, - _, - _, - _, - >( - sc_consensus_subspace::slot_duration(&*client)?, + let chain_constants = client + .runtime_api() + .chain_constants(client_info.best_hash) + .map_err(|error| ServiceError::Application(error.into()))?; + + let subspace_link = SubspaceLink::new(chain_constants, kzg.clone()); + let block_import = SubspaceBlockImport::::new( client.clone(), client.clone(), - kzg.clone(), + subspace_link.clone(), { let client = client.clone(); @@ -486,7 +526,6 @@ where async move { let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - // TODO: Would be nice if the whole header was passed in here let parent_header = client .header(parent_hash)? .expect("Parent header must always exist when block is created; qed"); @@ -494,9 +533,7 @@ where let parent_block_number = parent_header.number; let subspace_inherents = - sp_consensus_subspace::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - subspace_link.slot_duration(), + sp_consensus_subspace::inherents::InherentDataProvider::new( subspace_link.segment_headers_for_block(parent_block_number + 1), ); @@ -506,10 +543,10 @@ where }, segment_headers_store.clone(), pot_verifier.clone(), - )?; + ); let sync_target_block_number = Arc::new(AtomicU32::new(0)); - let transaction_pool = crate::transaction_pool::new_full( + let transaction_pool = transaction_pool::new_full( config, &task_manager, client.clone(), @@ -518,6 +555,7 @@ where let verifier = SubspaceVerifier::::new(SubspaceVerifierOptions { client: client.clone(), + chain_constants, kzg, select_chain: select_chain.clone(), telemetry: telemetry.as_ref().map(|x| x.handle()), @@ -526,9 +564,9 @@ where sync_target_block_number: Arc::clone(&sync_target_block_number), is_authoring_blocks: config.role.is_authority(), pot_verifier: pot_verifier.clone(), - })?; + }); - let block_import = SharedBlockImport::new(block_import); + let block_import = SharedBlockImport::new(BlockImportWrapper(block_import)); let import_queue = BasicQueue::new( verifier, block_import.clone(), @@ -932,65 +970,69 @@ where telemetry.as_ref().map(|x| x.handle()), ); - let subspace_config = SubspaceParams { - client: client.clone(), - select_chain: select_chain.clone(), - env: proposer_factory, - block_import, - sync_oracle: sync_oracle.clone(), - justification_sync_link: sync_service.clone(), - create_inherent_data_providers: { + let subspace_slot_worker = + SubspaceSlotWorker::::new(SubspaceSlotWorkerOptions { + client: client.clone(), + env: proposer_factory, + block_import, + sync_oracle: sync_oracle.clone(), + justification_sync_link: sync_service.clone(), + force_authoring: config.base.force_authoring, + backoff_authoring_blocks, + subspace_link: subspace_link.clone(), + segment_headers_store: segment_headers_store.clone(), + block_proposal_slot_portion, + 
max_block_proposal_slot_portion: None, + telemetry: telemetry.as_ref().map(|x| x.handle()), + offchain_tx_pool_factory, + pot_verifier, + }); + + let create_inherent_data_providers = { + let client = client.clone(); + let subspace_link = subspace_link.clone(); + + move |parent_hash, ()| { let client = client.clone(); let subspace_link = subspace_link.clone(); - move |parent_hash, ()| { - let client = client.clone(); - let subspace_link = subspace_link.clone(); - - async move { - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + async move { + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - // TODO: Would be nice if the whole header was passed in here - let parent_header = client - .header(parent_hash)? - .expect("Parent header must always exist when block is created; qed"); + // TODO: Would be nice if the whole header was passed in here + let parent_header = client + .header(parent_hash)? + .expect("Parent header must always exist when block is created; qed"); - let parent_block_number = parent_header.number; + let parent_block_number = parent_header.number; - let subspace_inherents = - sp_consensus_subspace::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - subspace_link.slot_duration(), - subspace_link.segment_headers_for_block(parent_block_number + 1), - ); + let subspace_inherents = + sp_consensus_subspace::inherents::InherentDataProvider::new( + subspace_link.segment_headers_for_block(parent_block_number + 1), + ); - Ok((subspace_inherents, timestamp)) - } + Ok((timestamp, subspace_inherents)) } - }, - force_authoring: config.base.force_authoring, - backoff_authoring_blocks, - subspace_link: subspace_link.clone(), - segment_headers_store: segment_headers_store.clone(), - block_proposal_slot_portion, - max_block_proposal_slot_portion: None, - telemetry: telemetry.as_ref().map(|x| x.handle()), - offchain_tx_pool_factory, - pot_verifier, - pot_slot_info_stream, + } }; - let subspace = - sc_consensus_subspace::start_subspace::( - subspace_config, - )?; + info!(target: "subspace", "🧑‍🌾 Starting Subspace Authorship worker"); + let slot_worker_task = sc_proof_of_time::start_slot_worker( + subspace_link.chain_constants().slot_duration(), + client.clone(), + select_chain.clone(), + subspace_slot_worker, + sync_oracle.clone(), + create_inherent_data_providers, + pot_slot_info_stream, + ); // Subspace authoring task is considered essential, i.e. if it fails we take down the // service with it. 
task_manager.spawn_essential_handle().spawn_blocking( "subspace-proposer", Some("block-authoring"), - subspace, + slot_worker_task, ); } diff --git a/crates/subspace-service/src/rpc.rs b/crates/subspace-service/src/rpc.rs index 1de37261ef..6b84a986c1 100644 --- a/crates/subspace-service/src/rpc.rs +++ b/crates/subspace-service/src/rpc.rs @@ -24,10 +24,10 @@ use jsonrpsee::RpcModule; use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer}; use sc_client_api::{AuxStore, BlockBackend}; -use sc_consensus_subspace::archiver::SegmentHeadersStore; +use sc_consensus_subspace::archiver::{ArchivedSegmentNotification, SegmentHeadersStore}; use sc_consensus_subspace::notification::SubspaceNotificationStream; -use sc_consensus_subspace::{ - ArchivedSegmentNotification, NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle, +use sc_consensus_subspace::slot_worker::{ + NewSlotNotification, RewardSigningNotification, SubspaceSyncOracle, }; use sc_consensus_subspace_rpc::{SubspaceRpc, SubspaceRpcApiServer, SubspaceRpcConfig}; use sc_rpc::SubscriptionTaskExecutor; diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index 1dae44008a..071f62fdb7 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -26,7 +26,7 @@ use futures::StreamExt; use sc_client_api::{BlockBackend, HeaderBackend}; use sc_consensus_subspace::archiver::encode_block; use sc_consensus_subspace::notification::SubspaceNotificationStream; -use sc_consensus_subspace::{NewSlotNotification, RewardSigningNotification}; +use sc_consensus_subspace::slot_worker::{NewSlotNotification, RewardSigningNotification}; use sp_api::ProvideRuntimeApi; use sp_consensus_subspace::{FarmerPublicKey, FarmerSignature, SubspaceApi}; use sp_core::{Decode, Encode}; diff --git a/test/subspace-test-runtime/src/lib.rs b/test/subspace-test-runtime/src/lib.rs index 4068aa673d..a4b0dcbddc 100644 --- a/test/subspace-test-runtime/src/lib.rs +++ b/test/subspace-test-runtime/src/lib.rs @@ -44,7 +44,7 @@ pub use pallet_subspace::AllowAuthoringBy; use pallet_transporter::EndpointHandler; use scale_info::TypeInfo; use sp_api::{impl_runtime_apis, BlockT}; -use sp_consensus_slots::SlotDuration; +use sp_consensus_slots::{Slot, SlotDuration}; use sp_consensus_subspace::{ ChainConstants, EquivocationProof, FarmerPublicKey, PotParameters, SignedVote, SolutionRanges, Vote, @@ -266,6 +266,7 @@ parameter_types! { pub const PotEntropyInjectionInterval: BlockNumber = POT_ENTROPY_INJECTION_INTERVAL; pub const PotEntropyInjectionLookbackDepth: u8 = POT_ENTROPY_INJECTION_LOOKBACK_DEPTH; pub const PotEntropyInjectionDelay: SlotNumber = POT_ENTROPY_INJECTION_DELAY; + pub const EraDuration: BlockNumber = ERA_DURATION_IN_BLOCKS; pub const SlotProbability: (u64, u64) = SLOT_PROBABILITY; pub const ShouldAdjustSolutionRange: bool = false; pub const ExpectedVotesPerBlock: u32 = 9; @@ -284,7 +285,7 @@ impl pallet_subspace::Config for Runtime { type PotEntropyInjectionInterval = PotEntropyInjectionInterval; type PotEntropyInjectionLookbackDepth = PotEntropyInjectionLookbackDepth; type PotEntropyInjectionDelay = PotEntropyInjectionDelay; - type EraDuration = ConstU32; + type EraDuration = EraDuration; type InitialSolutionRange = ConstU64; type SlotProbability = SlotProbability; type ConfirmationDepthK = ConfirmationDepthK; @@ -1150,7 +1151,16 @@ impl_runtime_apis! 
{ } fn chain_constants() -> ChainConstants { - Subspace::chain_constants() + ChainConstants::V1 { + confirmation_depth_k: ConfirmationDepthK::get(), + block_authoring_delay: Slot::from(BlockAuthoringDelay::get()), + era_duration: EraDuration::get(), + slot_probability: SlotProbability::get(), + slot_duration: SlotDuration::from_millis(SLOT_DURATION), + recent_segments: RecentSegments::get(), + recent_history_fraction: RecentHistoryFraction::get(), + min_sector_lifetime: MinSectorLifetime::get(), + } } } diff --git a/test/subspace-test-service/src/lib.rs b/test/subspace-test-service/src/lib.rs index aaa5faa8cd..c89190e81f 100644 --- a/test/subspace-test-service/src/lib.rs +++ b/test/subspace-test-service/src/lib.rs @@ -605,7 +605,7 @@ impl MockConsensusNode { >::into(slot) * SLOT_DURATION, )); let subspace_inherents = - sp_consensus_subspace::inherents::InherentDataProvider::new(slot, vec![]); + sp_consensus_subspace::inherents::InherentDataProvider::new(vec![]); let inherent_data = (subspace_inherents, timestamp) .create_inherent_data()