diff --git a/crates/pallet-domains/src/lib.rs b/crates/pallet-domains/src/lib.rs index b1761eecd2..fdb09f6088 100644 --- a/crates/pallet-domains/src/lib.rs +++ b/crates/pallet-domains/src/lib.rs @@ -672,7 +672,8 @@ mod pallet { /// Storage to hold all the domain's latest confirmed block. #[pallet::storage] - pub(super) type LatestConfirmedDomainExecutionReceipt = + #[pallet::getter(fn latest_confirmed_domain_execution_receipt)] + pub type LatestConfirmedDomainExecutionReceipt = StorageMap<_, Identity, DomainId, ExecutionReceiptOf, OptionQuery>; /// The latest ER submitted by the operator for a given domain. It is used to determine if the operator diff --git a/crates/sp-domains/src/lib.rs b/crates/sp-domains/src/lib.rs index 1b34ba7c68..3495acf64c 100644 --- a/crates/sp-domains/src/lib.rs +++ b/crates/sp-domains/src/lib.rs @@ -1506,7 +1506,7 @@ impl OnChainRewards for () { sp_api::decl_runtime_apis! { /// API necessary for domains pallet. - #[api_version(5)] + #[api_version(6)] pub trait DomainsApi { /// Submits the transaction bundle via an unsigned extrinsic. fn submit_bundle_unsigned(opaque_bundle: OpaqueBundle, Block::Hash, DomainHeader, Balance>); @@ -1600,7 +1600,10 @@ sp_api::decl_runtime_apis! { /// Return domain sudo call. fn domain_sudo_call(domain_id: DomainId) -> Option>; - } + + /// Return last confirmed domain block execution receipt. + fn last_confirmed_domain_block_receipt(domain_id: DomainId) ->Option>; +} pub trait BundleProducerElectionApi { fn bundle_producer_election_params(domain_id: DomainId) -> Option>; diff --git a/crates/subspace-fake-runtime-api/src/lib.rs b/crates/subspace-fake-runtime-api/src/lib.rs index 74028f9344..e8beb05e52 100644 --- a/crates/subspace-fake-runtime-api/src/lib.rs +++ b/crates/subspace-fake-runtime-api/src/lib.rs @@ -316,6 +316,10 @@ sp_api::impl_runtime_apis! { fn domain_sudo_call(_domain_id: DomainId) -> Option> { unreachable!() } + + fn last_confirmed_domain_block_receipt(_domain_id: DomainId) -> Option> { + unreachable!() + } } impl sp_domains::BundleProducerElectionApi for Runtime { diff --git a/crates/subspace-runtime/src/lib.rs b/crates/subspace-runtime/src/lib.rs index 4453eb6250..3a9c194ba7 100644 --- a/crates/subspace-runtime/src/lib.rs +++ b/crates/subspace-runtime/src/lib.rs @@ -1424,6 +1424,10 @@ impl_runtime_apis! { fn domain_sudo_call(domain_id: DomainId) -> Option> { Domains::domain_sudo_call(domain_id) } + + fn last_confirmed_domain_block_receipt(domain_id: DomainId) -> Option>{ + Domains::latest_confirmed_domain_execution_receipt(domain_id) + } } impl sp_domains::BundleProducerElectionApi for Runtime { diff --git a/crates/subspace-service/src/domains.rs b/crates/subspace-service/src/domains.rs new file mode 100644 index 0000000000..a68e3fcd3f --- /dev/null +++ b/crates/subspace-service/src/domains.rs @@ -0,0 +1,229 @@ +// Remove after adding domain snap-sync +#![allow(dead_code)] + +use crate::domains::request_handler::{ + generate_protocol_name, LastConfirmedBlockRequest, LastConfirmedBlockResponse, +}; +use async_trait::async_trait; +use domain_runtime_primitives::Balance; +use futures::channel::oneshot; +use parity_scale_codec::{Decode, Encode}; +use sc_network::{IfDisconnected, NetworkRequest, PeerId, RequestFailure}; +use sc_network_sync::SyncingService; +use sp_blockchain::HeaderBackend; +use sp_domains::{DomainId, ExecutionReceiptFor}; +use sp_runtime::traits::{Block as BlockT, Header}; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; +use tracing::{debug, error, trace}; + +pub(crate) mod request_handler; + +const REQUEST_PAUSE: Duration = Duration::from_secs(5); + +/// Last confirmed domain block info error +#[derive(Debug, thiserror::Error)] +pub enum LastConfirmedDomainBlockResponseError { + #[error("Last confirmed domain block info request failed: {0}")] + RequestFailed(#[from] RequestFailure), + + #[error("Last confirmed domain block info request canceled")] + RequestCanceled, + + #[error("Last confirmed domain block info request failed: invalid protocol")] + InvalidProtocol, + + #[error("Failed to decode response: {0}")] + DecodeFailed(String), +} + +#[async_trait] +pub trait LastDomainBlockReceiptProvider: Send { + async fn get_execution_receipt( + &self, + block_hash: Option, + ) -> Option>; +} + +#[async_trait] +impl LastDomainBlockReceiptProvider for () { + async fn get_execution_receipt( + &self, + _: Option, + ) -> Option> { + None + } +} + +#[async_trait] +impl LastDomainBlockReceiptProvider + for LastDomainBlockInfoReceiver +where + Block: BlockT, + CBlock: BlockT, + NR: NetworkRequest + Sync + Send, + Client: HeaderBackend, +{ + async fn get_execution_receipt( + &self, + block_hash: Option, + ) -> Option> { + self.get_last_confirmed_domain_block_receipt::(block_hash) + .await + } +} + +pub struct LastDomainBlockInfoReceiver +where + Block: BlockT, + NR: NetworkRequest, + Client: HeaderBackend, +{ + domain_id: DomainId, + fork_id: Option, + client: Arc, + network_service: NR, + sync_service: Arc>, +} + +impl LastDomainBlockInfoReceiver +where + Block: BlockT, + NR: NetworkRequest, + Client: HeaderBackend, +{ + pub fn new( + domain_id: DomainId, + fork_id: Option, + client: Arc, + network_service: NR, + sync_service: Arc>, + ) -> Self { + Self { + domain_id, + fork_id, + client, + network_service, + sync_service, + } + } + pub async fn get_last_confirmed_domain_block_receipt( + &self, + block_hash: Option, + ) -> Option> { + let info = self.client.info(); + let protocol_name = generate_protocol_name(info.genesis_hash, self.fork_id.as_deref()); + + debug!(domain_id=%self.domain_id, %protocol_name, "Started obtaining domain info..."); + + loop { + let peers_info = match self.sync_service.peers_info().await { + Ok(peers_info) => peers_info, + Err(error) => { + error!("Peers info request returned an error: {error}",); + sleep(REQUEST_PAUSE).await; + + continue; + } + }; + + // Enumerate peers until we find a suitable source for domain info + 'peers: for (peer_id, peer_info) in peers_info.iter() { + debug!( + "Domain data request. peer = {peer_id}, info = {:?}", + peer_info + ); + + if !peer_info.is_synced { + trace!("Domain data request skipped (not synced). peer = {peer_id}"); + + continue 'peers; + } + + let request = LastConfirmedBlockRequest:: { + domain_id: self.domain_id, + block_hash, + }; + + let response = send_request::( + protocol_name.clone(), + *peer_id, + request, + &self.network_service, + ) + .await; + + match response { + Ok(response) => { + trace!("Response from a peer {peer_id},",); + + return Some(response.last_confirmed_block_receipt); + } + Err(error) => { + debug!("Domain info request failed. peer = {peer_id}: {error}"); + + continue 'peers; + } + } + } + debug!( + domain_id=%self.domain_id, + "No synced peers to handle the domain confirmed block infor request. Pausing..." + ); + + sleep(REQUEST_PAUSE).await; + } + } +} + +async fn send_request( + protocol_name: String, + peer_id: PeerId, + request: LastConfirmedBlockRequest, + network_service: &NR, +) -> Result, LastConfirmedDomainBlockResponseError> +{ + let (tx, rx) = oneshot::channel(); + + debug!("Sending request: {request:?} (peer={peer_id})"); + + let encoded_request = request.encode(); + + network_service.start_request( + peer_id, + protocol_name.clone().into(), + encoded_request, + None, + tx, + IfDisconnected::ImmediateError, + ); + + let result = rx + .await + .map_err(|_| LastConfirmedDomainBlockResponseError::RequestCanceled)?; + + match result { + Ok((data, response_protocol_name)) => { + if response_protocol_name != protocol_name.into() { + return Err(LastConfirmedDomainBlockResponseError::InvalidProtocol); + } + + let response = decode_response(&data) + .map_err(LastConfirmedDomainBlockResponseError::DecodeFailed)?; + + Ok(response) + } + Err(error) => Err(error.into()), + } +} + +fn decode_response( + mut response: &[u8], +) -> Result, String> { + let response = LastConfirmedBlockResponse::decode(&mut response).map_err(|error| { + format!("Failed to decode last confirmed domain block info response: {error}") + })?; + + Ok(response) +} diff --git a/crates/subspace-service/src/domains/request_handler.rs b/crates/subspace-service/src/domains/request_handler.rs new file mode 100644 index 0000000000..59252b357d --- /dev/null +++ b/crates/subspace-service/src/domains/request_handler.rs @@ -0,0 +1,263 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use domain_runtime_primitives::Balance; +use futures::channel::oneshot; +use futures::stream::StreamExt; +use parity_scale_codec::{Decode, Encode}; +use sc_client_api::{BlockBackend, ProofProvider}; +use sc_network::request_responses::{IncomingRequest, OutgoingResponse}; +use sc_network::{NetworkBackend, PeerId}; +use sp_api::{ApiExt, ProvideRuntimeApi}; +use sp_blockchain::HeaderBackend; +use sp_domains::{DomainId, DomainsApi, ExecutionReceiptFor}; +use sp_domains_fraud_proof::FraudProofApi; +use sp_runtime::codec; +use sp_runtime::traits::{Block as BlockT, Header}; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; +use tracing::{debug, error, trace}; + +/// Generates a `RequestResponseProtocolConfig` for the state request protocol, refusing incoming +/// requests. +pub fn generate_protocol_config< + Hash: AsRef<[u8]>, + B: BlockT, + N: NetworkBackend::Hash>, +>( + genesis_hash: Hash, + fork_id: Option<&str>, + inbound_queue: async_channel::Sender, +) -> N::RequestResponseProtocolConfig { + N::request_response_config( + generate_protocol_name(genesis_hash, fork_id).into(), + Vec::new(), + 1024 * 1024, + 16 * 1024 * 1024, + Duration::from_secs(40), + Some(inbound_queue), + ) +} + +/// Generate the state protocol name from the genesis hash and fork id. +pub fn generate_protocol_name>( + genesis_hash: Hash, + fork_id: Option<&str>, +) -> String { + let genesis_hash = genesis_hash.as_ref(); + if let Some(fork_id) = fork_id { + format!( + "/{}/{}/last-confirmed-block/1", + array_bytes::bytes2hex("", genesis_hash), + fork_id + ) + } else { + format!( + "/{}/last-confirmed-block/1", + array_bytes::bytes2hex("", genesis_hash) + ) + } +} + +/// Request last confirmed domain block data from a peer. +#[derive(Clone, PartialEq, Encode, Decode, Debug)] +pub struct LastConfirmedBlockRequest { + pub domain_id: DomainId, + pub block_hash: Option, +} + +#[derive(Clone, PartialEq, Encode, Decode, Debug)] +pub struct LastConfirmedBlockResponse { + pub last_confirmed_block_receipt: ExecutionReceiptFor, +} + +/// Handler for incoming block requests from a remote peer. +pub struct LastDomainBlockERRequestHandler { + request_receiver: async_channel::Receiver, + + _phantom: PhantomData<(Block, DomainHeader)>, + + client: Arc, +} + +impl LastDomainBlockERRequestHandler +where + Block: BlockT, + Client: ProvideRuntimeApi + + BlockBackend + + ProofProvider + + HeaderBackend + + Send + + Sync + + 'static, + Client::Api: DomainsApi + FraudProofApi, + DomainHeader: Header, +{ + /// Create a new [`LastDomainBlockERRequestHandler`]. + pub fn new( + fork_id: Option<&str>, + client: Arc, + num_peer_hint: usize, + ) -> (Self, NB::RequestResponseProtocolConfig) + where + NB: NetworkBackend::Hash>, + { + // Reserve enough request slots for one request per peer when we are at the maximum + // number of peers. + let capacity = std::cmp::max(num_peer_hint, 1); + let (tx, request_receiver) = async_channel::bounded(capacity); + + let protocol_config = generate_protocol_config::<_, Block, NB>( + client + .block_hash(0u32.into()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + fork_id, + tx, + ); + + ( + Self { + request_receiver, + client, + _phantom: PhantomData, + }, + protocol_config, + ) + } + + /// Run [`StateRequestHandler`]. + pub async fn run(mut self) { + while let Some(request) = self.request_receiver.next().await { + let IncomingRequest { + peer, + payload, + pending_response, + } = request; + + match self.handle_request(payload, pending_response, &peer) { + Ok(()) => debug!("Handled domain block info request from {}.", peer), + Err(e) => error!( + "Failed to handle domain block info request from {}: {}", + peer, e, + ), + } + } + } + + fn handle_request( + &mut self, + payload: Vec, + pending_response: oneshot::Sender, + peer: &PeerId, + ) -> Result<(), HandleRequestError> { + let request = LastConfirmedBlockRequest::::decode(&mut payload.as_slice())?; + + trace!("Handle last confirmed domain block info request: {peer}, request: {request:?}",); + + let result = { + let target_block_hash = if let Some(block_hash) = request.block_hash { + block_hash + } else { + let info = self.client.info(); + info.best_hash + }; + + let consensus_api_version = self + .client + .runtime_api() + .api_version::>(target_block_hash) + .map_err(sp_blockchain::Error::RuntimeApiError)? + .ok_or_else(|| HandleRequestError::ApiVersionNotSupported)?; + + if consensus_api_version < 6 { + debug!("Incorrect API version to support the last confirmed block request: {consensus_api_version}"); + + return Err(HandleRequestError::ApiVersionNotSupported); + } + + let last_confirmed_block_receipt = self + .client + .runtime_api() + .last_confirmed_domain_block_receipt(target_block_hash, request.domain_id); + + debug!( + ?last_confirmed_block_receipt, + "Last confirmed domain block receipt." + ); + + let response = match last_confirmed_block_receipt { + Ok(Some(last_confirmed_block_receipt)) => { + LastConfirmedBlockResponse:: { + last_confirmed_block_receipt, + } + } + Ok(None) => { + debug!( + domain_id=%request.domain_id, + %target_block_hash, + "Last confirmed domain block acquisition failed: no data.", + ); + + return Err(HandleRequestError::AbsentLastConfirmedDomainBlockData); + } + Err(err) => { + debug!( + domain_id=%request.domain_id, + %target_block_hash, + ?err, + "Last confirmed domain block acquisition failed.", + ); + + return Err(HandleRequestError::LastConfirmedDomainDataAcquisitionFailed(err)); + } + }; + + Ok(response.encode()) + }; + + pending_response + .send(OutgoingResponse { + result, + reputation_changes: Vec::new(), + sent_feedback: None, + }) + .map_err(|_| HandleRequestError::SendResponse) + } +} + +#[derive(Debug, thiserror::Error)] +enum HandleRequestError { + #[error(transparent)] + Client(#[from] sp_blockchain::Error), + + #[error("Failed to send response.")] + SendResponse, + + #[error("Api version is not supported.")] + ApiVersionNotSupported, + + #[error("Failed to decode request: {0}.")] + Decode(#[from] codec::Error), + + #[error("Last confirmed domain block acquisition failed: no data.")] + AbsentLastConfirmedDomainBlockData, + + #[error("Last confirmed domain block acquisition failed: no data.")] + LastConfirmedDomainDataAcquisitionFailed(#[from] sp_api::ApiError), +} diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index b56a2a03c4..bff7d6777f 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -27,6 +27,7 @@ )] pub mod config; +pub(crate) mod domains; pub mod dsn; mod metrics; pub(crate) mod mmr; @@ -36,6 +37,7 @@ mod task_spawner; pub mod transaction_pool; use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking}; +use crate::domains::request_handler::LastDomainBlockERRequestHandler; use crate::dsn::{create_dsn_instance, DsnConfigurationError}; use crate::metrics::NodeMetrics; use crate::mmr::request_handler::MmrRequestHandler; @@ -880,14 +882,14 @@ where net_config.add_notification_protocol(pot_gossip_notification_config); let pause_sync = Arc::clone(&net_config.network_config.pause_sync); - if let Some(offchain_storage) = backend.offchain_storage() { - let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize - + net_config - .network_config - .default_peers_set - .reserved_nodes - .len(); + let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize + + net_config + .network_config + .default_peers_set + .reserved_nodes + .len(); + if let Some(offchain_storage) = backend.offchain_storage() { // Allow both outgoing and incoming requests. let (handler, protocol_config) = MmrRequestHandler::new::::Hash>, _>( @@ -904,6 +906,22 @@ where net_config.add_request_response_protocol(protocol_config); } + // "Last confirmed domain block execution receipt" request handler + { + let (handler, protocol_config) = LastDomainBlockERRequestHandler::new::< + NetworkWorker::Hash>, + >( + fork_id.as_deref(), client.clone(), num_peer_hint + ); + task_manager.spawn_handle().spawn( + "last-domain-execution-receipt-request-handler", + Some("networking"), + handler.run(), + ); + + net_config.add_request_response_protocol(protocol_config); + } + let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config.base, diff --git a/domains/service/src/config.rs b/domains/service/src/config.rs index 3f00477d4e..6f250d4443 100644 --- a/domains/service/src/config.rs +++ b/domains/service/src/config.rs @@ -147,7 +147,9 @@ impl From for Configuration { max_blocks_per_request: 64, // Substrate's default, full mode sync_mode: SyncMode::Full, - pause_sync: Arc::new(AtomicBool::new(false)), + // Disable syncing engine because domain blocks are only created from local + // consensus blocks, not synced blocks from remote peers. + pause_sync: Arc::new(AtomicBool::new(true)), // Substrate's default enable_dht_random_walk: true, // Substrate's default diff --git a/domains/service/src/domain.rs b/domains/service/src/domain.rs index b5ee2cdcf2..55c521dc66 100644 --- a/domains/service/src/domain.rs +++ b/domains/service/src/domain.rs @@ -368,25 +368,31 @@ where let mut task_manager = params.task_manager; let net_config = sc_network::config::FullNetworkConfiguration::new(&domain_config.network); - let (network_service, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = - crate::build_network(BuildNetworkParams { - config: &domain_config, - net_config, - client: client.clone(), - transaction_pool: transaction_pool.clone(), - spawn_handle: task_manager.spawn_handle(), - import_queue: params.import_queue, - // TODO: we might want to re-enable this some day. - block_announce_validator_builder: None, - warp_sync_params: None, - block_relay: None, - metrics: NotificationMetrics::new( - domain_config - .prometheus_config - .as_ref() - .map(|cfg| &cfg.registry), - ), - })?; + let ( + network_service, + system_rpc_tx, + tx_handler_controller, + network_starter, + sync_service, + _block_downloader, + ) = crate::build_network(BuildNetworkParams { + config: &domain_config, + net_config, + client: client.clone(), + transaction_pool: transaction_pool.clone(), + spawn_handle: task_manager.spawn_handle(), + import_queue: params.import_queue, + // TODO: we might want to re-enable this some day. + block_announce_validator_builder: None, + warp_sync_params: None, + block_relay: None, + metrics: NotificationMetrics::new( + domain_config + .prometheus_config + .as_ref() + .map(|cfg| &cfg.registry), + ), + })?; let is_authority = domain_config.role.is_authority(); domain_config.rpc_id_provider = provider.rpc_id(); diff --git a/domains/service/src/lib.rs b/domains/service/src/lib.rs index e9457a8337..752f17d8de 100644 --- a/domains/service/src/lib.rs +++ b/domains/service/src/lib.rs @@ -13,6 +13,7 @@ use sc_consensus::ImportQueue; use sc_domains::RuntimeExecutor; use sc_network::config::Roles; use sc_network::{NetworkService, NetworkWorker}; +use sc_network_sync::block_relay_protocol::BlockDownloader; use sc_network_sync::block_request_handler::BlockRequestHandler; use sc_network_sync::engine::SyncingEngine; use sc_network_sync::service::network::NetworkServiceProvider; @@ -56,6 +57,7 @@ pub fn build_network( sc_network_transactions::TransactionsHandlerController<::Hash>, NetworkStarter, Arc>, + Arc>, ), sc_service::Error, > @@ -104,8 +106,12 @@ where .expect("Genesis block exists; qed"); let (chain_sync_network_provider, chain_sync_network_handle) = NetworkServiceProvider::new(); - let (mut block_server, block_downloader) = match block_relay { - Some(params) => (params.server, params.downloader), + let (mut block_server, block_downloader, block_request_protocol_config) = match block_relay { + Some(params) => ( + params.server, + params.downloader, + params.request_response_config, + ), None => { // Custom protocol was not specified, use the default block handler. let params = BlockRequestHandler::new::::Hash>>( @@ -116,7 +122,11 @@ where config.network.default_peers_set.in_peers as usize + config.network.default_peers_set.out_peers as usize, ); - (params.server, params.downloader) + ( + params.server, + params.downloader, + params.request_response_config, + ) } }; spawn_handle.spawn("block-request-handler", Some("networking"), async move { @@ -156,6 +166,9 @@ where protocol_config }; + net_config.add_request_response_protocol(block_request_protocol_config); + net_config.add_request_response_protocol(state_request_protocol_config.clone()); + let (engine, sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config.role), client.clone(), @@ -172,7 +185,7 @@ where None, chain_sync_network_handle, import_queue.service(), - block_downloader, + block_downloader.clone(), state_request_protocol_config.name.clone(), None, peer_store_handle, @@ -304,6 +317,7 @@ where tx_handler_controller, NetworkStarter::new(network_start_tx), sync_service, + block_downloader, )) } diff --git a/test/subspace-test-runtime/src/lib.rs b/test/subspace-test-runtime/src/lib.rs index 59d5b9cfae..67899a3aef 100644 --- a/test/subspace-test-runtime/src/lib.rs +++ b/test/subspace-test-runtime/src/lib.rs @@ -1479,6 +1479,10 @@ impl_runtime_apis! { fn domain_sudo_call(domain_id: DomainId) -> Option> { Domains::domain_sudo_call(domain_id) } + + fn last_confirmed_domain_block_receipt(domain_id: DomainId) -> Option>{ + Domains::latest_confirmed_domain_execution_receipt(domain_id) + } } impl sp_domains::BundleProducerElectionApi for Runtime {