diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index b4507bec32fff..d2433e4a7b98d 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -2,6 +2,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::consensus_adapter::ConsensusOverloadChecker; use crate::execution_cache::ExecutionCacheTraitPointers; use crate::execution_cache::TransactionCacheRead; use crate::jsonrpc_index::CoinIndexKey2; @@ -142,7 +143,6 @@ use crate::authority::authority_store_pruner::{ use crate::authority::epoch_start_configuration::EpochStartConfigTrait; use crate::authority::epoch_start_configuration::EpochStartConfiguration; use crate::checkpoints::CheckpointStore; -use crate::consensus_adapter::ConsensusAdapter; use crate::epoch::committee_store::CommitteeStore; use crate::execution_cache::{ CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite, @@ -1017,19 +1017,21 @@ impl AuthorityState { } } + /// When Ok, returns None if the transaction has not been executed, and returns + /// (TransactionEffects, TransactionEvents) if the transaction has been executed. #[instrument(level = "trace", skip_all)] pub async fn handle_transaction_v2( &self, epoch_store: &Arc, transaction: VerifiedTransaction, - ) -> SuiResult> { + ) -> SuiResult> { let tx_digest = *transaction.digest(); debug!("handle_transaction_v2"); - // Ensure an idempotent answer. - let tx_status = self.get_transaction_status(&tx_digest, epoch_store)?; - if tx_status.is_some() { - return Ok(tx_status); + // Check if the transaction has already been executed. + let tx_output = self.get_transaction_output(&tx_digest)?; + if tx_output.is_some() { + return Ok(tx_output); } let _metrics_guard = self @@ -1059,10 +1061,7 @@ impl AuthorityState { // It happens frequently that while we are checking the validity of the transaction, it // has just been executed. // In that case, we could still return Ok to avoid showing confusing errors. - Err(e) => self - .get_transaction_status(&tx_digest, epoch_store)? - .ok_or(e) - .map(Some), + Err(e) => self.get_transaction_output(&tx_digest)?.ok_or(e).map(Some), } } @@ -1080,7 +1079,7 @@ impl AuthorityState { pub(crate) fn check_system_overload( &self, - consensus_adapter: &Arc, + consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized), tx_data: &SenderSignedData, do_authority_overload_check: bool, ) -> SuiResult { @@ -1094,9 +1093,11 @@ impl AuthorityState { .tap_err(|_| { self.update_overload_metrics("execution_pending"); })?; - consensus_adapter.check_consensus_overload().tap_err(|_| { - self.update_overload_metrics("consensus"); - })?; + consensus_overload_checker + .check_consensus_overload() + .tap_err(|_| { + self.update_overload_metrics("consensus"); + })?; Ok(()) } @@ -4166,6 +4167,27 @@ impl AuthorityState { .await; } + /// Gets the execution outputs of a transaction if they exist + #[instrument(level = "trace", skip_all)] + pub fn get_transaction_output( + &self, + transaction_digest: &TransactionDigest, + ) -> SuiResult> { + let effects = self + .get_transaction_cache_reader() + .get_executed_effects(transaction_digest)?; + if let Some(effects) = effects { + let events = if let Some(digest) = effects.events_digest() { + self.get_transaction_events(digest)? + } else { + TransactionEvents::default() + }; + Ok(Some((effects, events))) + } else { + Ok(None) + } + } + /// Make a status response for a transaction #[instrument(level = "trace", skip_all)] pub fn get_transaction_status( diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index 274677feb5d69..27887618e77f5 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -21,6 +21,7 @@ use sui_network::{ api::{Validator, ValidatorServer}, tonic, }; +use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind}; use sui_types::messages_grpc::{ HandleCertificateRequestV3, HandleCertificateResponseV3, HandleTransactionResponseV2, }; @@ -42,10 +43,6 @@ use sui_types::{ CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2, }, }; -use sui_types::{ - messages_consensus::{ConsensusTransaction, ConsensusTransactionKind}, - messages_grpc::TransactionStatus, -}; use tap::TapFallible; use tokio::task::JoinHandle; use tonic::metadata::{Ascii, MetadataValue}; @@ -405,6 +402,8 @@ impl ValidatorService { self.transaction(request).await } + // When making changes to this function, see if the changes should be applied to + // `handle_transaction_v2()` and `SuiTxValidator::vote_transaction()` as well. async fn handle_transaction( &self, request: tonic::Request, @@ -430,7 +429,7 @@ impl ValidatorService { // higher chance to succeed. let mut validator_pushback_error = None; let overload_check_res = state.check_system_overload( - &consensus_adapter, + &*consensus_adapter, transaction.data(), state.check_system_overload_at_signing(), ); @@ -511,7 +510,7 @@ impl ValidatorService { // Check system overload let overload_check_res = self.state.check_system_overload( - &consensus_adapter, + &*consensus_adapter, transaction.data(), state.check_system_overload_at_signing(), ); @@ -535,7 +534,7 @@ impl ValidatorService { let tx_digest = transaction.digest(); let span = error_span!("validator_state_process_tx_v2", ?tx_digest); - let tx_status = state + let tx_output = state .handle_transaction_v2(&epoch_store, transaction.clone()) .instrument(span) .await @@ -544,25 +543,18 @@ impl ValidatorService { metrics.num_rejected_tx_in_epoch_boundary.inc(); } })?; - if let Some(( - _sender_signed_data, - // TODO(fastpath): Suppress duplicate transaction submission in consensus - // adapter, if not already done. (If we get back `TransactionStatus::Signed`` - // here we still need to proceed with submission logic, because the previous - // RPC might have been dropped after signing but before submission.) - TransactionStatus::Executed(_sign_info, signed_effects, events), - )) = tx_status - { + // Fetch remaining fields if the transaction has been executed. + if let Some((effects, events)) = tx_output { let input_objects = include_input_objects - .then(|| state.get_transaction_input_objects(signed_effects.data())) + .then(|| state.get_transaction_input_objects(&effects)) .and_then(Result::ok); let output_objects = include_output_objects - .then(|| state.get_transaction_output_objects(signed_effects.data())) + .then(|| state.get_transaction_output_objects(&effects)) .and_then(Result::ok); return Ok(( tonic::Response::new(HandleTransactionResponseV2 { - effects: signed_effects.into_data(), + effects, events: include_events.then_some(events), input_objects, output_objects, @@ -683,7 +675,7 @@ impl ValidatorService { // Check system overload for certificate in &certificates { let overload_check_res = self.state.check_system_overload( - &self.consensus_adapter, + &*self.consensus_adapter, certificate.data(), self.state.check_system_overload_at_execution(), ); diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index e54eefca610d9..087107acd3d58 100644 --- a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -184,6 +184,11 @@ impl ConsensusAdapterMetrics { } } +/// An object that can be used to check if the consensus is overloaded. +pub trait ConsensusOverloadChecker: Sync + Send + 'static { + fn check_consensus_overload(&self) -> SuiResult; +} + #[mockall::automock] #[async_trait::async_trait] pub trait SubmitToConsensus: Sync + Send + 'static { @@ -193,6 +198,7 @@ pub trait SubmitToConsensus: Sync + Send + 'static { epoch_store: &Arc, ) -> SuiResult; } + /// Submit Sui certificates to the consensus. pub struct ConsensusAdapter { /// The network client connecting to the consensus node of this authority. @@ -576,7 +582,7 @@ impl ConsensusAdapter { /// Performs weakly consistent checks on internal buffers to quickly /// discard transactions if we are overloaded - pub fn check_limits(&self) -> bool { + fn check_limits(&self) -> bool { // First check total transactions (waiting and in submission) if self.num_inflight_transactions.load(Ordering::Relaxed) as usize > self.max_pending_transactions @@ -587,14 +593,6 @@ impl ConsensusAdapter { self.submit_semaphore.available_permits() > 0 } - pub(crate) fn check_consensus_overload(&self) -> SuiResult { - fp_ensure!( - self.check_limits(), - SuiError::TooManyTransactionsPendingConsensus - ); - Ok(()) - } - fn submit_unchecked( self: &Arc, transactions: &[ConsensusTransaction], @@ -978,6 +976,24 @@ pub fn get_position_in_list( .0 } +impl ConsensusOverloadChecker for ConsensusAdapter { + fn check_consensus_overload(&self) -> SuiResult { + fp_ensure!( + self.check_limits(), + SuiError::TooManyTransactionsPendingConsensus + ); + Ok(()) + } +} + +pub struct NoopConsensusOverloadChecker {} + +impl ConsensusOverloadChecker for NoopConsensusOverloadChecker { + fn check_consensus_overload(&self) -> SuiResult { + Ok(()) + } +} + impl ReconfigurationInitiator for Arc { /// This method is called externally to begin reconfiguration /// It transition reconfig state to reject new certificates from user diff --git a/crates/sui-core/src/consensus_validator.rs b/crates/sui-core/src/consensus_validator.rs index 10902c557a598..d37d38e16e112 100644 --- a/crates/sui-core/src/consensus_validator.rs +++ b/crates/sui-core/src/consensus_validator.rs @@ -8,14 +8,17 @@ use fastcrypto_tbls::dkg; use mysten_metrics::monitored_scope; use prometheus::{register_int_counter_with_registry, IntCounter, Registry}; use sui_types::{ - error::SuiError, + error::{SuiError, SuiResult}, messages_consensus::{ConsensusTransaction, ConsensusTransactionKind}, + transaction::Transaction, }; use tap::TapFallible; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::{ - authority::AuthorityState, checkpoints::CheckpointServiceNotify, + authority::{authority_per_epoch_store::AuthorityPerEpochStore, AuthorityState}, + checkpoints::CheckpointServiceNotify, + consensus_adapter::ConsensusOverloadChecker, transaction_manager::TransactionManager, }; @@ -23,6 +26,7 @@ use crate::{ #[derive(Clone)] pub struct SuiTxValidator { authority_state: Arc, + consensus_overload_checker: Arc, checkpoint_service: Arc, _transaction_manager: Arc, metrics: Arc, @@ -31,6 +35,7 @@ pub struct SuiTxValidator { impl SuiTxValidator { pub fn new( authority_state: Arc, + consensus_overload_checker: Arc, checkpoint_service: Arc, transaction_manager: Arc, metrics: Arc, @@ -42,6 +47,7 @@ impl SuiTxValidator { ); Self { authority_state, + consensus_overload_checker, checkpoint_service, _transaction_manager: transaction_manager, metrics, @@ -131,32 +137,38 @@ impl SuiTxValidator { continue; }; - // Currently validity_check() and verify_transaction() are not required to be consistent across validators, - // so they do not run in validate_transactions(). They can run there once we confirm it is safe. - if tx - .validity_check(epoch_store.protocol_config(), epoch_store.epoch()) - .is_err() - { - result.push(i as TransactionIndex); - continue; - } - let Ok(tx) = epoch_store.verify_transaction(*tx.clone()) else { - result.push(i as TransactionIndex); - continue; - }; - - if self - .authority_state - .handle_transaction_v2(&epoch_store, tx) - .await - .is_err() - { + if let Err(e) = self.vote_transaction(&epoch_store, tx).await { + debug!("Failed to vote transaction: {:?}", e); result.push(i as TransactionIndex); } } result } + + async fn vote_transaction( + &self, + epoch_store: &Arc, + tx: Box, + ) -> SuiResult<()> { + // Currently validity_check() and verify_transaction() are not required to be consistent across validators, + // so they do not run in validate_transactions(). They can run there once we confirm it is safe. + tx.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?; + + self.authority_state.check_system_overload( + &*self.consensus_overload_checker, + tx.data(), + self.authority_state.check_system_overload_at_signing(), + )?; + + let tx = epoch_store.verify_transaction(*tx)?; + + self.authority_state + .handle_transaction_v2(epoch_store, tx) + .await?; + + Ok(()) + } } fn tx_kind_from_bytes(tx: &[u8]) -> Result { @@ -240,7 +252,10 @@ mod tests { use crate::{ authority::test_authority_builder::TestAuthorityBuilder, checkpoints::CheckpointServiceNoop, - consensus_adapter::consensus_tests::{test_certificates, test_gas_objects}, + consensus_adapter::{ + consensus_tests::{test_certificates, test_gas_objects}, + NoopConsensusOverloadChecker, + }, consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics}, }; @@ -273,6 +288,7 @@ mod tests { let metrics = SuiTxValidatorMetrics::new(&Default::default()); let validator = SuiTxValidator::new( state.clone(), + Arc::new(NoopConsensusOverloadChecker {}), Arc::new(CheckpointServiceNoop {}), state.transaction_manager().clone(), metrics, diff --git a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs index cfe9a3dd70a32..2f17efad34ccd 100644 --- a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs @@ -15,6 +15,7 @@ use tokio::{sync::mpsc, time::sleep}; use crate::{ authority::{test_authority_builder::TestAuthorityBuilder, AuthorityState}, checkpoints::{CheckpointMetrics, CheckpointService, CheckpointServiceNoop}, + consensus_adapter::NoopConsensusOverloadChecker, consensus_handler::ConsensusHandlerInitializer, consensus_manager::{ mysticeti_manager::MysticetiManager, ConsensusManagerMetrics, ConsensusManagerTrait, @@ -97,6 +98,7 @@ async fn test_mysticeti_manager() { consensus_handler_initializer, SuiTxValidator::new( state.clone(), + Arc::new(NoopConsensusOverloadChecker {}), Arc::new(CheckpointServiceNoop {}), state.transaction_manager().clone(), SuiTxValidatorMetrics::new(&Registry::new()), diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 09f46d40d16be..1325a0132cbcc 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -1349,6 +1349,7 @@ impl SuiNode { consensus_handler_initializer, SuiTxValidator::new( state.clone(), + consensus_adapter.clone(), checkpoint_service.clone(), state.transaction_manager().clone(), sui_tx_validator_metrics.clone(),