Skip to content

Commit

Permalink
indexer: exclude deleted & wrapped objects from snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Oct 9, 2024
1 parent 2c421c3 commit 9de3f8e
Showing 1 changed file with 100 additions and 20 deletions.
120 changes: 100 additions & 20 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,9 @@ impl PgIndexerStore {
})
}

async fn persist_objects_snapshot_chunk(
async fn persist_object_snapshot_mutation_chunk(
&self,
objects_snapshot: Vec<StoredObjectSnapshot>,
objects_snapshot_mutations: Vec<StoredObjectSnapshot>,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;
let guard = self
Expand All @@ -433,11 +433,11 @@ impl PgIndexerStore {
.start_timer();
transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
for objects_snapshot_chunk in
objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
for mutation_chunk in
objects_snapshot_mutations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(objects_snapshot::table)
.values(objects_snapshot_chunk)
.values(mutation_chunk)
.on_conflict(objects_snapshot::object_id)
.do_update()
.set((
Expand Down Expand Up @@ -482,6 +482,57 @@ impl PgIndexerStore {
})
}

async fn persist_object_snapshot_deletion_chunk(
&self,
objects_snapshot_deletions: Vec<StoredObjectSnapshot>,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;
let guard = self
.metrics
.checkpoint_db_commit_latency_objects_snapshot_chunks
.start_timer();

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
for deletion_chunk in
objects_snapshot_deletions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::delete(
objects_snapshot::table.filter(
objects_snapshot::object_id.eq_any(
deletion_chunk
.iter()
.map(|o| o.object_id.clone())
.collect::<Vec<_>>(),
),
),
)
.execute(conn)
.await
.map_err(IndexerError::from)
.context("Failed to write object deletion to PostgresDB")?;
}
Ok::<(), IndexerError>(())
}
.scope_boxed()
})
.await
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(
elapsed,
"Deleted {} chunked object snapshots",
objects_snapshot_deletions.len(),
);
})
.tap_err(|e| {
tracing::error!(
"Failed to persist object snapshot deletions with error: {}",
e
);
})
}

async fn persist_objects_history_chunk(
&self,
stored_objects_history: Vec<StoredHistoryObject>,
Expand Down Expand Up @@ -1678,34 +1729,63 @@ impl IndexerStore for PgIndexerStore {
.checkpoint_db_commit_latency_objects_snapshot
.start_timer();
let (indexed_mutations, indexed_deletions) = retain_latest_indexed_objects(object_changes);
let objects_snapshot = indexed_mutations
let object_snapshot_mutations: Vec<StoredObjectSnapshot> = indexed_mutations
.into_iter()
.map(StoredObjectSnapshot::from)
.chain(
indexed_deletions
.into_iter()
.map(StoredObjectSnapshot::from),
)
.collect();
let object_snapshot_deletions: Vec<StoredObjectSnapshot> = indexed_deletions
.into_iter()
.map(StoredObjectSnapshot::from)
.collect();
let mutation_len = object_snapshot_mutations.len();
let deletion_len = object_snapshot_deletions.len();
let object_snapshot_mutation_chunks = chunk!(
object_snapshot_mutations,
self.config.parallel_objects_chunk_size
);
let object_snapshot_deletion_chunks = chunk!(
object_snapshot_deletions,
self.config.parallel_objects_chunk_size
);
let mutation_futures = object_snapshot_mutation_chunks
.into_iter()
.map(|c| self.persist_object_snapshot_mutation_chunk(c))
.map(Either::Left)
.collect::<Vec<_>>();
let len = objects_snapshot.len();
let chunks = chunk!(objects_snapshot, self.config.parallel_objects_chunk_size);
let futures = chunks
let deletion_futures = object_snapshot_deletion_chunks
.into_iter()
.map(|c| self.persist_objects_snapshot_chunk(c))
.map(|c| self.persist_object_snapshot_deletion_chunk(c))
.map(Either::Right)
.collect::<Vec<_>>();

futures::future::join_all(futures)
let all_futures = mutation_futures
.into_iter()
.chain(deletion_futures.into_iter())
.collect::<Vec<_>>();
futures::future::join_all(all_futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed to persist all objects snapshot chunks: {:?}",
"Failed to persist object snapshot mutation or deletion chunks: {:?}",
e
))
})
.tap_ok(|_| {
let elapsed = guard.stop_and_record();
info!(
elapsed,
"Persisted {} objects snapshot mutations and {} deletions",
mutation_len,
deletion_len
);
})
.tap_err(|e| {
tracing::error!(
"Failed to persist object snapshot mutation or deletion chunks: {:?}",
e
)
})?;
let elapsed = guard.stop_and_record();
info!(elapsed, "Persisted {} objects snapshot", len);
Ok(())
}

Expand Down

0 comments on commit 9de3f8e

Please sign in to comment.