Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Nov 5, 2024
1 parent 96a00d4 commit a860847
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 67 deletions.
50 changes: 36 additions & 14 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::consensus_adapter::ConsensusOverloadChecker;
use crate::execution_cache::ExecutionCacheTraitPointers;
use crate::execution_cache::TransactionCacheRead;
use crate::jsonrpc_index::CoinIndexKey2;
Expand Down Expand Up @@ -142,7 +143,6 @@ use crate::authority::authority_store_pruner::{
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::checkpoints::CheckpointStore;
use crate::consensus_adapter::ConsensusAdapter;
use crate::epoch::committee_store::CommitteeStore;
use crate::execution_cache::{
CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
Expand Down Expand Up @@ -1017,19 +1017,21 @@ impl AuthorityState {
}
}

/// When Ok, returns None if the transaction has not been executed, and returns
/// (TransactionEffects, TransactionEvents) if the transaction has been executed.
#[instrument(level = "trace", skip_all)]
pub async fn handle_transaction_v2(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
transaction: VerifiedTransaction,
) -> SuiResult<Option<(SenderSignedData, TransactionStatus)>> {
) -> SuiResult<Option<(TransactionEffects, TransactionEvents)>> {
let tx_digest = *transaction.digest();
debug!("handle_transaction_v2");

// Ensure an idempotent answer.
let tx_status = self.get_transaction_status(&tx_digest, epoch_store)?;
if tx_status.is_some() {
return Ok(tx_status);
// Check if the transaction has already been executed.
let tx_output = self.get_transaction_output(&tx_digest)?;
if tx_output.is_some() {
return Ok(tx_output);
}

let _metrics_guard = self
Expand Down Expand Up @@ -1059,10 +1061,7 @@ impl AuthorityState {
// It happens frequently that while we are checking the validity of the transaction, it
// has just been executed.
// In that case, we could still return Ok to avoid showing confusing errors.
Err(e) => self
.get_transaction_status(&tx_digest, epoch_store)?
.ok_or(e)
.map(Some),
Err(e) => self.get_transaction_output(&tx_digest)?.ok_or(e).map(Some),
}
}

Expand All @@ -1080,7 +1079,7 @@ impl AuthorityState {

pub(crate) fn check_system_overload(
&self,
consensus_adapter: &Arc<ConsensusAdapter>,
consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
tx_data: &SenderSignedData,
do_authority_overload_check: bool,
) -> SuiResult {
Expand All @@ -1094,9 +1093,11 @@ impl AuthorityState {
.tap_err(|_| {
self.update_overload_metrics("execution_pending");
})?;
consensus_adapter.check_consensus_overload().tap_err(|_| {
self.update_overload_metrics("consensus");
})?;
consensus_overload_checker
.check_consensus_overload()
.tap_err(|_| {
self.update_overload_metrics("consensus");
})?;
Ok(())
}

Expand Down Expand Up @@ -4166,6 +4167,27 @@ impl AuthorityState {
.await;
}

/// Gets the execution outputs of a transaction if they exist
#[instrument(level = "trace", skip_all)]
pub fn get_transaction_output(
&self,
transaction_digest: &TransactionDigest,
) -> SuiResult<Option<(TransactionEffects, TransactionEvents)>> {
let effects = self
.get_transaction_cache_reader()
.get_executed_effects(transaction_digest)?;
if let Some(effects) = effects {
let events = if let Some(digest) = effects.events_digest() {
self.get_transaction_events(digest)?
} else {
TransactionEvents::default()
};
Ok(Some((effects, events)))
} else {
Ok(None)
}
}

/// Make a status response for a transaction
#[instrument(level = "trace", skip_all)]
pub fn get_transaction_status(
Expand Down
32 changes: 12 additions & 20 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use sui_network::{
api::{Validator, ValidatorServer},
tonic,
};
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
use sui_types::messages_grpc::{
HandleCertificateRequestV3, HandleCertificateResponseV3, HandleTransactionResponseV2,
};
Expand All @@ -42,10 +43,6 @@ use sui_types::{
CheckpointRequest, CheckpointRequestV2, CheckpointResponse, CheckpointResponseV2,
},
};
use sui_types::{
messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
messages_grpc::TransactionStatus,
};
use tap::TapFallible;
use tokio::task::JoinHandle;
use tonic::metadata::{Ascii, MetadataValue};
Expand Down Expand Up @@ -405,6 +402,8 @@ impl ValidatorService {
self.transaction(request).await
}

// When making changes to this function, see if the changes should be applied to
// `handle_transaction_v2()` and `SuiTxValidator::vote_transaction()` as well.
async fn handle_transaction(
&self,
request: tonic::Request<Transaction>,
Expand All @@ -430,7 +429,7 @@ impl ValidatorService {
// higher chance to succeed.
let mut validator_pushback_error = None;
let overload_check_res = state.check_system_overload(
&consensus_adapter,
&*consensus_adapter,
transaction.data(),
state.check_system_overload_at_signing(),
);
Expand Down Expand Up @@ -511,7 +510,7 @@ impl ValidatorService {

// Check system overload
let overload_check_res = self.state.check_system_overload(
&consensus_adapter,
&*consensus_adapter,
transaction.data(),
state.check_system_overload_at_signing(),
);
Expand All @@ -535,7 +534,7 @@ impl ValidatorService {
let tx_digest = transaction.digest();
let span = error_span!("validator_state_process_tx_v2", ?tx_digest);

let tx_status = state
let tx_output = state
.handle_transaction_v2(&epoch_store, transaction.clone())
.instrument(span)
.await
Expand All @@ -544,25 +543,18 @@ impl ValidatorService {
metrics.num_rejected_tx_in_epoch_boundary.inc();
}
})?;
if let Some((
_sender_signed_data,
// TODO(fastpath): Suppress duplicate transaction submission in consensus
// adapter, if not already done. (If we get back `TransactionStatus::Signed``
// here we still need to proceed with submission logic, because the previous
// RPC might have been dropped after signing but before submission.)
TransactionStatus::Executed(_sign_info, signed_effects, events),
)) = tx_status
{
// Fetch remaining fields if the transaction has been executed.
if let Some((effects, events)) = tx_output {
let input_objects = include_input_objects
.then(|| state.get_transaction_input_objects(signed_effects.data()))
.then(|| state.get_transaction_input_objects(&effects))
.and_then(Result::ok);
let output_objects = include_output_objects
.then(|| state.get_transaction_output_objects(signed_effects.data()))
.then(|| state.get_transaction_output_objects(&effects))
.and_then(Result::ok);

return Ok((
tonic::Response::new(HandleTransactionResponseV2 {
effects: signed_effects.into_data(),
effects,
events: include_events.then_some(events),
input_objects,
output_objects,
Expand Down Expand Up @@ -683,7 +675,7 @@ impl ValidatorService {
// Check system overload
for certificate in &certificates {
let overload_check_res = self.state.check_system_overload(
&self.consensus_adapter,
&*self.consensus_adapter,
certificate.data(),
self.state.check_system_overload_at_execution(),
);
Expand Down
34 changes: 25 additions & 9 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ impl ConsensusAdapterMetrics {
}
}

/// An object that can be used to check if the consensus is overloaded.
pub trait ConsensusOverloadChecker: Sync + Send + 'static {
fn check_consensus_overload(&self) -> SuiResult;
}

#[mockall::automock]
#[async_trait::async_trait]
pub trait SubmitToConsensus: Sync + Send + 'static {
Expand All @@ -193,6 +198,7 @@ pub trait SubmitToConsensus: Sync + Send + 'static {
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult;
}

/// Submit Sui certificates to the consensus.
pub struct ConsensusAdapter {
/// The network client connecting to the consensus node of this authority.
Expand Down Expand Up @@ -576,7 +582,7 @@ impl ConsensusAdapter {

/// Performs weakly consistent checks on internal buffers to quickly
/// discard transactions if we are overloaded
pub fn check_limits(&self) -> bool {
fn check_limits(&self) -> bool {
// First check total transactions (waiting and in submission)
if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
> self.max_pending_transactions
Expand All @@ -587,14 +593,6 @@ impl ConsensusAdapter {
self.submit_semaphore.available_permits() > 0
}

pub(crate) fn check_consensus_overload(&self) -> SuiResult {
fp_ensure!(
self.check_limits(),
SuiError::TooManyTransactionsPendingConsensus
);
Ok(())
}

fn submit_unchecked(
self: &Arc<Self>,
transactions: &[ConsensusTransaction],
Expand Down Expand Up @@ -978,6 +976,24 @@ pub fn get_position_in_list(
.0
}

impl ConsensusOverloadChecker for ConsensusAdapter {
fn check_consensus_overload(&self) -> SuiResult {
fp_ensure!(
self.check_limits(),
SuiError::TooManyTransactionsPendingConsensus
);
Ok(())
}
}

pub struct NoopConsensusOverloadChecker {}

impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
fn check_consensus_overload(&self) -> SuiResult {
Ok(())
}
}

impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
/// This method is called externally to begin reconfiguration
/// It transition reconfig state to reject new certificates from user
Expand Down
64 changes: 40 additions & 24 deletions crates/sui-core/src/consensus_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,25 @@ use fastcrypto_tbls::dkg;
use mysten_metrics::monitored_scope;
use prometheus::{register_int_counter_with_registry, IntCounter, Registry};
use sui_types::{
error::SuiError,
error::{SuiError, SuiResult},
messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
transaction::Transaction,
};
use tap::TapFallible;
use tracing::{info, warn};
use tracing::{debug, info, warn};

use crate::{
authority::AuthorityState, checkpoints::CheckpointServiceNotify,
authority::{authority_per_epoch_store::AuthorityPerEpochStore, AuthorityState},
checkpoints::CheckpointServiceNotify,
consensus_adapter::ConsensusOverloadChecker,
transaction_manager::TransactionManager,
};

/// Allows verifying the validity of transactions
#[derive(Clone)]
pub struct SuiTxValidator {
authority_state: Arc<AuthorityState>,
consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
_transaction_manager: Arc<TransactionManager>,
metrics: Arc<SuiTxValidatorMetrics>,
Expand All @@ -31,6 +35,7 @@ pub struct SuiTxValidator {
impl SuiTxValidator {
pub fn new(
authority_state: Arc<AuthorityState>,
consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
transaction_manager: Arc<TransactionManager>,
metrics: Arc<SuiTxValidatorMetrics>,
Expand All @@ -42,6 +47,7 @@ impl SuiTxValidator {
);
Self {
authority_state,
consensus_overload_checker,
checkpoint_service,
_transaction_manager: transaction_manager,
metrics,
Expand Down Expand Up @@ -131,32 +137,38 @@ impl SuiTxValidator {
continue;
};

// Currently validity_check() and verify_transaction() are not required to be consistent across validators,
// so they do not run in validate_transactions(). They can run there once we confirm it is safe.
if tx
.validity_check(epoch_store.protocol_config(), epoch_store.epoch())
.is_err()
{
result.push(i as TransactionIndex);
continue;
}
let Ok(tx) = epoch_store.verify_transaction(*tx.clone()) else {
result.push(i as TransactionIndex);
continue;
};

if self
.authority_state
.handle_transaction_v2(&epoch_store, tx)
.await
.is_err()
{
if let Err(e) = self.vote_transaction(&epoch_store, tx).await {
debug!("Failed to vote transaction: {:?}", e);
result.push(i as TransactionIndex);
}
}

result
}

async fn vote_transaction(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
tx: Box<Transaction>,
) -> SuiResult<()> {
// Currently validity_check() and verify_transaction() are not required to be consistent across validators,
// so they do not run in validate_transactions(). They can run there once we confirm it is safe.
tx.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;

self.authority_state.check_system_overload(
&*self.consensus_overload_checker,
tx.data(),
self.authority_state.check_system_overload_at_signing(),
)?;

let tx = epoch_store.verify_transaction(*tx)?;

self.authority_state
.handle_transaction_v2(epoch_store, tx)
.await?;

Ok(())
}
}

fn tx_kind_from_bytes(tx: &[u8]) -> Result<ConsensusTransactionKind, ValidationError> {
Expand Down Expand Up @@ -240,7 +252,10 @@ mod tests {
use crate::{
authority::test_authority_builder::TestAuthorityBuilder,
checkpoints::CheckpointServiceNoop,
consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
consensus_adapter::{
consensus_tests::{test_certificates, test_gas_objects},
NoopConsensusOverloadChecker,
},
consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics},
};

Expand Down Expand Up @@ -273,6 +288,7 @@ mod tests {
let metrics = SuiTxValidatorMetrics::new(&Default::default());
let validator = SuiTxValidator::new(
state.clone(),
Arc::new(NoopConsensusOverloadChecker {}),
Arc::new(CheckpointServiceNoop {}),
state.transaction_manager().clone(),
metrics,
Expand Down
Loading

0 comments on commit a860847

Please sign in to comment.