From 4754648682f91a43b6dec3bf2106003bd20975e8 Mon Sep 17 00:00:00 2001 From: Lu Zhang <8418040+longbowlu@users.noreply.github.com> Date: Tue, 25 Jul 2023 15:25:19 -0700 Subject: [PATCH] Indexer pipeline (#13075) ## Description Describe the changes or additions included in this PR. ## Test Plan How did you test the new or updated feature? --- If your changes are not user-facing and not a breaking change, you can skip the following section. Otherwise, please indicate what changed, and then add to the Release Notes section as highlighted during the release process. ### Type of Change (Check all that apply) - [ ] protocol change - [ ] user-visible impact - [ ] breaking change for a client SDKs - [ ] breaking change for FNs (FN binary must upgrade) - [ ] breaking change for validators or node operators (must upgrade binaries) - [ ] breaking change for on-chain data layout - [ ] necessitate either a data wipe or data migration ### Release notes --- Cargo.lock | 1 + crates/sui-indexer/Cargo.toml | 1 + .../sui-indexer/benches/indexer_benchmark.rs | 38 +- crates/sui-indexer/src/errors.rs | 16 + .../src/handlers/checkpoint_handler.rs | 973 ++++++++++-------- crates/sui-indexer/src/metrics.rs | 17 +- crates/sui-indexer/src/store/indexer_store.rs | 10 +- .../sui-indexer/src/store/pg_indexer_store.rs | 51 +- 8 files changed, 586 insertions(+), 521 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81f3a99752ef7..9ad5ab3680984 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9750,6 +9750,7 @@ dependencies = [ "sui-sdk", "sui-test-transaction-builder", "sui-types", + "tap", "telemetry-subscribers", "test-cluster", "thiserror", diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml index 4375b2da32703..d7a6d88e9fd4b 100644 --- a/crates/sui-indexer/Cargo.toml +++ b/crates/sui-indexer/Cargo.toml @@ -15,6 +15,7 @@ backoff.workspace = true bcs.workspace = true chrono.workspace = true clap.workspace = true +tap.workspace = true diesel.workspace = true diesel-derive-enum.workspace = true futures.workspace = true diff --git a/crates/sui-indexer/benches/indexer_benchmark.rs b/crates/sui-indexer/benches/indexer_benchmark.rs index 9bb4af8842e6c..dff31b03bbb95 100644 --- a/crates/sui-indexer/benches/indexer_benchmark.rs +++ b/crates/sui-indexer/benches/indexer_benchmark.rs @@ -14,21 +14,15 @@ use tokio::runtime::Runtime; use sui_indexer::metrics::IndexerMetrics; use sui_indexer::models::checkpoints::Checkpoint; -use sui_indexer::models::objects::{NamedBcsBytes, Object as DBObject, ObjectStatus}; -use sui_indexer::models::owners::OwnerType; use sui_indexer::models::transactions::Transaction; use sui_indexer::new_pg_connection_pool; -use sui_indexer::store::{ - IndexerStore, PgIndexerStore, TemporaryCheckpointStore, TransactionObjectChanges, -}; +use sui_indexer::store::{IndexerStore, PgIndexerStore, TemporaryCheckpointStore}; use sui_indexer::utils::reset_database; use sui_json_rpc_types::CheckpointId; use sui_types::base_types::{ObjectDigest, ObjectID, SequenceNumber, SuiAddress}; use sui_types::crypto::AggregateAuthoritySignature; use sui_types::digests::TransactionDigest; -use sui_types::gas_coin::GasCoin; use sui_types::messages_checkpoint::CheckpointDigest; -use sui_types::object::Object; use sui_types::transaction::{TransactionData, TEST_ONLY_GAS_UNIT_FOR_TRANSFER}; fn indexer_benchmark(c: &mut Criterion) { @@ -83,10 +77,6 @@ fn create_checkpoint(sequence_number: i64) -> TemporaryCheckpointStore { .map(|_| create_transaction(sequence_number)) .collect(), events: vec![], - object_changes: vec![TransactionObjectChanges { - changed_objects: (1..1000).map(|_| create_object(sequence_number)).collect(), - deleted_objects: vec![], - }], packages: vec![], input_objects: vec![], changed_objects: vec![], @@ -137,32 +127,6 @@ fn create_transaction(sequence_number: i64) -> Transaction { } } -fn create_object(sequence_number: i64) -> DBObject { - DBObject { - epoch: 0, - checkpoint: sequence_number, - object_id: ObjectID::random().to_string(), - version: 0, - object_digest: ObjectDigest::random().to_string(), - owner_type: OwnerType::AddressOwner, - owner_address: Some(SuiAddress::random_for_testing_only().to_string()), - initial_shared_version: None, - previous_transaction: TransactionDigest::random().base58_encode(), - object_type: GasCoin::type_().to_string(), - object_status: ObjectStatus::Created, - has_public_transfer: true, - storage_rebate: 0, - bcs: vec![NamedBcsBytes( - "data".to_string(), - Object::new_gas_for_testing() - .data - .try_as_move() - .unwrap() - .contents() - .to_vec(), - )], - } -} criterion_group! { name = benches; config = Criterion::default().sample_size(50).measurement_time(Duration::from_secs(10)); diff --git a/crates/sui-indexer/src/errors.rs b/crates/sui-indexer/src/errors.rs index c2f33a0a174b4..e7bc69e0b7d26 100644 --- a/crates/sui-indexer/src/errors.rs +++ b/crates/sui-indexer/src/errors.rs @@ -9,6 +9,22 @@ use thiserror::Error; use sui_types::base_types::ObjectIDParseError; use sui_types::error::{SuiError, SuiObjectResponseError, UserInputError}; +#[derive(Debug, Error)] +pub struct DataDownloadError { + pub error: IndexerError, + pub next_checkpoint_sequence_number: u64, +} + +impl std::fmt::Display for DataDownloadError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "next_checkpoint_seq: {}, error: {}", + self.next_checkpoint_sequence_number, self.error + ) + } +} + #[derive(Debug, Error)] pub enum IndexerError { #[error("Indexer failed to convert timestamp to NaiveDateTime with error: `{0}`")] diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 4a5fa79e59cb4..7fceb38a36638 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -1,14 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 - -use std::collections::BTreeMap; -use std::sync::Arc; - use fastcrypto::traits::ToFromBytes; use futures::future::join_all; +use futures::stream::FuturesOrdered; use futures::FutureExt; +use futures::StreamExt; use jsonrpsee::http_client::HttpClient; use move_core_types::ident_str; +use std::collections::BTreeMap; +use std::env; +use std::sync::Arc; +use tap::tap::TapFallible; use tokio::sync::{ mpsc::{self, Receiver, Sender}, Mutex, @@ -31,6 +33,7 @@ use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait}; use sui_types::SUI_SYSTEM_ADDRESS; +use crate::errors::DataDownloadError; use crate::errors::IndexerError; use crate::metrics::IndexerMetrics; use crate::models::checkpoints::Checkpoint; @@ -48,24 +51,27 @@ use crate::IndexerConfig; const MAX_PARALLEL_DOWNLOADS: usize = 24; const DOWNLOAD_RETRY_INTERVAL_IN_SECS: u64 = 10; +const CHECKPOINT_INDEX_RETRY_INTERVAL_IN_SECS: u64 = 10; const DB_COMMIT_RETRY_INTERVAL_IN_MILLIS: u64 = 100; const MULTI_GET_CHUNK_SIZE: usize = 50; const CHECKPOINT_QUEUE_LIMIT: usize = 24; +const DOWNLOAD_QUEUE_SIZE: usize = 10000; const EPOCH_QUEUE_LIMIT: usize = 2; +#[allow(clippy::type_complexity)] #[derive(Clone)] pub struct CheckpointHandler { state: S, http_client: HttpClient, - // MUSTFIX(gegaowp): remove subscription_handler from checkpoint_handler; - // b/c subscription_handler should be on indexer reader, while checkpoint_handler is on indexer writer. - // subscription_handler: Arc, metrics: IndexerMetrics, config: IndexerConfig, + // FIXME: these do not need to be Arc> tx_checkpoint_sender: Arc>>, tx_checkpoint_receiver: Arc>>, - object_checkpoint_sender: Arc>>, - object_checkpoint_receiver: Arc>>, + object_changes_sender: + Arc)>>>, + object_changes_receiver: + Arc)>>>, epoch_sender: Arc>>, epoch_receiver: Arc>>, } @@ -82,19 +88,18 @@ where config: &IndexerConfig, ) -> Self { let (tx_checkpoint_sender, tx_checkpoint_receiver) = mpsc::channel(CHECKPOINT_QUEUE_LIMIT); - let (object_checkpoint_sender, object_checkpoint_receiver) = + let (object_changes_sender, object_changes_receiver) = mpsc::channel(CHECKPOINT_QUEUE_LIMIT); let (epoch_sender, epoch_receiver) = mpsc::channel(EPOCH_QUEUE_LIMIT); Self { state, http_client, - // subscription_handler, metrics, config: config.clone(), tx_checkpoint_sender: Arc::new(Mutex::new(tx_checkpoint_sender)), tx_checkpoint_receiver: Arc::new(Mutex::new(tx_checkpoint_receiver)), - object_checkpoint_sender: Arc::new(Mutex::new(object_checkpoint_sender)), - object_checkpoint_receiver: Arc::new(Mutex::new(object_checkpoint_receiver)), + object_changes_sender: Arc::new(Mutex::new(object_changes_sender)), + object_changes_receiver: Arc::new(Mutex::new(object_changes_receiver)), epoch_sender: Arc::new(Mutex::new(epoch_sender)), epoch_receiver: Arc::new(Mutex::new(epoch_receiver)), } @@ -102,46 +107,47 @@ where pub fn spawn(self) -> JoinHandle<()> { info!("Indexer checkpoint handler started..."); - let tx_download_handler = self.clone(); + let (downloaded_checkpoint_data_sender, downloaded_checkpoint_data_receiver) = + tokio::sync::mpsc::channel(DOWNLOAD_QUEUE_SIZE); + let self_clone = self.clone(); + // Start Checkpoint/Tx Downloader spawn_monitored_task!(async move { - let mut checkpoint_download_index_res = tx_download_handler - .start_download_and_index_tx_checkpoint() - .await; - while let Err(e) = &checkpoint_download_index_res { - warn!( - "Indexer checkpoint download & index failed with error: {:?}, retrying after {:?} secs...", - e, DOWNLOAD_RETRY_INTERVAL_IN_SECS - ); - tokio::time::sleep(std::time::Duration::from_secs( - DOWNLOAD_RETRY_INTERVAL_IN_SECS, - )) - .await; - checkpoint_download_index_res = tx_download_handler - .start_download_and_index_tx_checkpoint() - .await; - } + // -1 will be returned when checkpoints table is empty. + let last_seq_from_db = self_clone + .state + .get_latest_tx_checkpoint_sequence_number() + .await + .expect("Failed to get latest tx checkpoint sequence number from DB"); + Self::run_checkpoint_downloader( + self_clone, + last_seq_from_db + 1, + downloaded_checkpoint_data_sender, + ) + .await; }); - - let object_download_handler = self.clone(); + // Start Checkpoint/Tx Indexing Processor + let mut checkpoint_processor = CheckpointProcessor { + state: self.state.clone(), + metrics: self.metrics.clone(), + epoch_sender: self.epoch_sender.clone(), + checkpoint_sender: self.tx_checkpoint_sender.clone(), + downloaded_checkpoint_data_receiver, + }; spawn_monitored_task!(async move { - let mut object_download_index_res = object_download_handler - .start_download_and_index_object_checkpoint() - .await; - while let Err(e) = &object_download_index_res { + let mut res = checkpoint_processor.run().await; + while let Err(e) = &res { warn!( - "Indexer object download & index failed with error: {:?}, retrying after {:?} secs...", - e, DOWNLOAD_RETRY_INTERVAL_IN_SECS + "Indexer checkpoint data processing failed with error: {:?}, retrying after {:?} secs...", + e, CHECKPOINT_INDEX_RETRY_INTERVAL_IN_SECS ); tokio::time::sleep(std::time::Duration::from_secs( - DOWNLOAD_RETRY_INTERVAL_IN_SECS, + CHECKPOINT_INDEX_RETRY_INTERVAL_IN_SECS, )) .await; - object_download_index_res = object_download_handler - .start_download_and_index_object_checkpoint() - .await; + res = checkpoint_processor.run().await; } }); - + // Start Checkpoint/Tx Commit Handler let tx_checkpoint_commit_handler = self.clone(); spawn_monitored_task!(async move { let mut checkpoint_commit_res = tx_checkpoint_commit_handler @@ -162,12 +168,53 @@ where } }); + // Start Checkpoint Objects Downloader + let (downloaded_object_data_sender, downloaded_object_data_receiver) = + tokio::sync::mpsc::channel(DOWNLOAD_QUEUE_SIZE); + let self_clone = self.clone(); + spawn_monitored_task!(async move { + // -1 will be returned when checkpoints table is empty. + let last_seq_from_db = self_clone + .state + .get_latest_object_checkpoint_sequence_number() + .await + .expect("Failed to get latest object checkpoint sequence number from DB"); + Self::run_checkpoint_downloader( + self_clone, + last_seq_from_db + 1, + downloaded_object_data_sender, + ) + .await; + }); + + // Start Checkpoint Objects Indexing Processor + let mut checkpoint_objects_processor = CheckpointObjectsProcessor { + metrics: self.metrics.clone(), + object_changes_sender: self.object_changes_sender.clone(), + downloaded_object_data_receiver, + }; + spawn_monitored_task!(async move { + let mut res = checkpoint_objects_processor.run().await; + while let Err(e) = &res { + warn!( + "Indexer checkpoint object data processing failed with error: {:?}, retrying after {:?} secs...", + e, CHECKPOINT_INDEX_RETRY_INTERVAL_IN_SECS + ); + tokio::time::sleep(std::time::Duration::from_secs( + CHECKPOINT_INDEX_RETRY_INTERVAL_IN_SECS, + )) + .await; + res = checkpoint_objects_processor.run().await; + } + }); + + // Start Checkpoint Objects Commit Handler let object_checkpoint_commit_handler = self.clone(); spawn_monitored_task!(async move { - let mut object_checkpoint_commit_res = object_checkpoint_commit_handler + let mut checkpoint_commit_res = object_checkpoint_commit_handler .start_object_checkpoint_commit() .await; - while let Err(e) = &object_checkpoint_commit_res { + while let Err(e) = &checkpoint_commit_res { warn!( "Indexer object checkpoint commit failed with error: {:?}, retrying after {:?} secs...", e, DOWNLOAD_RETRY_INTERVAL_IN_SECS @@ -176,12 +223,13 @@ where DOWNLOAD_RETRY_INTERVAL_IN_SECS, )) .await; - object_checkpoint_commit_res = object_checkpoint_commit_handler + checkpoint_commit_res = object_checkpoint_commit_handler .start_object_checkpoint_commit() .await; } }); + // Start Epoch Commit Handler spawn_monitored_task!(async move { let mut epoch_commit_res = self.start_epoch_commit().await; while let Err(e) = &epoch_commit_res { @@ -198,253 +246,129 @@ where }) } - async fn start_download_and_index_tx_checkpoint(&self) -> Result<(), IndexerError> { - info!("Indexer checkpoint download & index task started..."); - // NOTE: important not to cast i64 to u64 here, - // because -1 will be returned when checkpoints table is empty. - let last_seq_from_db = self - .state - .get_latest_tx_checkpoint_sequence_number() - .await?; - if last_seq_from_db > 0 { - info!("Resuming tx handler from checkpoint {last_seq_from_db}"); + pub async fn run_checkpoint_downloader( + checkpoint_download_handler: Self, + start_seq: i64, + tx: Sender, + ) { + let mut checkpoint_download_index_res = checkpoint_download_handler + .start_download_checkpoint_data(start_seq, tx.clone()) + .await; + while let Err(e) = &checkpoint_download_index_res { + error!( + "Indexer checkpoint download & index failed with error: {:?}, retrying after {:?} secs...", + e, DOWNLOAD_RETRY_INTERVAL_IN_SECS + ); + tokio::time::sleep(std::time::Duration::from_secs( + DOWNLOAD_RETRY_INTERVAL_IN_SECS, + )) + .await; + checkpoint_download_index_res = checkpoint_download_handler + .start_download_checkpoint_data( + e.next_checkpoint_sequence_number as i64, + tx.clone(), + ) + .await; } - let mut next_cursor_sequence_number = last_seq_from_db + 1; + } + + async fn start_download_checkpoint_data( + &self, + // FIXME: this does not need to be i64, + starting_checkpoint_seq: i64, + tx: Sender, + ) -> Result<(), DataDownloadError> { + info!("Indexer checkpoint download & index task resumed from {starting_checkpoint_seq}..."); + let mut next_cursor_sequence_number = starting_checkpoint_seq; // NOTE: we will download checkpoints in parallel, but we will commit them sequentially. // We will start with MAX_PARALLEL_DOWNLOADS, and adjust if no more checkpoints are available. - let mut current_parallel_downloads = MAX_PARALLEL_DOWNLOADS; + let current_parallel_downloads = MAX_PARALLEL_DOWNLOADS; loop { - // Step 1: download tx checkpoint data for checkpoints in the current batch - let download_futures = (next_cursor_sequence_number - ..next_cursor_sequence_number + current_parallel_downloads as i64) - .map(|seq_num| self.download_checkpoint_data(seq_num as u64)); - let download_results = join_all(download_futures).await; - let mut downloaded_checkpoints = vec![]; + let mut download_futures = FuturesOrdered::new(); + for seq_num in next_cursor_sequence_number + ..next_cursor_sequence_number + current_parallel_downloads as i64 + { + download_futures.push_back(self.download_checkpoint_data(seq_num as u64)); + } // NOTE: Push sequentially and if one of the downloads failed, // we will discard all following checkpoints and retry, to avoid messing up the DB commit order. - for download_result in download_results { - if let Ok(checkpoint) = download_result { - downloaded_checkpoints.push(checkpoint); - } else { - if let Err(IndexerError::UnexpectedFullnodeResponseError(fn_e)) = - download_result - { - warn!( - "Unexpected response from fullnode for checkpoints: {}", - fn_e - ); - } else if let Err(IndexerError::FullNodeReadingError(fn_e)) = download_result { - warn!("Fullnode reading error for checkpoints {}: {}. It can be transient or due to rate limiting.", next_cursor_sequence_number, fn_e); - } else { - warn!("Error downloading checkpoints: {:?}", download_result); + while let Some(res) = download_futures.next().await { + match res { + Ok(checkpoint) => { + tx.send(checkpoint) + .await + .expect("Send to checkpoint channel should not fail"); + next_cursor_sequence_number += 1; } - break; - } - } - next_cursor_sequence_number += downloaded_checkpoints.len() as i64; - // NOTE: with this line, we can make sure that: - // - when indexer is way behind and catching up, we download MAX_PARALLEL_DOWNLOADS checkpoints in parallel; - // - when indexer is up to date, we download at least one checkpoint at a time. - current_parallel_downloads = - std::cmp::min(downloaded_checkpoints.len() + 1, MAX_PARALLEL_DOWNLOADS); - if downloaded_checkpoints.is_empty() { - warn!( - "No checkpoints were downloaded for sequence number {}, retrying...", - next_cursor_sequence_number - ); - continue; - } - - // Step 2: Transform tx checkpoint data to indexed_checkpoints - let index_timer = self.metrics.checkpoint_index_latency.start_timer(); - let indexed_checkpoint_epoch_vec = join_all(downloaded_checkpoints.iter().map( - |downloaded_checkpoint| async { - self.index_checkpoint_and_epoch(downloaded_checkpoint).await - }, - )) - .await - .into_iter() - .collect::, IndexerError>>() - .map_err(|e| { - error!( - "Failed to index checkpoints {:?} with error: {}", - downloaded_checkpoints, - e.to_string() - ); - e - })?; - let (indexed_checkpoints, indexed_epochs): (Vec<_>, Vec<_>) = - indexed_checkpoint_epoch_vec.into_iter().unzip(); - index_timer.stop_and_record(); - - // Step 3: send indexed_checkpoints to channel to be committed later. - let tx_checkpoint_sender_guard = self.tx_checkpoint_sender.lock().await; - // NOTE: when the channel is full, checkpoint_sender_guard will wait until the channel has space. - // Checkpoints are sent sequentially to stick to the order of checkpoint sequence numbers. - for indexed_checkpoint in indexed_checkpoints { - tx_checkpoint_sender_guard - .send(indexed_checkpoint) - .await - .map_err(|e| { - error!("Failed to send indexed checkpoint to checkpoint commit handler with error: {}", e.to_string()); - IndexerError::MpscChannelError(e.to_string()) - })?; - } - drop(tx_checkpoint_sender_guard); - - // Step 4: handle indexed_epochs, which depends on downloaded object changes. - for epoch in indexed_epochs.into_iter().flatten() { - // commit first epoch immediately, send other epochs to channel to be committed later. - if epoch.last_epoch.is_none() { - let epoch_db_guard = self.metrics.epoch_db_commit_latency.start_timer(); - info!("Persisting first epoch..."); - let mut persist_first_epoch_res = self.state.persist_epoch(&epoch).await; - while persist_first_epoch_res.is_err() { - warn!("Failed to persist first epoch, retrying..."); - persist_first_epoch_res = self.state.persist_epoch(&epoch).await; + Err(e) => { + return Err(DataDownloadError { + error: e, + next_checkpoint_sequence_number: next_cursor_sequence_number as u64, + }) } - epoch_db_guard.stop_and_record(); - self.metrics.total_epoch_committed.inc(); - info!("Persisted first epoch"); - } else { - let epoch_sender_guard = self.epoch_sender.lock().await; - // NOTE: when the channel is full, epoch_sender_guard will wait until the channel has space. - epoch_sender_guard.send(epoch).await.map_err(|e| { - error!( - "Failed to send indexed epoch to epoch commit handler with error {}", - e.to_string() - ); - IndexerError::MpscChannelError(e.to_string()) - })?; - drop(epoch_sender_guard); } } } } - async fn start_download_and_index_object_checkpoint(&self) -> Result<(), IndexerError> { - info!("Indexer object checkpoint download & index task started..."); - let last_seq_from_db = self - .state - .get_latest_object_checkpoint_sequence_number() - .await?; - if last_seq_from_db > 0 { - info!("Resuming obj handler from checkpoint {last_seq_from_db}"); - } - let mut next_cursor_sequence_number = last_seq_from_db + 1; - // NOTE: we will download checkpoints in parallel, but we will commit them sequentially. - // We will start with MAX_PARALLEL_DOWNLOADS, and adjust if no more checkpoints are available. - let mut current_parallel_downloads = MAX_PARALLEL_DOWNLOADS; + async fn start_tx_checkpoint_commit(&self) -> Result<(), IndexerError> { + info!("Indexer checkpoint commit task started..."); + let checkpoint_commit_batch_size = env::var("CHECKPOINT_COMMIT_BATCH_SIZE") + .unwrap_or(5.to_string()) + .parse::() + .unwrap(); loop { - // Step 1: download tx checkpoint data for checkpoints in the current batch - let download_futures = (next_cursor_sequence_number - ..next_cursor_sequence_number + current_parallel_downloads as i64) - .map(|seq_num| self.download_checkpoint_data(seq_num as u64)); - let download_results = join_all(download_futures).await; - let mut downloaded_checkpoints = vec![]; - // NOTE: Push sequentially and if one of the downloads failed, - // we will discard all following checkpoints and retry, to avoid messing up the DB commit order. - for download_result in download_results { - if let Ok(checkpoint) = download_result { - downloaded_checkpoints.push(checkpoint); - } else { - if let Err(IndexerError::UnexpectedFullnodeResponseError(fn_e)) = - download_result - { - warn!( - "Unexpected response from fullnode for checkpoints: {}", - fn_e - ); - } else if let Err(IndexerError::FullNodeReadingError(fn_e)) = download_result { - warn!("Fullnode reading error for checkpoints {}: {}. It can be transient or due to rate limiting.", next_cursor_sequence_number, fn_e); - } else { - warn!("Error downloading checkpoints: {:?}", download_result); + let mut checkpoint_receiver_guard = self.tx_checkpoint_receiver.lock().await; + + let mut indexed_checkpoint_batch = vec![]; + loop { + if let Ok(ckp) = checkpoint_receiver_guard.try_recv() { + indexed_checkpoint_batch.push(ckp); + if indexed_checkpoint_batch.len() >= checkpoint_commit_batch_size as usize { + break; } + } else if indexed_checkpoint_batch.is_empty() { + if let Some(ckp) = checkpoint_receiver_guard.recv().await { + indexed_checkpoint_batch.push(ckp); + break; + } + } else { break; } } - next_cursor_sequence_number += downloaded_checkpoints.len() as i64; - // NOTE: with this line, we can make sure that: - // - when indexer is way behind and catching up, we download MAX_PARALLEL_DOWNLOADS checkpoints in parallel; - // - when indexer is up to date, we download at least one checkpoint at a time. - current_parallel_downloads = - std::cmp::min(downloaded_checkpoints.len() + 1, MAX_PARALLEL_DOWNLOADS); - if downloaded_checkpoints.is_empty() { - warn!( - "No checkpoints were downloaded for sequence number {}, retrying...", - next_cursor_sequence_number - ); + drop(checkpoint_receiver_guard); + + let mut checkpoint_batch = vec![]; + let mut tx_batch = vec![]; + + if indexed_checkpoint_batch.is_empty() { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; continue; } - // Step 2: Transform tx checkpoint data to indexed_checkpoints and indexed_epochs - let index_timer = self.metrics.checkpoint_index_latency.start_timer(); - let indexed_checkpoint_epoch_vec = join_all(downloaded_checkpoints.iter().map( - |downloaded_checkpoint| async { - self.index_checkpoint_and_epoch(downloaded_checkpoint).await - }, - )) - .await - .into_iter() - .collect::, IndexerError>>() - .map_err(|e| { - error!( - "Failed to index checkpoints {:?} with error: {}", - downloaded_checkpoints, - e.to_string() + if self.config.skip_db_commit { + info!( + "[Checkpoint/Tx] Downloaded and indexed checkpoint {:?} - {:?} successfully, skipping DB commit...", + indexed_checkpoint_batch.first().map(|c| c.checkpoint.sequence_number), + indexed_checkpoint_batch.last().map(|c| c.checkpoint.sequence_number), ); - e - })?; - let (indexed_checkpoints, _indexed_epochs): (Vec<_>, Vec<_>) = - indexed_checkpoint_epoch_vec.into_iter().unzip(); - index_timer.stop_and_record(); - - // Step 3: send indexed_checkpoints to object channel to be committed later. - let object_checkpoint_sender_guard = self.object_checkpoint_sender.lock().await; - // NOTE: when the channel is full, checkpoint_sender_guard will wait until the channel has space. - // Checkpoints are sent sequentially to stick to the order of checkpoint sequence numbers. - for indexed_checkpoint in indexed_checkpoints { - object_checkpoint_sender_guard - .send(indexed_checkpoint) - .await - .map_err(|e| { - error!("Failed to send indexed checkpoint to checkpoint commit handler with error: {}", e.to_string()); - IndexerError::MpscChannelError(e.to_string()) - })?; + continue; } - drop(object_checkpoint_sender_guard); - } - } - - async fn start_tx_checkpoint_commit(&self) -> Result<(), IndexerError> { - info!("Indexer tx checkpoint commit task started..."); - loop { - let mut tx_checkpoint_receiver_guard = self.tx_checkpoint_receiver.lock().await; - let indexed_checkpoint = tx_checkpoint_receiver_guard.recv().await; - drop(tx_checkpoint_receiver_guard); - - if let Some(indexed_checkpoint) = indexed_checkpoint { - if self.config.skip_db_commit { - info!( - "Downloaded and indexed tx checkpoint {} successfully, skipping DB commit...", - indexed_checkpoint.checkpoint.sequence_number, - ); - continue; - } + for indexed_checkpoint in indexed_checkpoint_batch { // Write checkpoint to DB let TemporaryCheckpointStore { checkpoint, transactions, events, - object_changes: _, packages, input_objects, changed_objects, move_calls, recipients, } = indexed_checkpoint; - let checkpoint_seq = checkpoint.sequence_number; + checkpoint_batch.push(checkpoint); + tx_batch.push(transactions); // NOTE: retrials are necessary here, otherwise results can be popped and discarded. let events_handler = self.clone(); @@ -512,132 +436,165 @@ where .await; } }); + } - let checkpoint_tx_db_guard = - self.metrics.checkpoint_db_commit_latency.start_timer(); - let mut checkpoint_tx_commit_res = self + // now commit batched data + let tx_batch = tx_batch.into_iter().flatten().collect::>(); + let checkpoint_tx_db_guard = self.metrics.checkpoint_db_commit_latency.start_timer(); + let mut checkpoint_tx_commit_res = self + .state + .persist_checkpoint_transactions( + &checkpoint_batch, + &tx_batch, + self.metrics.total_transaction_chunk_committed.clone(), + ) + .await; + while let Err(e) = checkpoint_tx_commit_res { + warn!( + "Indexer checkpoint & transaction commit failed with error: {:?}, retrying after {:?} milli-secs...", + e, DB_COMMIT_RETRY_INTERVAL_IN_MILLIS + ); + tokio::time::sleep(std::time::Duration::from_millis( + DB_COMMIT_RETRY_INTERVAL_IN_MILLIS, + )) + .await; + checkpoint_tx_commit_res = self .state .persist_checkpoint_transactions( - &checkpoint, - &transactions, + &checkpoint_batch, + &tx_batch, self.metrics.total_transaction_chunk_committed.clone(), ) .await; - while let Err(e) = checkpoint_tx_commit_res { - warn!( - "Indexer checkpoint & transaction commit failed with error: {:?}, retrying after {:?} milli-secs...", - e, DB_COMMIT_RETRY_INTERVAL_IN_MILLIS - ); - tokio::time::sleep(std::time::Duration::from_millis( - DB_COMMIT_RETRY_INTERVAL_IN_MILLIS, - )) - .await; - checkpoint_tx_commit_res = self - .state - .persist_checkpoint_transactions( - &checkpoint, - &transactions, - self.metrics.total_transaction_chunk_committed.clone(), - ) - .await; - } - checkpoint_tx_db_guard.stop_and_record(); - self.metrics - .latest_tx_checkpoint_sequence_number - .set(checkpoint_seq); - self.metrics.total_tx_checkpoint_committed.inc(); - let tx_count = transactions.len(); - self.metrics - .total_transaction_committed - .inc_by(tx_count as u64); - info!( - "Tx checkpoint {} committed with {} transactions.", - checkpoint_seq, tx_count, - ); - self.metrics - .transaction_per_checkpoint - .observe(tx_count as f64); - } else { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; } + let elapsed = checkpoint_tx_db_guard.stop_and_record(); + // unwrap: batch must not be empty at this point + let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number; + let last_checkpoint_seq = checkpoint_batch.last().as_ref().unwrap().sequence_number; + self.metrics + .latest_tx_checkpoint_sequence_number + .set(last_checkpoint_seq); + + self.metrics.total_tx_checkpoint_committed.inc(); + let tx_count = tx_batch.len(); + self.metrics + .total_transaction_committed + .inc_by(tx_count as u64); + info!( + elapsed, + "Tx Checkpoint {}-{} committed with {} transactions.", + first_checkpoint_seq, + last_checkpoint_seq, + tx_count, + ); + self.metrics + .transaction_per_checkpoint + .observe(tx_count as f64 / (last_checkpoint_seq - first_checkpoint_seq + 1) as f64); } } async fn start_object_checkpoint_commit(&self) -> Result<(), IndexerError> { info!("Indexer object checkpoint commit task started..."); + let checkpoint_commit_batch_size = env::var("CHECKPOINT_COMMIT_BATCH_SIZE") + .unwrap_or(5.to_string()) + .parse::() + .unwrap(); loop { - let mut object_checkpoint_receiver_guard = self.object_checkpoint_receiver.lock().await; - let indexed_checkpoint = object_checkpoint_receiver_guard.recv().await; - drop(object_checkpoint_receiver_guard); - - if let Some(indexed_checkpoint) = indexed_checkpoint { - if self.config.skip_db_commit { - info!( - "Downloaded and indexed object checkpoint {} successfully, skipping DB commit...", - indexed_checkpoint.checkpoint.sequence_number, - ); - continue; + let mut checkpoint_receiver_guard = self.object_changes_receiver.lock().await; + + let mut object_changes_batch = vec![]; + let mut seqs = vec![]; + loop { + if let Ok((seq, object_changes)) = checkpoint_receiver_guard.try_recv() { + object_changes_batch.push(object_changes); + seqs.push(seq); + if object_changes_batch.len() >= checkpoint_commit_batch_size as usize { + break; + } + } else if object_changes_batch.is_empty() { + if let Some((seq, object_changes)) = checkpoint_receiver_guard.recv().await { + object_changes_batch.push(object_changes); + seqs.push(seq); + break; + } + } else { + break; } - let TemporaryCheckpointStore { - checkpoint, - transactions: _, - events: _, - object_changes, - packages: _, - input_objects: _, - changed_objects: _, - move_calls: _, - recipients: _, - } = indexed_checkpoint; - let checkpoint_seq = checkpoint.sequence_number; + } + drop(checkpoint_receiver_guard); + + let mut object_change_batch = vec![]; - // NOTE: commit object changes in the current task to stick to the original order, - // spawned tasks are possible to be executed in a different order. - let object_commit_timer = self.metrics.object_db_commit_latency.start_timer(); - let mut object_changes_commit_res = self + if object_changes_batch.is_empty() { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + continue; + } + // unwrap: seqs gets updated along with indexed_checkpoint_batch, hence must not be empty at this point + let last_checkpoint_seq = seqs.last().unwrap(); + let first_checkpoint_seq = seqs.first().unwrap(); + + if self.config.skip_db_commit { + info!( + "[Object] Downloaded and indexed checkpoint {} - {} successfully, skipping DB commit...", + last_checkpoint_seq, + first_checkpoint_seq, + ); + continue; + } + for object_changes in object_changes_batch { + object_change_batch.push(object_changes); + } + + // NOTE: commit object changes in the current task to stick to the original order, + // spawned tasks are possible to be executed in a different order. + let object_changes = object_change_batch + .into_iter() + .flatten() + .collect::>(); + let object_commit_timer = self.metrics.object_db_commit_latency.start_timer(); + let mut object_changes_commit_res = self + .state + .persist_object_changes( + &object_changes, + self.metrics.object_mutation_db_commit_latency.clone(), + self.metrics.object_deletion_db_commit_latency.clone(), + self.metrics.total_object_change_chunk_committed.clone(), + ) + .await; + while let Err(e) = object_changes_commit_res { + warn!( + "Indexer object changes commit failed with error: {:?}, retrying after {:?} milli-secs...", + e, DB_COMMIT_RETRY_INTERVAL_IN_MILLIS + ); + tokio::time::sleep(std::time::Duration::from_millis( + DB_COMMIT_RETRY_INTERVAL_IN_MILLIS, + )) + .await; + object_changes_commit_res = self .state .persist_object_changes( &object_changes, - self.metrics.total_object_change_chunk_committed.clone(), self.metrics.object_mutation_db_commit_latency.clone(), self.metrics.object_deletion_db_commit_latency.clone(), + self.metrics.total_object_change_chunk_committed.clone(), ) .await; - while let Err(e) = object_changes_commit_res { - warn!( - "Indexer object changes commit failed with error: {:?}, retrying after {:?} milli-secs...", - e, DB_COMMIT_RETRY_INTERVAL_IN_MILLIS - ); - tokio::time::sleep(std::time::Duration::from_millis( - DB_COMMIT_RETRY_INTERVAL_IN_MILLIS, - )) - .await; - object_changes_commit_res = self - .state - .persist_object_changes( - &object_changes, - self.metrics.total_object_change_chunk_committed.clone(), - self.metrics.object_mutation_db_commit_latency.clone(), - self.metrics.object_deletion_db_commit_latency.clone(), - ) - .await; - } - object_commit_timer.stop_and_record(); - self.metrics.total_object_checkpoint_committed.inc(); - self.metrics - .total_object_change_committed - .inc_by(object_changes.len() as u64); - self.metrics - .latest_indexer_object_checkpoint_sequence_number - .set(checkpoint_seq); - info!( - "Object checkpoint {} committed with {} object changes.", - checkpoint_seq, - object_changes.len(), - ); - } else { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; } + let elapsed = object_commit_timer.stop_and_record(); + self.metrics.total_object_checkpoint_committed.inc(); + self.metrics + .total_object_change_committed + .inc_by(object_changes.len() as u64); + self.metrics + .latest_indexer_object_checkpoint_sequence_number + .set(*last_checkpoint_seq as i64); + info!( + elapsed, + "Object Checkpoint {}-{} committed with {} object changes", + first_checkpoint_seq, + last_checkpoint_seq, + object_changes.len(), + ); } } @@ -654,6 +611,7 @@ where let epoch_db_guard = self.metrics.epoch_db_commit_latency.start_timer(); let mut epoch_commit_res = self.state.persist_epoch(&indexed_epoch).await; // NOTE: retrials are necessary here, otherwise indexed_epoch can be popped and discarded. + // FIXME: use macro to replace this pattern in this file. while let Err(e) = epoch_commit_res { warn!( "Indexer epoch commit failed with error: {:?}, retrying after {:?} milli-secs...", @@ -675,6 +633,7 @@ where } } + /// TODO: rewrite this thing. /// Download all the data we need for one checkpoint. async fn download_checkpoint_data( &self, @@ -758,8 +717,122 @@ where }) } + fn index_packages( + transactions: &[CheckpointTransactionBlockResponse], + changed_objects: &[(ObjectStatus, SuiObjectData)], + ) -> Result, IndexerError> { + let object_map = changed_objects + .iter() + .filter_map(|(_, o)| { + if let SuiRawData::Package(p) = &o + .bcs + .as_ref() + .expect("Expect the content field to be non-empty from data fetching") + { + Some((o.object_id, p)) + } else { + None + } + }) + .collect::>(); + + transactions + .iter() + .flat_map(|tx| { + tx.effects.created().iter().map(|oref| { + object_map + .get(&oref.reference.object_id) + .map(|o| Package::try_from(*tx.transaction.data.sender(), o)) + }) + }) + .flatten() + .collect() + } +} + +struct CheckpointProcessor +where + S: IndexerStore + Clone + Sync + Send + 'static, +{ + state: S, + metrics: IndexerMetrics, + epoch_sender: Arc>>, + checkpoint_sender: Arc>>, + downloaded_checkpoint_data_receiver: Receiver, +} + +impl CheckpointProcessor +where + S: IndexerStore + Clone + Sync + Send + 'static, +{ + async fn run(&mut self) -> Result<(), IndexerError> { + loop { + let checkpoint_data = self + .downloaded_checkpoint_data_receiver + .recv() + .await + .expect("Sender of Checkpoint Processor's rx should not be closed."); + + // Index checkpoint data + let index_timer = self.metrics.checkpoint_index_latency.start_timer(); + + let (checkpoint, epoch) = + Self::index_checkpoint_and_epoch(&self.state, &checkpoint_data) + .await + .tap_err(|e| { + error!( + "Failed to index checkpoints {:?} with error: {}", + checkpoint_data, + e.to_string() + ); + })?; + index_timer.stop_and_record(); + + // commit first epoch immediately, send other epochs to channel to be committed later. + if let Some(epoch) = epoch { + if epoch.last_epoch.is_none() { + let epoch_db_guard = self.metrics.epoch_db_commit_latency.start_timer(); + info!("Persisting first epoch..."); + let mut persist_first_epoch_res = self.state.persist_epoch(&epoch).await; + while persist_first_epoch_res.is_err() { + warn!("Failed to persist first epoch, retrying..."); + persist_first_epoch_res = self.state.persist_epoch(&epoch).await; + } + epoch_db_guard.stop_and_record(); + self.metrics.total_epoch_committed.inc(); + info!("Persisted first epoch"); + } else { + let epoch_sender_guard = self.epoch_sender.lock().await; + // NOTE: when the channel is full, epoch_sender_guard will wait until the channel has space. + epoch_sender_guard.send(epoch).await.map_err(|e| { + error!( + "Failed to send indexed epoch to epoch commit handler with error {}", + e.to_string() + ); + IndexerError::MpscChannelError(e.to_string()) + })?; + drop(epoch_sender_guard); + } + } + + let checkpoint_sender_guard = self.checkpoint_sender.lock().await; + // NOTE: when the channel is full, checkpoint_sender_guard will wait until the channel has space. + // Checkpoints are sent sequentially to stick to the order of checkpoint sequence numbers. + checkpoint_sender_guard + .send(checkpoint) + .await + .unwrap_or_else(|e| { + panic!( + "checkpoint channel send should not fail, but got error: {:?}", + e + ) + }); + drop(checkpoint_sender_guard); + } + } + async fn index_checkpoint_and_epoch( - &self, + state: &S, data: &CheckpointData, ) -> Result<(TemporaryCheckpointStore, Option), IndexerError> { let CheckpointData { @@ -782,48 +855,8 @@ where .flat_map(|tx| tx.events.data.iter().map(move |event| event.clone().into())) .collect::>(); - // Index objects - let tx_objects = changed_objects - .iter() - // Unwrap safe here as we requested previous tx data in the request. - .fold(BTreeMap::<_, Vec<_>>::new(), |mut acc, (status, o)| { - if let Some(digest) = &o.previous_transaction { - acc.entry(*digest).or_default().push((status, o)); - } - acc - }); - - let objects_changes = transactions - .iter() - .map(|tx| { - let changed_db_objects = tx_objects - .get(&tx.digest) - .unwrap_or(&vec![]) - .iter() - .map(|(status, o)| { - Object::from( - checkpoint.epoch, - Some(checkpoint.sequence_number), - status, - o, - ) - }) - .collect::>(); - let deleted_objects = get_deleted_db_objects( - &tx.effects, - checkpoint.epoch, - Some(checkpoint.sequence_number), - ); - - TransactionObjectChanges { - changed_objects: changed_db_objects, - deleted_objects, - } - }) - .collect(); - // Index packages - let packages = Self::index_packages(transactions, changed_objects)?; + let packages = CheckpointHandler::::index_packages(transactions, changed_objects)?; // Store input objects, move calls and recipients separately for transaction query indexing. let input_objects = transactions @@ -913,8 +946,7 @@ where let event = event.as_ref(); let last_epoch = system_state.epoch as i64 - 1; - let network_tx_count_prev_epoch = self - .state + let network_tx_count_prev_epoch = state .get_network_total_transactions_previous_epoch(last_epoch) .await?; Some(TemporaryEpochStore { @@ -978,7 +1010,6 @@ where )?, transactions: db_transactions, events, - object_changes: objects_changes, packages, input_objects, changed_objects, @@ -988,41 +1019,107 @@ where epoch_index, )) } +} - fn index_packages( - transactions: &[CheckpointTransactionBlockResponse], - changed_objects: &[(ObjectStatus, SuiObjectData)], - ) -> Result, IndexerError> { - let object_map = changed_objects +#[allow(clippy::type_complexity)] +struct CheckpointObjectsProcessor { + metrics: IndexerMetrics, + object_changes_sender: + Arc)>>>, + downloaded_object_data_receiver: Receiver, +} + +impl CheckpointObjectsProcessor { + async fn run(&mut self) -> Result<(), IndexerError> { + loop { + let checkpoint_data = self + .downloaded_object_data_receiver + .recv() + .await + .expect("Sender of Checkpoint Processor's rx should not be closed."); + let checkpoint_seq = checkpoint_data.checkpoint.sequence_number; + + // Index checkpoint data + let index_timer = self.metrics.checkpoint_index_latency.start_timer(); + + let object_changes = Self::index_checkpoint_objects(&checkpoint_data) + .await + .tap_err(|e| { + error!( + "Failed to index checkpoints {:?} with error: {}", + checkpoint_data, + e.to_string() + ); + })?; + index_timer.stop_and_record(); + + self.object_changes_sender + .lock() + .await + .send((checkpoint_seq, object_changes)) + .await + .unwrap_or_else(|e| { + panic!( + "checkpoint channel send should not fail, but got error: {:?}", + e + ) + }); + } + } + + async fn index_checkpoint_objects( + data: &CheckpointData, + ) -> Result, IndexerError> { + let CheckpointData { + checkpoint, + transactions, + changed_objects, + } = data; + + // Index objects + let tx_objects = changed_objects .iter() - .filter_map(|(_, o)| { - if let SuiRawData::Package(p) = &o - .bcs - .as_ref() - .expect("Expect the content field to be non-empty from data fetching") - { - Some((o.object_id, p)) - } else { - None + // Unwrap safe here as we requested previous tx data in the request. + .fold(BTreeMap::<_, Vec<_>>::new(), |mut acc, (status, o)| { + if let Some(digest) = &o.previous_transaction { + acc.entry(*digest).or_default().push((status, o)); } - }) - .collect::>(); + acc + }); - transactions + let objects_changes = transactions .iter() - .flat_map(|tx| { - tx.effects.created().iter().map(|oref| { - object_map - .get(&oref.reference.object_id) - .map(|o| Package::try_from(*tx.transaction.data.sender(), o)) - }) + .map(|tx| { + let changed_db_objects = tx_objects + .get(&tx.digest) + .unwrap_or(&vec![]) + .iter() + .map(|(status, o)| { + Object::from( + checkpoint.epoch, + Some(checkpoint.sequence_number), + status, + o, + ) + }) + .collect::>(); + let deleted_objects = get_deleted_db_objects( + &tx.effects, + checkpoint.epoch, + Some(checkpoint.sequence_number), + ); + + TransactionObjectChanges { + changed_objects: changed_db_objects, + deleted_objects, + } }) - .flatten() - .collect() + .collect(); + Ok(objects_changes) } } -// TODO(gegaowp): re-orgnize object util functions below +// TODO(gegaowp): re-organize object util functions below pub fn get_object_changes( effects: &SuiTransactionBlockEffects, ) -> Vec<(ObjectID, SequenceNumber, ObjectStatus)> { @@ -1092,18 +1189,6 @@ pub async fn fetch_changed_objects( }) } -// TODO(gegaowp): temp. disable fast-path -// pub fn to_changed_db_objects( -// changed_objects: Vec<(ObjectStatus, SuiObjectData)>, -// epoch: u64, -// checkpoint: Option, -// ) -> Vec { -// changed_objects -// .into_iter() -// .map(|(status, o)| Object::from(epoch, checkpoint.map(::from), &status, &o)) -// .collect::>() -// } - pub fn get_deleted_db_objects( effects: &SuiTransactionBlockEffects, epoch: EpochId, diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs index 5b2f8855a2faf..989c6c86c805c 100644 --- a/crates/sui-indexer/src/metrics.rs +++ b/crates/sui-indexer/src/metrics.rs @@ -20,7 +20,6 @@ pub struct IndexerMetrics { pub total_object_checkpoint_committed: IntCounter, pub total_transaction_committed: IntCounter, pub total_object_change_committed: IntCounter, - // NOTE: *_chunk_commited counts number of DB commits pub total_transaction_chunk_committed: IntCounter, pub total_object_change_chunk_committed: IntCounter, pub total_epoch_committed: IntCounter, @@ -30,10 +29,10 @@ pub struct IndexerMetrics { // checkpoint E2E latency is: // fullnode_download_latency + checkpoint_index_latency + db_commit_latency pub fullnode_checkpoint_wait_and_download_latency: Histogram, - pub fullnode_checkpoint_download_latency: Histogram, pub fullnode_transaction_download_latency: Histogram, pub fullnode_object_download_latency: Histogram, pub checkpoint_index_latency: Histogram, + pub checkpoint_objects_index_latency: Histogram, pub checkpoint_db_commit_latency: Histogram, pub object_db_commit_latency: Histogram, pub object_mutation_db_commit_latency: Histogram, @@ -141,13 +140,6 @@ impl IndexerMetrics { registry, ) .unwrap(), - fullnode_checkpoint_download_latency: register_histogram_with_registry!( - "fullnode_checkpoint_download_latency", - "Time spent in waiting for a new checkpoint from the Full Node", - LATENCY_SEC_BUCKETS.to_vec(), - registry, - ) - .unwrap(), fullnode_transaction_download_latency: register_histogram_with_registry!( "fullnode_transaction_download_latency", "Time spent in waiting for a new transaction from the Full Node", @@ -169,6 +161,13 @@ impl IndexerMetrics { registry, ) .unwrap(), + checkpoint_objects_index_latency: register_histogram_with_registry!( + "checkpoint_object_index_latency", + "Time spent in indexing a checkpoint objects", + LATENCY_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), checkpoint_db_commit_latency: register_histogram_with_registry!( "checkpoint_db_commit_latency", "Time spent commiting a checkpoint to the db", diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index e2c72c248be0b..78105adf0785d 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -213,16 +213,16 @@ pub trait IndexerStore { async fn persist_checkpoint_transactions( &self, - checkpoint: &Checkpoint, + checkpoints: &[Checkpoint], transactions: &[Transaction], - total_transaction_chunk_committed_counter: IntCounter, - ) -> Result; + counter_committed_tx: IntCounter, + ) -> Result<(), IndexerError>; async fn persist_object_changes( &self, tx_object_changes: &[TransactionObjectChanges], - total_object_change_chunk_committed_counter: IntCounter, object_mutation_latency: Histogram, object_deletion_latency: Histogram, + counter_committed_object: IntCounter, ) -> Result<(), IndexerError>; async fn persist_events(&self, events: &[Event]) -> Result<(), IndexerError>; async fn persist_addresses( @@ -338,11 +338,11 @@ impl ObjectStore for CheckpointData { } // Per checkpoint indexing +#[derive(Debug)] pub struct TemporaryCheckpointStore { pub checkpoint: Checkpoint, pub transactions: Vec, pub events: Vec, - pub object_changes: Vec, pub packages: Vec, pub input_objects: Vec, pub changed_objects: Vec, diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 566166b4ffbb8..a80da09f1d5b1 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -1230,10 +1230,10 @@ impl PgIndexerStore { fn persist_checkpoint_transactions( &self, - checkpoint: &Checkpoint, + checkpoints: &[Checkpoint], transactions: &[Transaction], - total_transaction_chunk_committed_counter: IntCounter, - ) -> Result { + counter_committed_tx: IntCounter, + ) -> Result<(), IndexerError> { transactional_blocking!(&self.blocking_cp, |conn| { // Commit indexed transactions for transaction_chunk in transactions.chunks(PG_COMMIT_CHUNK_SIZE) { @@ -1249,26 +1249,30 @@ impl PgIndexerStore { .execute(conn) .map_err(IndexerError::from) .context("Failed writing transactions to PostgresDB")?; - total_transaction_chunk_committed_counter.inc(); + counter_committed_tx.inc(); } // Commit indexed checkpoint last, so that if the checkpoint is committed, // all related data have been committed as well. - diesel::insert_into(checkpoints::table) - .values(checkpoint) - .on_conflict_do_nothing() - .execute(conn) - .map_err(IndexerError::from) - .context("Failed writing checkpoint to PostgresDB") + for checkpoint_chunk in checkpoints.chunks(PG_COMMIT_CHUNK_SIZE) { + diesel::insert_into(checkpoints::table) + .values(checkpoint_chunk) + .on_conflict_do_nothing() + .execute(conn) + .map_err(IndexerError::from) + .context("Failed writing checkpoint to PostgresDB")?; + counter_committed_tx.inc(); + } + Ok::<(), IndexerError>(()) }) } fn persist_object_changes( &self, tx_object_changes: &[TransactionObjectChanges], - total_object_change_chunk_committed_counter: IntCounter, object_mutation_latency: Histogram, object_deletion_latency: Histogram, + counter_committed_object: IntCounter, ) -> Result<(), IndexerError> { transactional_blocking!(&self.blocking_cp, |conn| { let mutated_objects: Vec = tx_object_changes @@ -1287,9 +1291,9 @@ impl PgIndexerStore { conn, mutated_objects, deleted_objects, - total_object_change_chunk_committed_counter, object_mutation_latency, object_deletion_latency, + counter_committed_object, )?; Ok::<(), IndexerError>(()) })?; @@ -2237,18 +2241,14 @@ impl IndexerStore for PgIndexerStore { async fn persist_checkpoint_transactions( &self, - checkpoint: &Checkpoint, + checkpoints: &[Checkpoint], transactions: &[Transaction], - total_transaction_chunk_committed_counter: IntCounter, - ) -> Result { - let checkpoint = checkpoint.to_owned(); + counter_committed_tx: IntCounter, + ) -> Result<(), IndexerError> { + let checkpoints = checkpoints.to_owned(); let transactions = transactions.to_owned(); self.spawn_blocking(move |this| { - this.persist_checkpoint_transactions( - &checkpoint, - &transactions, - total_transaction_chunk_committed_counter, - ) + this.persist_checkpoint_transactions(&checkpoints, &transactions, counter_committed_tx) }) .await } @@ -2256,17 +2256,17 @@ impl IndexerStore for PgIndexerStore { async fn persist_object_changes( &self, tx_object_changes: &[TransactionObjectChanges], - total_object_change_chunk_committed_counter: IntCounter, object_mutation_latency: Histogram, object_deletion_latency: Histogram, + counter_committed_object: IntCounter, ) -> Result<(), IndexerError> { let tx_object_changes = tx_object_changes.to_owned(); self.spawn_blocking(move |this| { this.persist_object_changes( &tx_object_changes, - total_object_change_chunk_committed_counter, object_mutation_latency, object_deletion_latency, + counter_committed_object, ) }) .await @@ -2444,9 +2444,9 @@ fn persist_transaction_object_changes( conn: &mut PgConnection, mutated_objects: Vec, deleted_objects: Vec, - object_commit_counter: IntCounter, object_mutation_latency: Histogram, object_deletion_latency: Histogram, + committed_object_counter: IntCounter, ) -> Result<(), IndexerError> { // NOTE: to avoid error of `ON CONFLICT DO UPDATE command cannot affect row a second time`, // we have to limit update of one object once in a query. @@ -2465,7 +2465,6 @@ fn persist_transaction_object_changes( e )) })?; - object_commit_counter.inc(); object_mutation_guard.stop_and_record(); let object_deletion_guard = object_deletion_latency.start_timer(); @@ -2488,7 +2487,7 @@ fn persist_transaction_object_changes( e )) })?; - object_commit_counter.inc(); + committed_object_counter.inc(); } object_deletion_guard.stop_and_record();