From 3685d2c04a5c522a4cedd1294ea9b41e2bc6f17c Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 3 Dec 2024 12:45:11 +1000 Subject: [PATCH] Split produce_bundle into smaller methods --- .../src/domain_bundle_producer.rs | 284 +++++++++++++----- 1 file changed, 204 insertions(+), 80 deletions(-) diff --git a/domains/client/domain-operator/src/domain_bundle_producer.rs b/domains/client/domain-operator/src/domain_bundle_producer.rs index f610b26e48..06261a6016 100644 --- a/domains/client/domain-operator/src/domain_bundle_producer.rs +++ b/domains/client/domain-operator/src/domain_bundle_producer.rs @@ -10,8 +10,9 @@ use sp_blockchain::HeaderBackend; use sp_consensus_slots::Slot; use sp_domains::core_api::DomainCoreApi; use sp_domains::{ - Bundle, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, OperatorPublicKey, - OperatorSignature, SealedBundleHeader, SealedSingletonReceipt, SingletonReceipt, + Bundle, BundleHeader, BundleProducerElectionApi, DomainId, DomainsApi, OperatorId, + OperatorPublicKey, OperatorSignature, ProofOfElection, SealedBundleHeader, + SealedSingletonReceipt, SingletonReceipt, }; use sp_keystore::KeystorePtr; use sp_messenger::MessengerApi; @@ -22,6 +23,19 @@ use std::sync::Arc; use subspace_runtime_primitives::Balance; use tracing::info; +/// Type alias for block hash. +pub type BlockHashFor = ::Hash; + +/// Type alias for block header. +pub type HeaderFor = ::Header; + +/// Type alias for bundle header. +pub type BundleHeaderFor = + BundleHeader, BlockHashFor, HeaderFor, Balance>; + +/// Type alias for extrinsics. +pub type ExtrinsicFor = ::Extrinsic; + type OpaqueBundle = sp_domains::OpaqueBundle< NumberFor, ::Hash, @@ -170,18 +184,26 @@ where }) } - pub async fn produce_bundle( - &mut self, + #[expect(clippy::type_complexity)] + pub fn claim_bundle_slot( + &self, operator_id: OperatorId, - slot_info: OperatorSlotInfo, - ) -> sp_blockchain::Result>> { + slot_info: &OperatorSlotInfo, + domain_best_number: NumberFor, + consensus_chain_best_hash: BlockHashFor, + ) -> sp_blockchain::Result< + Option<( + NumberFor, + NumberFor, + ProofOfElection, + OperatorPublicKey, + )>, + > { let OperatorSlotInfo { slot, proof_of_time, } = slot_info; - let domain_best_number = self.client.info().best_number; - let consensus_chain_best_hash = self.consensus_client.info().best_hash; let domain_best_number_onchain = self .consensus_client .runtime_api() @@ -211,7 +233,7 @@ where let skip_out_of_order_slot = self.skip_out_of_order_slot && self .last_processed_slot - .map(|last_slot| last_slot >= slot) + .map(|last_slot| last_slot >= *slot) .unwrap_or(false); is_operator_lagging || skip_out_of_order_slot @@ -227,98 +249,200 @@ where if let Some((proof_of_election, operator_signing_key)) = self.bundle_producer_election_solver.solve_challenge( - slot, + *slot, consensus_chain_best_hash, self.domain_id, operator_id, - proof_of_time, + *proof_of_time, )? { tracing::info!("📦 Claimed slot {slot}"); + Ok(Some(( + domain_best_number_onchain, + head_receipt_number, + proof_of_election, + operator_signing_key, + ))) + } else { + Ok(None) + } + } + + pub fn prepare_receipt( + &self, + slot_info: &OperatorSlotInfo, + domain_best_number_onchain: NumberFor, + head_receipt_number: NumberFor, + proof_of_election: &ProofOfElection, + operator_signing_key: &OperatorPublicKey, + ) -> sp_blockchain::Result>> { + // When the receipt gap is greater than one, the operator needs to produce a receipt + // instead of a bundle + if domain_best_number_onchain.saturating_sub(head_receipt_number) > 1u32.into() { + info!( + ?domain_best_number_onchain, + ?head_receipt_number, + "🔖 Producing singleton receipt at slot {:?}", + slot_info.slot + ); + let receipt = self .domain_bundle_proposer .load_next_receipt(domain_best_number_onchain, head_receipt_number)?; - // When the receipt gap is greater than one the operator need to produce receipt - // instead of bundle - if domain_best_number_onchain.saturating_sub(head_receipt_number) > 1u32.into() { - info!( - ?domain_best_number_onchain, - ?head_receipt_number, - "🔖 Producing singleton receipt at slot {:?}", - slot_info.slot - ); - - let singleton_receipt = SingletonReceipt { - proof_of_election, - receipt, - }; + let singleton_receipt = SingletonReceipt { + proof_of_election: proof_of_election.clone(), + receipt, + }; + + let signature = { + let to_sign: BlockHashFor = singleton_receipt.hash(); + self.sign(operator_signing_key, to_sign.as_ref())? + }; - let signature = { - let to_sign: ::Hash = singleton_receipt.hash(); - self.sign(&operator_signing_key, to_sign.as_ref())? + let sealed_singleton_receipt: SealedSingletonReceiptFor = + SealedSingletonReceipt { + singleton_receipt, + signature, }; - let sealed_singleton_receipt: SealedSingletonReceiptFor = - SealedSingletonReceipt { - singleton_receipt, - signature, - }; - return Ok(Some(DomainProposal::Receipt(sealed_singleton_receipt))); - } + Ok(Some(DomainProposal::Receipt(sealed_singleton_receipt))) + } else { + Ok(None) + } + } + + #[expect(clippy::too_many_arguments)] + pub async fn prepare_bundle( + &mut self, + operator_id: OperatorId, + slot_info: &OperatorSlotInfo, + consensus_chain_best_hash: BlockHashFor, + domain_best_number_onchain: NumberFor, + head_receipt_number: NumberFor, + proof_of_election: ProofOfElection, + // TODO: remove when skip_empty_bundle_production is split out + domain_best_number: NumberFor, + // TODO: remove Option when skip_empty_bundle_production is split out + ) -> sp_blockchain::Result, Vec>)>> + { + let tx_range = self + .consensus_client + .runtime_api() + .domain_tx_range(consensus_chain_best_hash, self.domain_id) + .map_err(|error| { + sp_blockchain::Error::Application(Box::from(format!( + "Error getting tx range: {error}" + ))) + })?; + + let receipt = self + .domain_bundle_proposer + .load_next_receipt(domain_best_number_onchain, head_receipt_number)?; - let tx_range = self + let (bundle_header, extrinsics) = self + .domain_bundle_proposer + .propose_bundle_at(proof_of_election.clone(), tx_range, operator_id, receipt) + .await?; + + // if there are no extrinsics and no receipts to confirm, skip the bundle + if self.skip_empty_bundle_production + && extrinsics.is_empty() + && !self .consensus_client .runtime_api() - .domain_tx_range(consensus_chain_best_hash, self.domain_id) - .map_err(|error| { - sp_blockchain::Error::Application(Box::from(format!( - "Error getting tx range: {error}" - ))) - })?; - let (bundle_header, extrinsics) = self - .domain_bundle_proposer - .propose_bundle_at(proof_of_election, tx_range, operator_id, receipt) - .await?; - - // if there are no extrinsics and no receipts to confirm, skip the bundle - if self.skip_empty_bundle_production - && extrinsics.is_empty() - && !self - .consensus_client - .runtime_api() - .non_empty_er_exists(consensus_chain_best_hash, self.domain_id)? - { - tracing::warn!( - ?domain_best_number, - "Skipping empty bundle production on slot {slot}" - ); - return Ok(None); - } - - self.last_processed_slot.replace(slot); - - info!("🔖 Producing bundle at slot {:?}", slot_info.slot); + .non_empty_er_exists(consensus_chain_best_hash, self.domain_id)? + { + tracing::warn!( + ?domain_best_number, + "Skipping empty bundle production on slot {}", + slot_info.slot, + ); + return Ok(None); + } - let signature = { - let to_sign = bundle_header.hash(); - self.sign(&operator_signing_key, to_sign.as_ref())? - }; + self.last_processed_slot.replace(slot_info.slot); - let bundle = Bundle { - sealed_header: SealedBundleHeader::new(bundle_header, signature), - extrinsics, - }; + Ok(Some((bundle_header, extrinsics))) + } - // TODO: Re-enable the bundle gossip over X-Net when the compact bundle is supported. - // if let Err(e) = self.bundle_sender.unbounded_send(signed_bundle.clone()) { - // tracing::error!(error = ?e, "Failed to send transaction bundle"); - // } + pub fn seal_bundle( + &self, + bundle_header: BundleHeaderFor, + operator_signing_key: &OperatorPublicKey, + extrinsics: Vec>, + ) -> sp_blockchain::Result> { + let signature = { + let to_sign = bundle_header.hash(); + self.sign(operator_signing_key, to_sign.as_ref())? + }; - Ok(Some(DomainProposal::Bundle(bundle.into_opaque_bundle()))) - } else { - Ok(None) + let bundle = Bundle { + sealed_header: SealedBundleHeader::new(bundle_header, signature), + extrinsics, + }; + + // TODO: Re-enable the bundle gossip over X-Net when the compact bundle is supported. + // if let Err(e) = self.bundle_sender.unbounded_send(signed_bundle.clone()) { + // tracing::error!(error = ?e, "Failed to send transaction bundle"); + // } + + Ok(DomainProposal::Bundle(bundle.into_opaque_bundle())) + } + + pub async fn produce_bundle( + &mut self, + operator_id: OperatorId, + slot_info: OperatorSlotInfo, + ) -> sp_blockchain::Result>> { + let domain_best_number = self.client.info().best_number; + let consensus_chain_best_hash = self.consensus_client.info().best_hash; + + let Some(( + domain_best_number_onchain, + head_receipt_number, + proof_of_election, + operator_signing_key, + )) = self.claim_bundle_slot( + operator_id, + &slot_info, + domain_best_number, + consensus_chain_best_hash, + )? + else { + return Ok(None); + }; + + if let Some(receipt) = self.prepare_receipt( + &slot_info, + domain_best_number_onchain, + head_receipt_number, + &proof_of_election, + &operator_signing_key, + )? { + return Ok(Some(receipt)); } + + let Some((bundle_header, extrinsics)) = self + .prepare_bundle( + operator_id, + &slot_info, + consensus_chain_best_hash, + domain_best_number_onchain, + head_receipt_number, + proof_of_election, + domain_best_number, + ) + .await? + else { + return Ok(None); + }; + + info!("🔖 Producing bundle at slot {:?}", slot_info.slot); + + let bundle = self.seal_bundle(bundle_header, &operator_signing_key, extrinsics)?; + + Ok(Some(bundle)) } }