diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index fce20707677b5..08a7ec2b01554 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -3,7 +3,7 @@ use std::{collections::BTreeSet, net::SocketAddr, sync::Arc}; -use anyhow::{Context, Result}; +use anyhow::{ensure, Context, Result}; use db::{Db, DbConfig}; use ingestion::{IngestionConfig, IngestionService}; use metrics::{IndexerMetrics, MetricsService}; @@ -146,6 +146,13 @@ impl Indexer { return Ok(()); }; + // For a concurrent pipeline, if skip_watermark is set, we don't really care about the + // watermark consistency. first_checkpoint can be anything since we don't update watermark, + // and writes should be idempotent. + if !self.pipeline_config.skip_watermark { + self.check_first_checkpoint_consistency::(&watermark)?; + } + let (processor, collector, committer, watermark) = concurrent::pipeline::( watermark, self.pipeline_config.clone(), @@ -176,6 +183,10 @@ impl Indexer { return Ok(()); }; + // For a sequential pipeline, data must be written in the order of checkpoints. + // Hence, we do not allow the first_checkpoint override to be in arbitrary positions. + self.check_first_checkpoint_consistency::(&watermark)?; + let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe(); let (processor, committer) = sequential::pipeline::( @@ -194,6 +205,27 @@ impl Indexer { Ok(()) } + /// Checks that the first checkpoint override is consistent with the watermark for the pipeline. + /// If the watermark does not exist, the override can be anything. If the watermark exists, the + /// override must not leave any gap in the data: it can be in the past, or at the tip of the + /// network, but not in the future. + fn check_first_checkpoint_consistency( + &self, + watermark: &Option, + ) -> Result<()> { + if let (Some(watermark), Some(first_checkpoint)) = (watermark, self.first_checkpoint) { + ensure!( + first_checkpoint as i64 <= watermark.checkpoint_hi_inclusive + 1, + "For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. This could create gaps in the data.", + P::NAME, + first_checkpoint, + watermark.checkpoint_hi_inclusive, + ); + } + + Ok(()) + } + /// Start ingesting checkpoints. Ingestion either starts from the configured /// `first_checkpoint`, or it is calculated based on the watermarks of all active pipelines. /// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided, diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/watermark.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/watermark.rs index 481a581b06ec7..69b58e266f69c 100644 --- a/crates/sui-indexer-alt/src/pipeline/concurrent/watermark.rs +++ b/crates/sui-indexer-alt/src/pipeline/concurrent/watermark.rs @@ -179,6 +179,8 @@ pub(super) fn watermark( .with_label_values(&[H::NAME]) .start_timer(); + // TODO: If initial_watermark is empty, when we update watermark + // for the first time, we should also update the low watermark. match watermark.update(&mut conn).await { // If there's an issue updating the watermark, log it but keep going, // it's OK for the watermark to lag from a correctness perspective. diff --git a/crates/sui-indexer-alt/src/pipeline/mod.rs b/crates/sui-indexer-alt/src/pipeline/mod.rs index 576e308c61b0f..05949d524832a 100644 --- a/crates/sui-indexer-alt/src/pipeline/mod.rs +++ b/crates/sui-indexer-alt/src/pipeline/mod.rs @@ -54,7 +54,7 @@ pub struct PipelineConfig { /// Avoid writing to the watermark table #[arg(long)] - skip_watermark: bool, + pub skip_watermark: bool, } /// Processed values associated with a single checkpoint. This is an internal type used to diff --git a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs b/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs index 3bd06518c9c10..663383fc8d1d9 100644 --- a/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs +++ b/crates/sui-indexer-alt/src/pipeline/sequential/committer.rs @@ -182,6 +182,8 @@ pub(super) fn committer( // single transaction. The handler's `commit` implementation is responsible for // chunking up the writes into a manageable size. let affected = conn.transaction::<_, anyhow::Error, _>(|conn| async { + // TODO: If initial_watermark is empty, when we update watermark + // for the first time, we should also update the low watermark. watermark.update(conn).await?; H::commit(&batch, conn).await }.scope_boxed()).await;