From 2616286c377b61affc9b7bf8ce9f391176f44dbd Mon Sep 17 00:00:00 2001 From: Lu Zhang <8418040+longbowlu@users.noreply.github.com> Date: Sat, 7 Sep 2024 16:13:10 -0700 Subject: [PATCH] [bridge-indexer] Integrate progress saving policy (#19244) --- .../src/eth_bridge_indexer.rs | 8 -- crates/sui-bridge-indexer/src/main.rs | 24 ++++- crates/sui-bridge-indexer/src/metrics.rs | 8 +- crates/sui-bridge-indexer/src/storage.rs | 74 ++++++++++----- .../sui-bridge-indexer/src/sui_datasource.rs | 14 ++- .../src/indexer_builder.rs | 93 +++++++++++++------ .../tests/indexer_test_utils.rs | 14 ++- 7 files changed, 153 insertions(+), 82 deletions(-) diff --git a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs index 7b00e3eeaa877..409b787aafafe 100644 --- a/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/eth_bridge_indexer.rs @@ -154,10 +154,6 @@ impl Datasource for EthSubscriptionDatasource { fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { &self.indexer_metrics.tasks_processed_checkpoints } - - fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { - &self.indexer_metrics.live_task_current_checkpoint - } } pub struct EthSyncDatasource { @@ -288,10 +284,6 @@ impl Datasource for EthSyncDatasource { fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { &self.indexer_metrics.tasks_processed_checkpoints } - - fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { - &self.indexer_metrics.live_task_current_checkpoint - } } #[derive(Clone)] diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 3ad53b1d02d31..82d875439aa9f 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -24,7 +24,10 @@ use sui_bridge_indexer::config::IndexerConfig; use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper; use sui_bridge_indexer::metrics::BridgeIndexerMetrics; use 
sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store}; -use sui_bridge_indexer::storage::PgBridgePersistent; +use sui_bridge_indexer::storage::{ + OutOfOrderSaveAfterDurationPolicy, PgBridgePersistent, ProgressSavingPolicy, + SaveAfterDurationPolicy, +}; use sui_bridge_indexer::sui_bridge_indexer::SuiBridgeDataMapper; use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource; use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop; @@ -72,7 +75,20 @@ async fn main() -> Result<()> { let bridge_metrics = Arc::new(BridgeMetrics::new(®istry)); let db_url = config.db_url.clone(); - let datastore = PgBridgePersistent::new(get_connection_pool(db_url.clone()).await); + let datastore = PgBridgePersistent::new( + get_connection_pool(db_url.clone()).await, + ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new( + tokio::time::Duration::from_secs(30), + )), + indexer_meterics.clone(), + ); + let datastore_with_out_of_order_source = PgBridgePersistent::new( + get_connection_pool(db_url.clone()).await, + ProgressSavingPolicy::OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy::new( + tokio::time::Duration::from_secs(30), + )), + indexer_meterics.clone(), + ); let eth_client: Arc> = Arc::new( EthClient::::new( @@ -119,7 +135,7 @@ async fn main() -> Result<()> { EthDataMapper { metrics: indexer_meterics.clone(), }, - datastore.clone(), + datastore, ) .with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 }) .disable_live_task() @@ -146,7 +162,7 @@ async fn main() -> Result<()> { SuiBridgeDataMapper { metrics: indexer_meterics.clone(), }, - datastore, + datastore_with_out_of_order_source, ) .build(); indexer.start().await?; diff --git a/crates/sui-bridge-indexer/src/metrics.rs b/crates/sui-bridge-indexer/src/metrics.rs index 9ea85514e20d3..ef0c93edc6a41 100644 --- a/crates/sui-bridge-indexer/src/metrics.rs +++ b/crates/sui-bridge-indexer/src/metrics.rs @@ -23,7 +23,7 @@ pub struct 
BridgeIndexerMetrics { pub(crate) last_synced_eth_block: IntGauge, pub(crate) tasks_remaining_checkpoints: IntGaugeVec, pub(crate) tasks_processed_checkpoints: IntCounterVec, - pub(crate) live_task_current_checkpoint: IntGaugeVec, + pub(crate) tasks_current_checkpoints: IntGaugeVec, } impl BridgeIndexerMetrics { @@ -115,9 +115,9 @@ impl BridgeIndexerMetrics { registry, ) .unwrap(), - live_task_current_checkpoint: register_int_gauge_vec_with_registry!( - "bridge_indexer_live_task_current_checkpoint", - "Current checkpoint of live task", + tasks_current_checkpoints: register_int_gauge_vec_with_registry!( + "bridge_indexer_tasks_current_checkpoints", + "Current checkpoint for each task", &["task_name"], registry, ) diff --git a/crates/sui-bridge-indexer/src/storage.rs b/crates/sui-bridge-indexer/src/storage.rs index c39d2079d8f65..2ab775d2c5fa2 100644 --- a/crates/sui-bridge-indexer/src/storage.rs +++ b/crates/sui-bridge-indexer/src/storage.rs @@ -15,6 +15,7 @@ use diesel_async::scoped_futures::ScopedFutureExt; use diesel_async::AsyncConnection; use diesel_async::RunQueryDsl; +use crate::metrics::BridgeIndexerMetrics; use crate::postgres_manager::PgPool; use crate::schema::progress_store::{columns, dsl}; use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data}; @@ -26,11 +27,21 @@ use sui_indexer_builder::Task; #[derive(Clone)] pub struct PgBridgePersistent { pool: PgPool, + save_progress_policy: ProgressSavingPolicy, + indexer_metrics: BridgeIndexerMetrics, } impl PgBridgePersistent { - pub fn new(pool: PgPool) -> Self { - Self { pool } + pub fn new( + pool: PgPool, + save_progress_policy: ProgressSavingPolicy, + indexer_metrics: BridgeIndexerMetrics, + ) -> Self { + Self { + pool, + save_progress_policy, + indexer_metrics, + } } } @@ -96,27 +107,44 @@ impl IndexerProgressStore for PgBridgePersistent { async fn save_progress( &mut self, task_name: String, - checkpoint_number: u64, - ) -> anyhow::Result<()> { - let mut conn = 
self.pool.get().await?; - diesel::insert_into(schema::progress_store::table) - .values(&models::ProgressStore { - task_name, - checkpoint: checkpoint_number as i64, - // Target checkpoint and timestamp will only be written for new entries - target_checkpoint: i64::MAX, - // Timestamp is defaulted to current time in DB if None - timestamp: None, - }) - .on_conflict(dsl::task_name) - .do_update() - .set(( - columns::checkpoint.eq(checkpoint_number as i64), - columns::timestamp.eq(now), - )) - .execute(&mut conn) - .await?; - Ok(()) + checkpoint_numbers: &[u64], + start_checkpoint_number: u64, + target_checkpoint_number: u64, + ) -> anyhow::Result> { + if checkpoint_numbers.is_empty() { + return Ok(None); + } + if let Some(checkpoint_to_save) = self.save_progress_policy.cache_progress( + task_name.clone(), + checkpoint_numbers, + start_checkpoint_number, + target_checkpoint_number, + ) { + let mut conn = self.pool.get().await?; + diesel::insert_into(schema::progress_store::table) + .values(&models::ProgressStore { + task_name: task_name.clone(), + checkpoint: checkpoint_to_save as i64, + // Target checkpoint and timestamp will only be written for new entries + target_checkpoint: i64::MAX, + // Timestamp is defaulted to current time in DB if None + timestamp: None, + }) + .on_conflict(dsl::task_name) + .do_update() + .set(( + columns::checkpoint.eq(checkpoint_to_save as i64), + columns::timestamp.eq(now), + )) + .execute(&mut conn) + .await?; + self.indexer_metrics + .tasks_current_checkpoints + .with_label_values(&[&task_name]) + .set(checkpoint_to_save as i64); + return Ok(Some(checkpoint_to_save)); + } + Ok(None) } async fn get_ongoing_tasks(&self, prefix: &str) -> Result, anyhow::Error> { diff --git a/crates/sui-bridge-indexer/src/sui_datasource.rs b/crates/sui-bridge-indexer/src/sui_datasource.rs index a9d5de0fac774..fe8891897f36f 100644 --- a/crates/sui-bridge-indexer/src/sui_datasource.rs +++ b/crates/sui-bridge-indexer/src/sui_datasource.rs @@ -111,10 +111,6 @@ 
impl Datasource for SuiCheckpointDatasource { fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec { &self.indexer_metrics.tasks_processed_checkpoints } - - fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { - &self.indexer_metrics.live_task_current_checkpoint - } } struct PerTaskInMemProgressStore { @@ -134,11 +130,19 @@ impl ProgressStore for PerTaskInMemProgressStore { async fn save( &mut self, - _task_name: String, + task_name: String, checkpoint_number: CheckpointSequenceNumber, ) -> anyhow::Result<()> { if checkpoint_number >= self.exit_checkpoint { + tracing::info!( + task_name, + checkpoint_number, + exit_checkpoint = self.exit_checkpoint, + "Task completed, sending exit signal" + ); + // `exit_sender` may be `None` if we have already sent the exit signal. if let Some(sender) = self.exit_sender.take() { + // Ignore the error if the receiver has already been dropped. let _ = sender.send(()); } } diff --git a/crates/sui-indexer-builder/src/indexer_builder.rs b/crates/sui-indexer-builder/src/indexer_builder.rs index f37cab7d58c3c..8f8eef1dcf7d9 100644 --- a/crates/sui-indexer-builder/src/indexer_builder.rs +++ b/crates/sui-indexer-builder/src/indexer_builder.rs @@ -18,6 +18,7 @@ type CheckpointData = (u64, Vec); pub type DataSender = metered_channel::Sender>; const INGESTION_BATCH_SIZE: usize = 100; +const RETRIEVED_CHECKPOINT_CHANNEL_SIZE: usize = 10000; pub struct IndexerBuilder { name: String, @@ -305,11 +306,17 @@ pub trait Persistent: IndexerProgressStore + Sync + Send + Clone { #[async_trait] pub trait IndexerProgressStore: Send { async fn load_progress(&self, task_name: String) -> anyhow::Result; + /// Attempt to save progress. Depending on the `ProgressSavingPolicy`, + /// the progress may be cached somewhere instead of flushing to persistent storage. + /// Returns saved checkpoint number if any. Caller can use this value as a signal + /// to see if we have reached the target checkpoint. 
async fn save_progress( &mut self, task_name: String, - checkpoint_number: u64, - ) -> anyhow::Result<()>; + checkpoint_numbers: &[u64], + start_checkpoint_number: u64, + target_checkpoint_number: u64, + ) -> anyhow::Result>; async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result, Error>; @@ -346,16 +353,21 @@ pub trait Datasource: Sync + Send { .unwrap_or(INGESTION_BATCH_SIZE.to_string()) .parse::() .unwrap(); + let checkpoint_channel_size = std::env::var("RETRIEVED_CHECKPOINT_CHANNEL_SIZE") + .unwrap_or(RETRIEVED_CHECKPOINT_CHANNEL_SIZE.to_string()) + .parse::() + .unwrap(); tracing::info!( task_name, ingestion_batch_size, + checkpoint_channel_size, "Starting ingestion task ({}-{})", starting_checkpoint, target_checkpoint, ); let is_live_task = target_checkpoint == i64::MAX as u64; let (data_sender, data_rx) = metered_channel::channel( - 1000, + checkpoint_channel_size, &mysten_metrics::get_metrics() .unwrap() .channel_inflight @@ -378,26 +390,16 @@ pub trait Datasource: Sync + Send { } else { None }; - // track current checkpoint for live task - let live_task_current_checkpoint_metrics = if is_live_task { - let m = self - .get_live_task_checkpoint_metric() - .with_label_values(&[&task_name]); - m.set((starting_checkpoint) as i64); - Some(m) - } else { - None - }; let mut stream = mysten_metrics::metered_channel::ReceiverStream::new(data_rx) .ready_chunks(ingestion_batch_size); - + let mut last_saved_checkpoint = None; while let Some(batch) = stream.next().await { - // unwrap safe: at least 1 element in the batch let mut max_height = 0; + let mut heights = vec![]; let mut data = vec![]; - let mut batch_size = 0; for (height, d) in batch { + // Filter out data with height > target_checkpoint, in case data source returns any if height > target_checkpoint { tracing::warn!( task_name, @@ -407,36 +409,59 @@ pub trait Datasource: Sync + Send { continue; } max_height = std::cmp::max(max_height, height); - batch_size += 1; + heights.push(height); data.extend(d); 
} tracing::debug!( task_name, max_height, "Ingestion task received {} blocks.", - batch_size, + heights.len(), ); let timer = tokio::time::Instant::now(); if !data.is_empty() { + let timer = tokio::time::Instant::now(); let processed_data = data.into_iter().try_fold(vec![], |mut result, d| { result.append(&mut data_mapper.map(d)?); Ok::, Error>(result) })?; + tracing::debug!( + task_name, + max_height, + "Data mapper processed {} blocks in {}ms.", + heights.len(), + timer.elapsed().as_millis(), + ); + let timer = tokio::time::Instant::now(); // TODO: batch write data // TODO: we might be able to write data and progress in a single transaction. storage.write(processed_data).await?; + tracing::debug!( + task_name, + max_height, + "Processed data ({} blocks) was written to storage in {}ms.", + heights.len(), + timer.elapsed().as_millis(), + ); } - // TODO: batch progress - storage.save_progress(task_name.clone(), max_height).await?; + last_saved_checkpoint = storage + .save_progress( + task_name.clone(), + &heights, + starting_checkpoint, + target_checkpoint, + ) + .await?; tracing::debug!( task_name, max_height, + last_saved_checkpoint, "Ingestion task processed {} blocks in {}ms", - batch_size, + heights.len(), timer.elapsed().as_millis(), ); - processed_checkpoints_metrics.inc_by(batch_size as u64); + processed_checkpoints_metrics.inc_by(heights.len() as u64); if let Some(m) = &remaining_checkpoints_metric { // Note this is only approximate as the data may come in out of order m.set(std::cmp::max( @@ -444,16 +469,26 @@ 0, )); } - if let Some(m) = &live_task_current_checkpoint_metrics { - m.set((max_height) as i64) - } - if max_height > target_checkpoint { - break; + // If we have reached the target checkpoint, exit proactively + if let Some(cp) = last_saved_checkpoint { + if cp >= target_checkpoint { + // Task is done + break; + } } } if is_live_task { // Live task should never exit, except in unit tests - 
tracing::error!(task_name, "Live task exiting"); + tracing::error!(task_name, "Live task exiting unexpectedly"); + } else if last_saved_checkpoint.expect("last_saved_checkpoint is None") < target_checkpoint + { + tracing::error!( + task_name, + last_saved_checkpoint, + "Task exiting before reaching target checkpoint", + ); + } else { + tracing::info!(task_name, "Backfill task is done, exiting"); } join_handle.abort(); if let Some(m) = &remaining_checkpoints_metric { @@ -476,8 +511,6 @@ pub trait Datasource: Sync + Send { fn get_tasks_remaining_checkpoints_metric(&self) -> &IntGaugeVec; fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec; - - fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec; } pub enum BackfillStrategy { diff --git a/crates/sui-indexer-builder/tests/indexer_test_utils.rs b/crates/sui-indexer-builder/tests/indexer_test_utils.rs index 0f9040ca93b4c..4921fdeb2fc53 100644 --- a/crates/sui-indexer-builder/tests/indexer_test_utils.rs +++ b/crates/sui-indexer-builder/tests/indexer_test_utils.rs @@ -68,11 +68,6 @@ where // This is dummy &self.counter_metric } - - fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec { - // This is dummy - &self.gauge_metric - } } #[derive(Clone, Debug, Default)] @@ -119,15 +114,18 @@ impl IndexerProgressStore for InMemoryPersistent { async fn save_progress( &mut self, task_name: String, - checkpoint_number: u64, - ) -> anyhow::Result<()> { + checkpoint_numbers: &[u64], + _start_checkpoint_number: u64, + _target_checkpoint_number: u64, + ) -> anyhow::Result> { + let checkpoint_number = *checkpoint_numbers.last().unwrap(); self.progress_store .lock() .await .get_mut(&task_name) .unwrap() .checkpoint = checkpoint_number; - Ok(()) + Ok(Some(checkpoint_number)) } async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result, Error> {