Skip to content

Commit

Permalink
indexer: exclude deleted / wrapped objects from snapshot
Browse files — browse the repository at this point in the history
  • Loading branch information
gegaowp committed Sep 19, 2024
1 parent 5cf3b93 commit 2ea7e97
Showing 1 changed file with 123 additions and 89 deletions.
212 changes: 123 additions & 89 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -14,6 +14,7 @@ use diesel::ExpressionMethods;
use diesel::OptionalExtension;
use diesel::QueryDsl;
use diesel_async::scoped_futures::ScopedFutureExt;
use futures::future::Either;
use itertools::Itertools;
use object_store::path::Path;
use tap::TapFallible;
Expand Down Expand Up @@ -335,7 +336,7 @@ impl PgIndexerStore {
Ok(())
}

async fn persist_object_mutation_chunk(
async fn mutate_objects_chunk(
&self,
mutated_object_mutation_chunk: Vec<StoredObject>,
) -> Result<(), IndexerError> {
Expand Down Expand Up @@ -383,7 +384,7 @@ impl PgIndexerStore {
})
}

async fn persist_object_deletion_chunk(
async fn delete_objects_chunk(
&self,
deleted_objects_chunk: Vec<StoredDeletedObject>,
) -> Result<(), IndexerError> {
Expand Down Expand Up @@ -422,70 +423,91 @@ impl PgIndexerStore {
})
}

async fn persist_objects_snapshot_chunk(
async fn mutate_objects_snapshot_chunk(
&self,
objects: Vec<ObjectChangeToCommit>,
objects_snapshot_chunk: Vec<StoredObjectSnapshot>,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;
let guard = self
.metrics
.checkpoint_db_commit_latency_objects_snapshot_chunks
.start_timer();
let mut objects_snapshot: Vec<StoredObjectSnapshot> = vec![];
for object in objects {
match object {
ObjectChangeToCommit::MutatedObject(stored_object) => {
objects_snapshot.push(stored_object.into());
}
ObjectChangeToCommit::DeletedObject(stored_deleted_object) => {
objects_snapshot.push(stored_deleted_object.into());
}
transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(objects_snapshot::table)
.values(objects_snapshot_chunk)
.on_conflict(objects_snapshot::object_id)
.do_update()
.set((
objects_snapshot::object_version
.eq(excluded(objects_snapshot::object_version)),
objects_snapshot::object_status
.eq(excluded(objects_snapshot::object_status)),
objects_snapshot::object_digest
.eq(excluded(objects_snapshot::object_digest)),
objects_snapshot::checkpoint_sequence_number
.eq(excluded(objects_snapshot::checkpoint_sequence_number)),
objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
objects_snapshot::object_type_package
.eq(excluded(objects_snapshot::object_type_package)),
objects_snapshot::object_type_module
.eq(excluded(objects_snapshot::object_type_module)),
objects_snapshot::object_type_name
.eq(excluded(objects_snapshot::object_type_name)),
objects_snapshot::object_type.eq(excluded(objects_snapshot::object_type)),
objects_snapshot::serialized_object
.eq(excluded(objects_snapshot::serialized_object)),
objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
objects_snapshot::coin_balance.eq(excluded(objects_snapshot::coin_balance)),
objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
objects_snapshot::df_name.eq(excluded(objects_snapshot::df_name)),
objects_snapshot::df_object_type
.eq(excluded(objects_snapshot::df_object_type)),
objects_snapshot::df_object_id.eq(excluded(objects_snapshot::df_object_id)),
))
.execute(conn)
.await?;
Ok::<(), IndexerError>(())
}
}
.scope_boxed()
})
.await
.tap_ok(|_| {
guard.stop_and_record();
})
.tap_err(|e| {
tracing::error!("Failed to persist object snapshot with error: {}", e);
})
}

async fn delete_objects_snapshot_chunk(
&self,
deleted_objects_chunk: Vec<StoredObjectSnapshot>,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;
let guard = self
.metrics
.checkpoint_db_commit_latency_objects_snapshot_chunks
.start_timer();

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
for objects_snapshot_chunk in
objects_snapshot.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
{
diesel::insert_into(objects_snapshot::table)
.values(objects_snapshot_chunk)
.on_conflict(objects_snapshot::object_id)
.do_update()
.set((
objects_snapshot::object_version
.eq(excluded(objects_snapshot::object_version)),
objects_snapshot::object_status
.eq(excluded(objects_snapshot::object_status)),
objects_snapshot::object_digest
.eq(excluded(objects_snapshot::object_digest)),
objects_snapshot::checkpoint_sequence_number
.eq(excluded(objects_snapshot::checkpoint_sequence_number)),
objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)),
objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)),
objects_snapshot::object_type_package
.eq(excluded(objects_snapshot::object_type_package)),
objects_snapshot::object_type_module
.eq(excluded(objects_snapshot::object_type_module)),
objects_snapshot::object_type_name
.eq(excluded(objects_snapshot::object_type_name)),
objects_snapshot::object_type
.eq(excluded(objects_snapshot::object_type)),
objects_snapshot::serialized_object
.eq(excluded(objects_snapshot::serialized_object)),
objects_snapshot::coin_type.eq(excluded(objects_snapshot::coin_type)),
objects_snapshot::coin_balance
.eq(excluded(objects_snapshot::coin_balance)),
objects_snapshot::df_kind.eq(excluded(objects_snapshot::df_kind)),
objects_snapshot::df_name.eq(excluded(objects_snapshot::df_name)),
objects_snapshot::df_object_type
.eq(excluded(objects_snapshot::df_object_type)),
objects_snapshot::df_object_id
.eq(excluded(objects_snapshot::df_object_id)),
))
.execute(conn)
.await?;
}
diesel::delete(
objects_snapshot::table.filter(
objects_snapshot::object_id.eq_any(
deleted_objects_chunk
.iter()
.map(|o| o.object_id.clone())
.collect::<Vec<_>>(),
),
),
)
.execute(conn)
.await
.map_err(IndexerError::from)
.context("Failed to write object deletion to PostgresDB")?;

Ok::<(), IndexerError>(())
}
.scope_boxed()
Expand All @@ -495,7 +517,10 @@ impl PgIndexerStore {
guard.stop_and_record();
})
.tap_err(|e| {
tracing::error!("Failed to persist object snapshot with error: {}", e);
tracing::error!(
"Failed to persist object snapshot deletions with error: {}",
e
);
})
}

Expand Down Expand Up @@ -1562,50 +1587,33 @@ impl IndexerStore for PgIndexerStore {
}
}
}
let mutation_len = object_mutations.len();
let deletion_len = object_deletions.len();

let object_mutation_chunks =
chunk!(object_mutations, self.config.parallel_objects_chunk_size);
let object_deletion_chunks =
chunk!(object_deletions, self.config.parallel_objects_chunk_size);
let mutation_futures = object_mutation_chunks
.into_iter()
.map(|c| self.persist_object_mutation_chunk(c))
.collect::<Vec<_>>();
futures::future::join_all(mutation_futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed to persist all object mutation chunks: {:?}",
e
))
})?;
.map(|c| self.mutate_objects_chunk(c))
.map(Either::Left);
let deletion_futures = object_deletion_chunks
.into_iter()
.map(|c| self.persist_object_deletion_chunk(c))
.collect::<Vec<_>>();
futures::future::join_all(deletion_futures)
.map(|c| self.delete_objects_chunk(c))
.map(Either::Right);
let all_futures = mutation_futures.chain(deletion_futures).collect::<Vec<_>>();

futures::future::join_all(all_futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed to persist all object deletion chunks: {:?}",
"Failed to persist all object mutation or deletion chunks: {:?}",
e
))
})?;

let elapsed = guard.stop_and_record();
info!(
elapsed,
"Persisted {} objects with {} mutations and {} deletions ",
len,
mutation_len,
deletion_len,
);
info!(elapsed, "Persisted {} objects mutations or deletions", len,);
Ok(())
}

Expand All @@ -1622,19 +1630,45 @@ impl IndexerStore for PgIndexerStore {
.start_timer();
let objects = make_final_list_of_objects_to_commit(object_changes);
let len = objects.len();
let chunks = chunk!(objects, self.config.parallel_objects_chunk_size);
let futures = chunks
.into_iter()
.map(|c| self.persist_objects_snapshot_chunk(c))
.collect::<Vec<_>>();

futures::future::join_all(futures)
let mut object_snapshot_mutations: Vec<StoredObjectSnapshot> = vec![];
let mut object_snapshot_deletions: Vec<StoredObjectSnapshot> = vec![];
for object in objects {
match object {
ObjectChangeToCommit::MutatedObject(mutation) => {
object_snapshot_mutations.push(mutation.into());
}
ObjectChangeToCommit::DeletedObject(deletion) => {
object_snapshot_deletions.push(deletion.into());
}
}
}

let objects_snapshot_mutation_chunks = chunk!(
object_snapshot_mutations,
self.config.parallel_objects_chunk_size
);
let objects_snapshot_deletion_chunks = chunk!(
object_snapshot_deletions,
self.config.parallel_objects_chunk_size
);

let mutation_futures = objects_snapshot_mutation_chunks
.into_iter()
.map(|c| self.mutate_objects_snapshot_chunk(c))
.map(Either::Left);
let deletion_futures = objects_snapshot_deletion_chunks
.into_iter()
.map(|c| self.delete_objects_snapshot_chunk(c))
.map(Either::Right);
let all_futures = mutation_futures.chain(deletion_futures).collect::<Vec<_>>();
futures::future::join_all(all_futures)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed to persist all objects snapshot chunks: {:?}",
"Failed to persist object snapshot mutation or deletion chunks: {:?}",
e
))
})?;
Expand Down

0 comments on commit 2ea7e97

Please sign in to comment.