diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs
index 21dca59607f9d..f5ec89a2fa627 100644
--- a/crates/sui-indexer/src/handlers/pruner.rs
+++ b/crates/sui-indexer/src/handlers/pruner.rs
@@ -3,7 +3,6 @@
 
 use std::collections::HashMap;
 use std::time::Duration;
-
 use tokio_util::sync::CancellationToken;
 use tracing::{error, info};
 
@@ -35,21 +34,16 @@ impl Pruner {
     }
 
     pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
-        loop {
-            if cancel.is_cancelled() {
-                info!("Pruner task cancelled.");
-                return Ok(());
-            }
-
-            let (mut min_epoch, mut max_epoch) = self.store.get_available_epoch_range().await?;
-            while min_epoch + self.epochs_to_keep > max_epoch {
-                if cancel.is_cancelled() {
-                    info!("Pruner task cancelled.");
-                    return Ok(());
-                }
+        let mut last_seen_max_epoch = 0;
+        // The first epoch that has not yet been pruned.
+        let mut next_prune_epoch = None;
+        while !cancel.is_cancelled() {
+            let (min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
+            if max_epoch == last_seen_max_epoch {
                 tokio::time::sleep(Duration::from_secs(5)).await;
-                (min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
+                continue;
             }
+            last_seen_max_epoch = max_epoch;
 
             // Not all partitioned tables are epoch-partitioned, so we need to filter them out.
             let table_partitions: HashMap<_, _> = self
@@ -64,16 +58,20 @@ impl Pruner {
                 })
                 .collect();
 
+            let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.epochs_to_keep - 1);
+
             for (table_name, (min_partition, max_partition)) in &table_partitions {
-                if max_epoch != *max_partition {
+                if last_seen_max_epoch != *max_partition {
                     error!(
                         "Epochs are out of sync for table {}: max_epoch={}, max_partition={}",
-                        table_name, max_epoch, max_partition
+                        table_name, last_seen_max_epoch, max_partition
                     );
                 }
-                // drop partitions if pruning is enabled afterwards, where all epochs before min_epoch
-                // would have been pruned already if the pruner was running.
-                for epoch in *min_partition..min_epoch {
+                for epoch in *min_partition..prune_to_epoch {
+                    if cancel.is_cancelled() {
+                        info!("Pruner task cancelled.");
+                        return Ok(());
+                    }
                     self.partition_manager
                         .drop_table_partition(table_name.clone(), epoch)
                         .await?;
@@ -84,24 +82,23 @@ impl Pruner {
                 }
             }
 
-            for epoch in min_epoch..max_epoch.saturating_sub(self.epochs_to_keep - 1) {
+            let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch);
+            for epoch in prune_start_epoch..prune_to_epoch {
                 if cancel.is_cancelled() {
                     info!("Pruner task cancelled.");
                     return Ok(());
                 }
                 info!("Pruning epoch {}", epoch);
-                for table_name in table_partitions.keys() {
-                    self.partition_manager
-                        .drop_table_partition(table_name.clone(), epoch)
-                        .await?;
-                    info!("Dropped table partition {} epoch {}", table_name, epoch);
-                }
-                self.store.prune_epoch(epoch).await.unwrap_or_else(|e| {
-                    error!("Failed to prune epoch {}: {}", epoch, e);
-                });
+                if let Err(err) = self.store.prune_epoch(epoch).await {
+                    error!("Failed to prune epoch {}: {}", epoch, err);
+                    break;
+                };
                 self.metrics.last_pruned_epoch.set(epoch as i64);
                 info!("Pruned epoch {}", epoch);
+                next_prune_epoch = Some(epoch + 1);
             }
         }
+        info!("Pruner task cancelled.");
+        Ok(())
     }
 }
diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs
index ab269d31c5eb8..6bdf400758acc 100644
--- a/crates/sui-indexer/src/indexer.rs
+++ b/crates/sui-indexer/src/indexer.rs
@@ -32,7 +32,8 @@ use crate::store::{IndexerStore, PgIndexerStore};
 pub struct Indexer;
 
 impl Indexer {
-    pub async fn start_writer(
+    #[cfg(test)]
+    pub async fn start_writer_for_testing(
         config: &IngestionConfig,
         store: PgIndexerStore,
         metrics: IndexerMetrics,
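
For context, below is a minimal standalone sketch of the reworked pruner control flow in the diff above. It is not the indexer code itself: FakeStore and EPOCHS_TO_KEEP are illustrative stand-ins for the real IndexerStore and configuration, and cancellation, partition dropping, and metrics are omitted. It only shows how last_seen_max_epoch and next_prune_epoch keep the loop from re-pruning epochs that were already handled.

// A minimal, standalone sketch (not the real indexer code) of the reworked
// pruner control flow. `FakeStore` and `EPOCHS_TO_KEEP` are illustrative
// stand-ins for the real `IndexerStore` and configuration.

const EPOCHS_TO_KEEP: u64 = 2;

struct FakeStore {
    min_epoch: u64,
    max_epoch: u64,
}

impl FakeStore {
    fn get_available_epoch_range(&self) -> (u64, u64) {
        (self.min_epoch, self.max_epoch)
    }

    fn prune_epoch(&mut self, epoch: u64) -> Result<(), String> {
        self.min_epoch = self.min_epoch.max(epoch + 1);
        println!("pruned epoch {epoch}");
        Ok(())
    }
}

fn main() {
    let mut store = FakeStore { min_epoch: 0, max_epoch: 5 };

    let mut last_seen_max_epoch = 0;
    // The first epoch that has not yet been pruned.
    let mut next_prune_epoch: Option<u64> = None;

    // Three passes stand in for the cancellable `while !cancel.is_cancelled()` loop.
    for pass in 0..3 {
        let (min_epoch, max_epoch) = store.get_available_epoch_range();
        if max_epoch == last_seen_max_epoch {
            // Nothing new; the real task sleeps for 5s and retries here.
            println!("pass {pass}: max epoch unchanged, skipping");
            continue;
        }
        last_seen_max_epoch = max_epoch;

        // Keep the newest `EPOCHS_TO_KEEP` epochs (the max epoch inclusive).
        let prune_to_epoch = last_seen_max_epoch.saturating_sub(EPOCHS_TO_KEEP - 1);
        // Resume from where the previous pass stopped instead of re-pruning
        // from `min_epoch` every time.
        let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch);
        for epoch in prune_start_epoch..prune_to_epoch {
            if let Err(err) = store.prune_epoch(epoch) {
                eprintln!("failed to prune epoch {epoch}: {err}");
                break;
            }
            next_prune_epoch = Some(epoch + 1);
        }

        // Simulate a new epoch becoming available before the next pass.
        store.max_epoch += 1;
    }
}

Note the design choice mirrored from the diff: next_prune_epoch only advances on success and the loop breaks on error, so a failed epoch is retried on a later pass rather than skipped.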