Skip to content

Commit

Permalink
split out storage component in bridge indexer into its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Sep 6, 2024
1 parent 6110568 commit de51cba
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 178 deletions.
1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod metrics;
pub mod models;
pub mod postgres_manager;
pub mod schema;
pub mod storage;
pub mod sui_transaction_handler;
pub mod sui_transaction_queries;
pub mod types;
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use sui_bridge_indexer::config::IndexerConfig;
use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper;
use sui_bridge_indexer::metrics::BridgeIndexerMetrics;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store};
use sui_bridge_indexer::sui_bridge_indexer::{PgBridgePersistent, SuiBridgeDataMapper};
use sui_bridge_indexer::storage::PgBridgePersistent;
use sui_bridge_indexer::sui_bridge_indexer::SuiBridgeDataMapper;
use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource;
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
Expand Down
180 changes: 180 additions & 0 deletions crates/sui-bridge-indexer/src/storage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::{anyhow, Error};
use async_trait::async_trait;
use diesel::dsl::now;
use diesel::{ExpressionMethods, TextExpressionMethods};
use diesel::{OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
use diesel_async::RunQueryDsl;

use crate::postgres_manager::PgPool;
use crate::schema::progress_store::{columns, dsl};
use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
use crate::{models, schema, ProcessedTxnData};
use sui_indexer_builder::indexer_builder::{IndexerProgressStore, Persistent};
use sui_indexer_builder::Task;

/// Persistent layer impl
#[derive(Clone)]
pub struct PgBridgePersistent {
pool: PgPool,
}

impl PgBridgePersistent {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}

// TODO: this is shared between SUI and ETH, move to different file.
#[async_trait]
impl Persistent<ProcessedTxnData> for PgBridgePersistent {
async fn write(&self, data: Vec<ProcessedTxnData>) -> Result<(), Error> {
if data.is_empty() {
return Ok(());
}
let connection = &mut self.pool.get().await?;
connection
.transaction(|conn| {
async move {
for d in data {
match d {
ProcessedTxnData::TokenTransfer(t) => {
diesel::insert_into(token_transfer::table)
.values(&t.to_db())
.on_conflict_do_nothing()
.execute(conn)
.await?;

if let Some(d) = t.to_data_maybe() {
diesel::insert_into(token_transfer_data::table)
.values(&d)
.on_conflict_do_nothing()
.execute(conn)
.await?;
}
}
ProcessedTxnData::Error(e) => {
diesel::insert_into(sui_error_transactions::table)
.values(&e.to_db())
.on_conflict_do_nothing()
.execute(conn)
.await?;
}
}
}
Ok(())
}
.scope_boxed()
})
.await
}
}

#[async_trait]
impl IndexerProgressStore for PgBridgePersistent {
async fn load_progress(&self, task_name: String) -> anyhow::Result<u64> {
let mut conn = self.pool.get().await?;
let cp: Option<models::ProgressStore> = dsl::progress_store
.find(&task_name)
.select(models::ProgressStore::as_select())
.first(&mut conn)
.await
.optional()?;
Ok(cp
.ok_or(anyhow!("Cannot found progress for task {task_name}"))?
.checkpoint as u64)
}

async fn save_progress(
&mut self,
task_name: String,
checkpoint_number: u64,
) -> anyhow::Result<()> {
let mut conn = self.pool.get().await?;
diesel::insert_into(schema::progress_store::table)
.values(&models::ProgressStore {
task_name,
checkpoint: checkpoint_number as i64,
// Target checkpoint and timestamp will only be written for new entries
target_checkpoint: i64::MAX,
// Timestamp is defaulted to current time in DB if None
timestamp: None,
})
.on_conflict(dsl::task_name)
.do_update()
.set((
columns::checkpoint.eq(checkpoint_number as i64),
columns::timestamp.eq(now),
))
.execute(&mut conn)
.await?;
Ok(())
}

async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Vec<Task>, anyhow::Error> {
let mut conn = self.pool.get().await?;
// get all unfinished tasks
let cp: Vec<models::ProgressStore> = dsl::progress_store
// TODO: using like could be error prone, change the progress store schema to stare the task name properly.
.filter(columns::task_name.like(format!("{prefix} - %")))
.filter(columns::checkpoint.lt(columns::target_checkpoint))
.order_by(columns::target_checkpoint.desc())
.load(&mut conn)
.await?;
Ok(cp.into_iter().map(|d| d.into()).collect())
}

async fn get_largest_backfill_task_target_checkpoint(
&self,
prefix: &str,
) -> Result<Option<u64>, Error> {
let mut conn = self.pool.get().await?;
let cp: Option<i64> = dsl::progress_store
.select(columns::target_checkpoint)
// TODO: using like could be error prone, change the progress store schema to stare the task name properly.
.filter(columns::task_name.like(format!("{prefix} - %")))
.filter(columns::target_checkpoint.ne(i64::MAX))
.order_by(columns::target_checkpoint.desc())
.first::<i64>(&mut conn)
.await
.optional()?;
Ok(cp.map(|c| c as u64))
}

async fn register_task(
&mut self,
task_name: String,
checkpoint: u64,
target_checkpoint: u64,
) -> Result<(), anyhow::Error> {
let mut conn = self.pool.get().await?;
diesel::insert_into(schema::progress_store::table)
.values(models::ProgressStore {
task_name,
checkpoint: checkpoint as i64,
target_checkpoint: target_checkpoint as i64,
// Timestamp is defaulted to current time in DB if None
timestamp: None,
})
.execute(&mut conn)
.await?;
Ok(())
}

async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
let mut conn = self.pool.get().await?;
diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
.set((
columns::checkpoint.eq(task.checkpoint as i64),
columns::target_checkpoint.eq(task.target_checkpoint as i64),
columns::timestamp.eq(now),
))
.execute(&mut conn)
.await?;
Ok(())
}
}
Loading

0 comments on commit de51cba

Please sign in to comment.