Skip to content

Commit

Permalink
[Indexer] Break dependency between pruner and epoch watermark (#19535)
Browse files Browse the repository at this point in the history
## Description 

This PR makes a few improvements to the pruner:
1. It stops depending on the epochs table's data range as the watermark
for global pruning decisions. Instead, it keeps track of the next epoch
to prune and the last seen max epoch locally in the task. Once we have
the actual watermark table, we could probably improve this further.
2. It also fixes an issue where failed pruning errors were silently ignored.

## Test plan 

Unfortunately, we don't have any tests for the pruner yet.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Sep 25, 2024
1 parent f0591bd commit 857de33
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 30 deletions.
55 changes: 26 additions & 29 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::collections::HashMap;
use std::time::Duration;

use tokio_util::sync::CancellationToken;
use tracing::{error, info};

Expand Down Expand Up @@ -35,21 +34,16 @@ impl Pruner {
}

pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
loop {
if cancel.is_cancelled() {
info!("Pruner task cancelled.");
return Ok(());
}

let (mut min_epoch, mut max_epoch) = self.store.get_available_epoch_range().await?;
while min_epoch + self.epochs_to_keep > max_epoch {
if cancel.is_cancelled() {
info!("Pruner task cancelled.");
return Ok(());
}
let mut last_seen_max_epoch = 0;
// The first epoch that has not yet been pruned.
let mut next_prune_epoch = None;
while !cancel.is_cancelled() {
let (min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
if max_epoch == last_seen_max_epoch {
tokio::time::sleep(Duration::from_secs(5)).await;
(min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
continue;
}
last_seen_max_epoch = max_epoch;

// Not all partitioned tables are epoch-partitioned, so we need to filter them out.
let table_partitions: HashMap<_, _> = self
Expand All @@ -64,16 +58,20 @@ impl Pruner {
})
.collect();

let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.epochs_to_keep - 1);

for (table_name, (min_partition, max_partition)) in &table_partitions {
if max_epoch != *max_partition {
if last_seen_max_epoch != *max_partition {
error!(
"Epochs are out of sync for table {}: max_epoch={}, max_partition={}",
table_name, max_epoch, max_partition
table_name, last_seen_max_epoch, max_partition
);
}
// drop partitions if pruning is enabled afterwards, where all epochs before min_epoch
// would have been pruned already if the pruner was running.
for epoch in *min_partition..min_epoch {
for epoch in *min_partition..prune_to_epoch {
if cancel.is_cancelled() {
info!("Pruner task cancelled.");
return Ok(());
}
self.partition_manager
.drop_table_partition(table_name.clone(), epoch)
.await?;
Expand All @@ -84,24 +82,23 @@ impl Pruner {
}
}

for epoch in min_epoch..max_epoch.saturating_sub(self.epochs_to_keep - 1) {
let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch);
for epoch in prune_start_epoch..prune_to_epoch {
if cancel.is_cancelled() {
info!("Pruner task cancelled.");
return Ok(());
}
info!("Pruning epoch {}", epoch);
for table_name in table_partitions.keys() {
self.partition_manager
.drop_table_partition(table_name.clone(), epoch)
.await?;
info!("Dropped table partition {} epoch {}", table_name, epoch);
}
self.store.prune_epoch(epoch).await.unwrap_or_else(|e| {
error!("Failed to prune epoch {}: {}", epoch, e);
});
if let Err(err) = self.store.prune_epoch(epoch).await {
error!("Failed to prune epoch {}: {}", epoch, err);
break;
};
self.metrics.last_pruned_epoch.set(epoch as i64);
info!("Pruned epoch {}", epoch);
next_prune_epoch = Some(epoch + 1);
}
}
info!("Pruner task cancelled.");
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use crate::store::{IndexerStore, PgIndexerStore};
pub struct Indexer;

impl Indexer {
pub async fn start_writer(
#[cfg(test)]
pub async fn start_writer_for_testing(
config: &IngestionConfig,
store: PgIndexerStore,
metrics: IndexerMetrics,
Expand Down

0 comments on commit 857de33

Please sign in to comment.