diff --git a/crates/sp-domains/src/bundle_producer_election.rs b/crates/sp-domains/src/bundle_producer_election.rs index f9e764c907..f1b94e6fa8 100644 --- a/crates/sp-domains/src/bundle_producer_election.rs +++ b/crates/sp-domains/src/bundle_producer_election.rs @@ -54,6 +54,9 @@ pub fn is_below_threshold(vrf_output: &VrfOutput, threshold: u128) -> bool { #[derive(Debug, Decode, Encode, TypeInfo, PartialEq, Eq, Clone)] pub struct BundleProducerElectionParams { + // TODO: current operators is not required anymore. + // This is not removed right now in order to not break runtime api with new version + // marking a todo to remove it before next network. pub current_operators: Vec, pub total_domain_stake: Balance, pub bundle_slot_probability: (u64, u64), diff --git a/crates/subspace-node/src/domain/cli.rs b/crates/subspace-node/src/domain/cli.rs index ce34635dd8..a9261c9beb 100644 --- a/crates/subspace-node/src/domain/cli.rs +++ b/crates/subspace-node/src/domain/cli.rs @@ -29,7 +29,7 @@ use sc_service::{BasePath, Configuration}; use sp_blockchain::HeaderBackend; use sp_domain_digests::AsPredigest; use sp_domains::storage::RawGenesis; -use sp_domains::DomainId; +use sp_domains::{DomainId, OperatorId}; use sp_runtime::generic::BlockId; use sp_runtime::traits::Header; use sp_runtime::{BuildStorage, DigestItem}; @@ -64,6 +64,10 @@ fn parse_domain_id(s: &str) -> std::result::Result { s.parse::().map(Into::into) } +fn parse_operator_id(s: &str) -> std::result::Result { + s.parse::().map(OperatorId::from) +} + #[derive(Debug, Parser)] pub struct DomainCli { /// Run a domain node. @@ -73,9 +77,9 @@ pub struct DomainCli { #[clap(long, value_parser = parse_domain_id)] pub domain_id: DomainId, - /// Run the node as an Operator - #[arg(long, conflicts_with = "validator")] - pub operator: bool, + /// Use provider operator id to submit bundles. + #[arg(long, value_parser = parse_operator_id)] + pub operator_id: Option, /// Additional args for domain. #[clap(raw = true)] @@ -214,9 +218,15 @@ impl CliConfiguration for DomainCli { self.run.chain_id(is_dev) } - fn role(&self, is_dev: bool) -> Result { - // is authority when operator is enabled or in dev mode - let is_authority = self.operator || self.run.validator || is_dev; + fn role(&self, _is_dev: bool) -> Result { + if self.run.validator { + return Err(sc_cli::Error::Input( + "use `--operator-id` argument to run as operator".to_string(), + )); + } + + // is authority when operator_id is passed. + let is_authority = self.operator_id.is_some(); Ok(if is_authority { Role::Authority diff --git a/crates/subspace-node/src/domain/domain_instance_starter.rs b/crates/subspace-node/src/domain/domain_instance_starter.rs index fab39abe56..4307e237ba 100644 --- a/crates/subspace-node/src/domain/domain_instance_starter.rs +++ b/crates/subspace-node/src/domain/domain_instance_starter.rs @@ -138,6 +138,7 @@ impl DomainInstanceStarter { domain_message_receiver, provider: eth_provider, skip_empty_bundle_production: true, + maybe_operator_id: domain_cli.operator_id, }; let mut domain_node = domain_service::new_full::< diff --git a/domains/client/domain-operator/src/bundle_producer_election_solver.rs b/domains/client/domain-operator/src/bundle_producer_election_solver.rs index a1fc8ca1ca..c54da1c98e 100644 --- a/domains/client/domain-operator/src/bundle_producer_election_solver.rs +++ b/domains/client/domain-operator/src/bundle_producer_election_solver.rs @@ -1,9 +1,13 @@ use sp_api::ProvideRuntimeApi; use sp_consensus_slots::Slot; +use sp_core::bytes::to_hex; +use sp_core::ByteArray; use sp_domains::bundle_producer_election::{ calculate_threshold, is_below_threshold, make_transcript, BundleProducerElectionParams, }; -use sp_domains::{BundleProducerElectionApi, DomainId, OperatorPublicKey, ProofOfElection}; +use sp_domains::{ + BundleProducerElectionApi, DomainId, OperatorId, OperatorPublicKey, ProofOfElection, +}; use sp_keystore::{Keystore, KeystorePtr}; use sp_runtime::traits::Block as BlockT; use sp_runtime::RuntimeAppPublic; @@ -11,6 +15,7 @@ use std::marker::PhantomData; use std::sync::Arc; use subspace_core_primitives::Randomness; use subspace_runtime_primitives::Balance; +use tracing::log; pub(super) struct BundleProducerElectionSolver { keystore: KeystorePtr, @@ -48,57 +53,69 @@ where slot: Slot, consensus_block_hash: CBlock::Hash, domain_id: DomainId, + maybe_operator_id: Option, global_randomness: Randomness, ) -> sp_blockchain::Result, OperatorPublicKey)>> { - let BundleProducerElectionParams { - current_operators, - total_domain_stake, - bundle_slot_probability, - } = match self - .consensus_client - .runtime_api() - .bundle_producer_election_params(consensus_block_hash, domain_id)? - { - Some(params) => params, - None => return Ok(None), - }; + if let Some(operator_id) = maybe_operator_id { + let BundleProducerElectionParams { + total_domain_stake, + bundle_slot_probability, + .. + } = match self + .consensus_client + .runtime_api() + .bundle_producer_election_params(consensus_block_hash, domain_id)? + { + Some(params) => params, + None => return Ok(None), + }; - let global_challenge = global_randomness.derive_global_challenge(slot.into()); - let vrf_sign_data = make_transcript(domain_id, &global_challenge).into_sign_data(); + let global_challenge = global_randomness.derive_global_challenge(slot.into()); + let vrf_sign_data = make_transcript(domain_id, &global_challenge).into_sign_data(); - // TODO: The runtime API may take 10~20 microseonds each time, looping the operator set - // could take too long for the bundle production, track a mapping of signing_key to - // operator_id in the runtime and then we can update it to loop the keys in the keystore. - for operator_id in current_operators { + // Ideally, we can already cache operator signing key since we do not allow changing such key + // in the protocol right now. Leaving this as is since we anyway need to need to fetch operator's + // latest stake and this also returns the signing key with it. if let Some((operator_signing_key, operator_stake)) = self .consensus_client .runtime_api() .operator(consensus_block_hash, operator_id)? { - if let Ok(Some(vrf_signature)) = Keystore::sr25519_vrf_sign( + if let Ok(maybe_vrf_signature) = Keystore::sr25519_vrf_sign( &*self.keystore, OperatorPublicKey::ID, &operator_signing_key.clone().into(), &vrf_sign_data, ) { - let threshold = calculate_threshold( - operator_stake, - total_domain_stake, - bundle_slot_probability, - ); + if let Some(vrf_signature) = maybe_vrf_signature { + let threshold = calculate_threshold( + operator_stake, + total_domain_stake, + bundle_slot_probability, + ); - if is_below_threshold(&vrf_signature.output, threshold) { - let proof_of_election = ProofOfElection { - domain_id, - slot_number: slot.into(), - global_randomness, - vrf_signature, - operator_id, - consensus_block_hash, - }; - return Ok(Some((proof_of_election, operator_signing_key))); + if is_below_threshold(&vrf_signature.output, threshold) { + let proof_of_election = ProofOfElection { + domain_id, + slot_number: slot.into(), + global_randomness, + vrf_signature, + operator_id, + consensus_block_hash, + }; + return Ok(Some((proof_of_election, operator_signing_key))); + } + } else { + log::warn!( + "Operator[{operator_id}]'s Signing key[{}] pair is not available in keystore.", + to_hex(operator_signing_key.as_slice(), false) + ); + return Ok(None); } } + } else { + log::warn!("Operator[{operator_id}] is not registered on the Runtime",); + return Ok(None); } } diff --git a/domains/client/domain-operator/src/domain_bundle_producer.rs b/domains/client/domain-operator/src/domain_bundle_producer.rs index 6c66420006..ff6b0ee3a5 100644 --- a/domains/client/domain-operator/src/domain_bundle_producer.rs +++ b/domains/client/domain-operator/src/domain_bundle_producer.rs @@ -9,8 +9,8 @@ use sp_api::{NumberFor, ProvideRuntimeApi}; use sp_block_builder::BlockBuilder; use sp_blockchain::{HashAndNumber, HeaderBackend}; use sp_domains::{ - Bundle, BundleProducerElectionApi, DomainId, DomainsApi, OperatorPublicKey, OperatorSignature, - SealedBundleHeader, + Bundle, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey, + OperatorSignature, SealedBundleHeader, }; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, Zero}; @@ -34,6 +34,7 @@ where CBlock: BlockT, { domain_id: DomainId, + maybe_operator_id: Option, consensus_client: Arc, client: Arc, bundle_sender: Arc>, @@ -52,6 +53,7 @@ where fn clone(&self) -> Self { Self { domain_id: self.domain_id, + maybe_operator_id: self.maybe_operator_id, consensus_client: self.consensus_client.clone(), client: self.client.clone(), bundle_sender: self.bundle_sender.clone(), @@ -79,6 +81,7 @@ where #[allow(clippy::too_many_arguments)] pub(super) fn new( domain_id: DomainId, + maybe_operator_id: Option, consensus_client: Arc, client: Arc, domain_bundle_proposer: DomainBundleProposer< @@ -98,6 +101,7 @@ where ); Self { domain_id, + maybe_operator_id, consensus_client, client, bundle_sender, @@ -146,6 +150,7 @@ where slot, consensus_block_info.hash, self.domain_id, + self.maybe_operator_id, global_randomness, )? { diff --git a/domains/client/domain-operator/src/domain_worker_starter.rs b/domains/client/domain-operator/src/domain_worker_starter.rs index 6639f4d192..10933e695e 100644 --- a/domains/client/domain-operator/src/domain_worker_starter.rs +++ b/domains/client/domain-operator/src/domain_worker_starter.rs @@ -31,7 +31,7 @@ use sp_block_builder::BlockBuilder; use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_core::traits::{CodeExecutor, SpawnEssentialNamed}; use sp_core::H256; -use sp_domains::{BundleProducerElectionApi, DomainsApi}; +use sp_domains::{BundleProducerElectionApi, DomainsApi, OperatorId}; use sp_domains_fraud_proof::FraudProofApi; use sp_messenger::MessengerApi; use sp_runtime::traits::NumberFor; @@ -57,7 +57,7 @@ pub(super) async fn start_worker< spawn_essential: Box, consensus_client: Arc, consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, - is_authority: bool, + maybe_operator_id: Option, bundle_producer: DomainBundleProducer, bundle_processor: BundleProcessor, operator_streams: OperatorStreams, @@ -118,27 +118,8 @@ pub(super) async fn start_worker< consensus_block_import_throttling_buffer_size, ); - if !is_authority { - info!("🧑‍ Running as Full node..."); - drop(new_slot_notification_stream); - drop(acknowledgement_sender_stream); - while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await { - if let Some(block_info) = maybe_block_info { - if let Err(error) = bundle_processor - .clone() - .process_bundles((block_info.hash, block_info.number, block_info.is_new_best)) - .instrument(span.clone()) - .await - { - tracing::error!(?error, "Failed to process consensus block"); - // Bring down the service as bundles processor is an essential task. - // TODO: more graceful shutdown. - break; - } - } - } - } else { - info!("🧑‍🌾 Running as Operator..."); + if let Some(operator_id) = maybe_operator_id { + info!("👷 Running as Operator[{operator_id}]..."); let bundler_fn = { let span = span.clone(); move |consensus_block_info: sp_blockchain::HashAndNumber, slot_info| { @@ -215,5 +196,24 @@ pub(super) async fn start_worker< } } } + } else { + info!("🧑‍ Running as Full node..."); + drop(new_slot_notification_stream); + drop(acknowledgement_sender_stream); + while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await { + if let Some(block_info) = maybe_block_info { + if let Err(error) = bundle_processor + .clone() + .process_bundles((block_info.hash, block_info.number, block_info.is_new_best)) + .instrument(span.clone()) + .await + { + tracing::error!(?error, "Failed to process consensus block"); + // Bring down the service as bundles processor is an essential task. + // TODO: more graceful shutdown. + break; + } + } + } } } diff --git a/domains/client/domain-operator/src/lib.rs b/domains/client/domain-operator/src/lib.rs index 076feda129..903889ad84 100644 --- a/domains/client/domain-operator/src/lib.rs +++ b/domains/client/domain-operator/src/lib.rs @@ -90,7 +90,7 @@ use sp_blockchain::HeaderBackend; use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; use sp_domain_digests::AsPredigest; -use sp_domains::{Bundle, DomainId, ExecutionReceipt}; +use sp_domains::{Bundle, DomainId, ExecutionReceipt, OperatorId}; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use sp_runtime::DigestItem; @@ -169,7 +169,7 @@ pub struct OperatorParams< pub transaction_pool: Arc, pub backend: Arc, pub code_executor: Arc, - pub is_authority: bool, + pub maybe_operator_id: Option, pub keystore: KeystorePtr, pub bundle_sender: Arc>, pub operator_streams: OperatorStreams, diff --git a/domains/client/domain-operator/src/operator.rs b/domains/client/domain-operator/src/operator.rs index eb112cd3de..6f2858a9c4 100644 --- a/domains/client/domain-operator/src/operator.rs +++ b/domains/client/domain-operator/src/operator.rs @@ -126,6 +126,7 @@ where let bundle_producer = DomainBundleProducer::new( params.domain_id, + params.maybe_operator_id, params.consensus_client.clone(), params.client.clone(), domain_bundle_proposer, @@ -179,7 +180,7 @@ where spawn_essential.clone(), params.consensus_client.clone(), params.consensus_offchain_tx_pool_factory.clone(), - params.is_authority, + params.maybe_operator_id, bundle_producer, bundle_processor.clone(), params.operator_streams, diff --git a/domains/service/src/domain.rs b/domains/service/src/domain.rs index 5100a9bce4..d26560f1c9 100644 --- a/domains/service/src/domain.rs +++ b/domains/service/src/domain.rs @@ -28,7 +28,7 @@ use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; use sp_core::traits::SpawnEssentialNamed; use sp_core::{Decode, Encode}; -use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi}; +use sp_domains::{BundleProducerElectionApi, DomainId, DomainsApi, OperatorId}; use sp_domains_fraud_proof::FraudProofApi; use sp_messenger::messages::ChainId; use sp_messenger::{MessengerApi, RelayerApi}; @@ -223,6 +223,7 @@ where pub domain_id: DomainId, pub domain_config: ServiceConfiguration, pub domain_created_at: NumberFor, + pub maybe_operator_id: Option, pub consensus_client: Arc, pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, pub consensus_network_sync_oracle: Arc, @@ -324,6 +325,7 @@ where { let DomainParams { domain_id, + maybe_operator_id, mut domain_config, domain_created_at, consensus_client, @@ -450,7 +452,7 @@ where transaction_pool: transaction_pool.clone(), backend: backend.clone(), code_executor: code_executor.clone(), - is_authority, + maybe_operator_id, keystore: params.keystore_container.keystore(), bundle_sender: Arc::new(bundle_sender), operator_streams, diff --git a/domains/test/service/src/domain.rs b/domains/test/service/src/domain.rs index 18d8f916ab..eed107a19d 100644 --- a/domains/test/service/src/domain.rs +++ b/domains/test/service/src/domain.rs @@ -221,6 +221,8 @@ where .xdm_gossip_worker_builder() .gossip_msg_sink(); + let maybe_operator_id = role.is_authority().then_some(0); + let domain_params = domain_service::DomainParams { domain_id, domain_config, @@ -235,6 +237,7 @@ where domain_message_receiver, provider: DefaultProvider, skip_empty_bundle_production, + maybe_operator_id, }; let domain_node = domain_service::new_full::<