Skip to content

Commit

Permalink
integrate ProgressSavingPolicy
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Sep 6, 2024
1 parent 36be154 commit 411ec7b
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 82 deletions.
8 changes: 0 additions & 8 deletions crates/sui-bridge-indexer/src/eth_bridge_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,6 @@ impl Datasource<RawEthData> for EthSubscriptionDatasource {
fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec {
&self.indexer_metrics.tasks_processed_checkpoints
}

fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.live_task_current_checkpoint
}
}

pub struct EthSyncDatasource {
Expand Down Expand Up @@ -288,10 +284,6 @@ impl Datasource<RawEthData> for EthSyncDatasource {
fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec {
&self.indexer_metrics.tasks_processed_checkpoints
}

fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.live_task_current_checkpoint
}
}

#[derive(Clone)]
Expand Down
24 changes: 20 additions & 4 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use sui_bridge_indexer::config::IndexerConfig;
use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper;
use sui_bridge_indexer::metrics::BridgeIndexerMetrics;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store};
use sui_bridge_indexer::storage::PgBridgePersistent;
use sui_bridge_indexer::storage::{
OutOfOrderSaveAfterDurationPolicy, PgBridgePersistent, ProgressSavingPolicy,
SaveAfterDurationPolicy,
};
use sui_bridge_indexer::sui_bridge_indexer::SuiBridgeDataMapper;
use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource;
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
Expand Down Expand Up @@ -72,7 +75,20 @@ async fn main() -> Result<()> {
let bridge_metrics = Arc::new(BridgeMetrics::new(&registry));

let db_url = config.db_url.clone();
let datastore = PgBridgePersistent::new(get_connection_pool(db_url.clone()).await);
let datastore = PgBridgePersistent::new(
get_connection_pool(db_url.clone()).await,
ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
indexer_meterics.clone(),
);
let datastore_with_out_of_order_source = PgBridgePersistent::new(
get_connection_pool(db_url.clone()).await,
ProgressSavingPolicy::OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
indexer_meterics.clone(),
);

let eth_client: Arc<EthClient<MeteredEthHttpProvier>> = Arc::new(
EthClient::<MeteredEthHttpProvier>::new(
Expand Down Expand Up @@ -119,7 +135,7 @@ async fn main() -> Result<()> {
EthDataMapper {
metrics: indexer_meterics.clone(),
},
datastore.clone(),
datastore,
)
.with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
.disable_live_task()
Expand All @@ -146,7 +162,7 @@ async fn main() -> Result<()> {
SuiBridgeDataMapper {
metrics: indexer_meterics.clone(),
},
datastore,
datastore_with_out_of_order_source,
)
.build();
indexer.start().await?;
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-bridge-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct BridgeIndexerMetrics {
pub(crate) last_synced_eth_block: IntGauge,
pub(crate) tasks_remaining_checkpoints: IntGaugeVec,
pub(crate) tasks_processed_checkpoints: IntCounterVec,
pub(crate) live_task_current_checkpoint: IntGaugeVec,
pub(crate) tasks_current_checkpoints: IntGaugeVec,
}

impl BridgeIndexerMetrics {
Expand Down Expand Up @@ -115,9 +115,9 @@ impl BridgeIndexerMetrics {
registry,
)
.unwrap(),
live_task_current_checkpoint: register_int_gauge_vec_with_registry!(
"bridge_indexer_live_task_current_checkpoint",
"Current checkpoint of live task",
tasks_current_checkpoints: register_int_gauge_vec_with_registry!(
"bridge_indexer_tasks_current_checkpoints",
"Current checkpoint for each task",
&["task_name"],
registry,
)
Expand Down
74 changes: 51 additions & 23 deletions crates/sui-bridge-indexer/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
use diesel_async::RunQueryDsl;

use crate::metrics::BridgeIndexerMetrics;
use crate::postgres_manager::PgPool;
use crate::schema::progress_store::{columns, dsl};
use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
Expand All @@ -26,11 +27,21 @@ use sui_indexer_builder::Task;
#[derive(Clone)]
pub struct PgBridgePersistent {
pool: PgPool,
save_progress_policy: ProgressSavingPolicy,
indexer_metrics: BridgeIndexerMetrics,
}

impl PgBridgePersistent {
pub fn new(pool: PgPool) -> Self {
Self { pool }
pub fn new(
pool: PgPool,
save_progress_policy: ProgressSavingPolicy,
indexer_metrics: BridgeIndexerMetrics,
) -> Self {
Self {
pool,
save_progress_policy,
indexer_metrics,
}
}
}

Expand Down Expand Up @@ -96,27 +107,44 @@ impl IndexerProgressStore for PgBridgePersistent {
async fn save_progress(
&mut self,
task_name: String,
checkpoint_number: u64,
) -> anyhow::Result<()> {
let mut conn = self.pool.get().await?;
diesel::insert_into(schema::progress_store::table)
.values(&models::ProgressStore {
task_name,
checkpoint: checkpoint_number as i64,
// Target checkpoint and timestamp will only be written for new entries
target_checkpoint: i64::MAX,
// Timestamp is defaulted to current time in DB if None
timestamp: None,
})
.on_conflict(dsl::task_name)
.do_update()
.set((
columns::checkpoint.eq(checkpoint_number as i64),
columns::timestamp.eq(now),
))
.execute(&mut conn)
.await?;
Ok(())
checkpoint_numbers: &[u64],
start_checkpoint_number: u64,
target_checkpoint_number: u64,
) -> anyhow::Result<Option<u64>> {
if checkpoint_numbers.is_empty() {
return Ok(None);
}
if let Some(checkpoint_to_save) = self.save_progress_policy.cache_progress(
task_name.clone(),
checkpoint_numbers,
start_checkpoint_number,
target_checkpoint_number,
) {
let mut conn = self.pool.get().await?;
diesel::insert_into(schema::progress_store::table)
.values(&models::ProgressStore {
task_name: task_name.clone(),
checkpoint: checkpoint_to_save as i64,
// Target checkpoint and timestamp will only be written for new entries
target_checkpoint: i64::MAX,
// Timestamp is defaulted to current time in DB if None
timestamp: None,
})
.on_conflict(dsl::task_name)
.do_update()
.set((
columns::checkpoint.eq(checkpoint_to_save as i64),
columns::timestamp.eq(now),
))
.execute(&mut conn)
.await?;
self.indexer_metrics
.tasks_current_checkpoints
.with_label_values(&[&task_name])
.set(checkpoint_to_save as i64);
return Ok(Some(checkpoint_to_save));
}
Ok(None)
}

async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Vec<Task>, anyhow::Error> {
Expand Down
14 changes: 9 additions & 5 deletions crates/sui-bridge-indexer/src/sui_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec {
&self.indexer_metrics.tasks_processed_checkpoints
}

fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.live_task_current_checkpoint
}
}

struct PerTaskInMemProgressStore {
Expand All @@ -134,11 +130,19 @@ impl ProgressStore for PerTaskInMemProgressStore {

async fn save(
&mut self,
_task_name: String,
task_name: String,
checkpoint_number: CheckpointSequenceNumber,
) -> anyhow::Result<()> {
if checkpoint_number >= self.exit_checkpoint {
tracing::info!(
task_name,
checkpoint_number,
exit_checkpoint = self.exit_checkpoint,
"Task completed, sending exit signal"
);
// `exit_sender` may be `None` if we have already sent the exit signal.
if let Some(sender) = self.exit_sender.take() {
// Ignore the error if the receiver has already been dropped.
let _ = sender.send(());
}
}
Expand Down
Loading

0 comments on commit 411ec7b

Please sign in to comment.