Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[indexer-alt] Fix first_checkpoint option #20079

Merged
merged 2 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};

use anyhow::{Context, Result};
use anyhow::{ensure, Context, Result};
use db::{Db, DbConfig};
use ingestion::{IngestionConfig, IngestionService};
use metrics::{IndexerMetrics, MetricsService};
Expand Down Expand Up @@ -146,6 +146,13 @@ impl Indexer {
return Ok(());
};

// For a concurrent pipeline, if skip_watermark is set, we don't really care about the
// watermark consistency. first_checkpoint can be anything since we don't update watermark,
// and writes should be idempotent.
if !self.pipeline_config.skip_watermark {
self.check_first_checkpoint_consistency::<H>(&watermark)?;
}

let (processor, collector, committer, watermark) = concurrent::pipeline::<H>(
watermark,
self.pipeline_config.clone(),
Expand Down Expand Up @@ -176,6 +183,10 @@ impl Indexer {
return Ok(());
};

// For a sequential pipeline, data must be written in the order of checkpoints.
// Hence, we do not allow the first_checkpoint override to be in arbitrary positions.
self.check_first_checkpoint_consistency::<H>(&watermark)?;

let (checkpoint_rx, watermark_tx) = self.ingestion_service.subscribe();

let (processor, committer) = sequential::pipeline::<H>(
Expand All @@ -194,6 +205,27 @@ impl Indexer {
Ok(())
}

/// Checks that the first checkpoint override is consistent with the watermark for the pipeline.
/// If the watermark does not exist, the override can be anything. If the watermark exists, the
/// override must not leave any gap in the data: it can be in the past, or at the tip of the
/// network, but not in the future.
fn check_first_checkpoint_consistency<P: Processor>(
&self,
watermark: &Option<CommitterWatermark>,
) -> Result<()> {
if let (Some(watermark), Some(first_checkpoint)) = (watermark, self.first_checkpoint) {
ensure!(
first_checkpoint as i64 <= watermark.checkpoint_hi_inclusive + 1,
"For pipeline {}, first checkpoint override {} is too far ahead of watermark {}. This could create gaps in the data.",
P::NAME,
first_checkpoint,
watermark.checkpoint_hi_inclusive,
);
}

Ok(())
}

/// Start ingesting checkpoints. Ingestion either starts from the configured
/// `first_checkpoint`, or it is calculated based on the watermarks of all active pipelines.
/// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided,
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt/src/pipeline/concurrent/watermark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ pub(super) fn watermark<H: Handler + 'static>(
.with_label_values(&[H::NAME])
.start_timer();

// TODO: If initial_watermark is empty, when we update watermark
// for the first time, we should also update the low watermark.
match watermark.update(&mut conn).await {
// If there's an issue updating the watermark, log it but keep going,
// it's OK for the watermark to lag from a correctness perspective.
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct PipelineConfig {

/// Avoid writing to the watermark table
#[arg(long)]
skip_watermark: bool,
pub skip_watermark: bool,
}

/// Processed values associated with a single checkpoint. This is an internal type used to
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt/src/pipeline/sequential/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ pub(super) fn committer<H: Handler + 'static>(
// single transaction. The handler's `commit` implementation is responsible for
// chunking up the writes into a manageable size.
let affected = conn.transaction::<_, anyhow::Error, _>(|conn| async {
// TODO: If initial_watermark is empty, when we update watermark
// for the first time, we should also update the low watermark.
watermark.update(conn).await?;
H::commit(&batch, conn).await
}.scope_boxed()).await;
Expand Down
Loading