diff --git a/src/db/cache/index_cache.rs b/src/db/cache/index_cache.rs index 2e50dfa..8c5b8a6 100644 --- a/src/db/cache/index_cache.rs +++ b/src/db/cache/index_cache.rs @@ -21,7 +21,7 @@ use crate::{ db_ledger_operation::DbLedgerOperation, db_rune::DbRune, db_supply_change::DbSupplyChange, }, - pg_get_max_rune_number, pg_get_missed_input_rune_balances, pg_get_rune_by_id, + pg_get_input_rune_balances, pg_get_max_rune_number, pg_get_rune_by_id, pg_get_rune_total_mints, }, try_debug, try_info, try_warn, @@ -320,10 +320,10 @@ impl IndexCache { } } - // Look for misses in database. + // Look for cache misses in database. if cache_misses.len() > 0 { // self.db_cache.flush(db_tx, ctx).await; - let output_balances = pg_get_missed_input_rune_balances(cache_misses, db_tx, ctx).await; + let output_balances = pg_get_input_rune_balances(cache_misses, db_tx, ctx).await; indexed_input_runes.extend(output_balances); } diff --git a/src/db/mod.rs b/src/db/mod.rs index c06454a..fbca077 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, str::FromStr}; +use std::{collections::HashMap, process, str::FromStr}; use cache::transaction_cache::InputRuneBalance; use chainhook_sdk::utils::Context; @@ -45,7 +45,8 @@ pub async fn pg_connect(config: &Config, run_migrations: bool, ctx: &Context) -> Ok((client, connection)) => { tokio::spawn(async move { if let Err(e) = connection.await { - eprintln!("connection error: {}", e); + eprintln!("Postgres connection error: {}", e.to_string()); + process::exit(1); } }); pg_client = client; @@ -67,8 +68,8 @@ pub async fn pg_connect(config: &Config, run_migrations: bool, ctx: &Context) -> { Ok(_) => {} Err(e) => { - try_error!(ctx, "error running pg migrations: {}", e.to_string()); - panic!() + try_error!(ctx, "Error running pg migrations: {}", e.to_string()); + process::exit(1); } }; try_info!(ctx, "Postgres migrations complete"); @@ -143,7 +144,7 @@ pub async fn pg_insert_runes( Ok(_) => {} Err(e) => { try_error!(ctx, "Error inserting runes: {:?}", e); - panic!() + process::exit(1); } }; } @@ -220,7 +221,7 @@ pub async fn pg_insert_supply_changes( Ok(_) => {} Err(e) => { try_error!(ctx, "Error inserting supply changes: {:?}", e); - panic!() + process::exit(1); } }; } @@ -285,7 +286,7 @@ pub async fn pg_insert_balance_changes( Ok(_) => {} Err(e) => { try_error!(ctx, "Error inserting balance changes: {:?}", e); - panic!() + process::exit(1); } }; } @@ -336,7 +337,7 @@ pub async fn pg_insert_ledger_entries( Ok(_) => {} Err(e) => { try_error!(ctx, "Error inserting ledger entries: {:?}", e); - panic!() + process::exit(1); } }; } @@ -411,7 +412,7 @@ pub async fn pg_get_rune_by_id( Ok(row) => row, Err(e) => { try_error!(ctx, "error retrieving rune: {}", e.to_string()); - panic!(); + process::exit(1); } }; let Some(row) = row else { @@ -439,7 +440,7 @@ pub async fn pg_get_rune_total_mints( "error retrieving rune minted total: {}", e.to_string() ); - panic!(); + process::exit(1); } }; let Some(row) = row else { @@ -449,7 +450,10 @@ pub async fn pg_get_rune_total_mints( Some(minted.0) } -pub async fn pg_get_missed_input_rune_balances( +/// Retrieves the rune balance for an array of transaction inputs represented by `(vin, tx_id, vout)` where `vin` is the index of +/// this transaction input, `tx_id` is the transaction ID that produced this input and `vout` is the output index of this previous +/// tx. +pub async fn pg_get_input_rune_balances( outputs: Vec<(u32, String, u32)>, db_tx: &mut Transaction<'_>, ctx: &Context, @@ -500,7 +504,7 @@ pub async fn pg_get_missed_input_rune_balances( "error retrieving output rune balances: {}", e.to_string() ); - panic!(); + process::exit(1); } }; let mut results: HashMap>> = HashMap::new(); diff --git a/src/service.rs b/src/service.rs index 82a68f5..3db43ee 100644 --- a/src/service.rs +++ b/src/service.rs @@ -14,53 +14,52 @@ use chainhook_sdk::{ utils::Context, }; use crossbeam_channel::select; -use tokio_postgres::Client; pub async fn start_service(config: &Config, ctx: &Context) -> Result<(), String> { - let mut pg_client = pg_connect(&config, true, ctx).await; - let mut index_cache = IndexCache::new(config, &mut pg_client, ctx).await; + { + let mut pg_client = pg_connect(&config, true, ctx).await; + let mut index_cache = IndexCache::new(config, &mut pg_client, ctx).await; + loop { + let chain_tip = pg_get_block_height(&mut pg_client, ctx) + .await + .unwrap_or(get_rune_genesis_block_height(config.get_bitcoin_network()) - 1); + let bitcoind_chain_tip = bitcoind_get_block_height(config, ctx); + if bitcoind_chain_tip < chain_tip { + try_info!( + ctx, + "Waiting for bitcoind to reach height {}, currently at {}", + chain_tip, + bitcoind_chain_tip + ); + std::thread::sleep(std::time::Duration::from_secs(10)); + } else if bitcoind_chain_tip > chain_tip { + try_info!( + ctx, + "Block height is behind bitcoind, scanning block range {} to {}", + chain_tip + 1, + bitcoind_chain_tip + ); + scan_blocks( + ((chain_tip + 1)..=bitcoind_chain_tip).collect(), + config, + &mut pg_client, + &mut index_cache, + ctx, + ) + .await?; + } else { + try_info!(ctx, "Caught up to bitcoind chain tip at {}", chain_tip); + break; + } + } + } + // Start chainhook event observer, we're at chain tip. let (observer_cmd_tx, observer_cmd_rx) = channel(); let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded(); let observer_sidecar = set_up_observer_sidecar_runloop(config, ctx) .await .expect("unable to set up observer sidecar"); - - loop { - let chain_tip = pg_get_block_height(&mut pg_client, ctx) - .await - .unwrap_or(get_rune_genesis_block_height(config.get_bitcoin_network()) - 1); - let bitcoind_chain_tip = bitcoind_get_block_height(config, ctx); - if bitcoind_chain_tip < chain_tip { - try_info!( - ctx, - "Waiting for bitcoind to reach height {}, currently at {}", - chain_tip, - bitcoind_chain_tip - ); - std::thread::sleep(std::time::Duration::from_secs(10)); - } else if bitcoind_chain_tip > chain_tip { - try_info!( - ctx, - "Block height is behind bitcoind, scanning block range {} to {}", - chain_tip + 1, - bitcoind_chain_tip - ); - scan_blocks( - ((chain_tip + 1)..=bitcoind_chain_tip).collect(), - config, - &mut pg_client, - &mut index_cache, - ctx, - ) - .await?; - } else { - try_info!(ctx, "Caught up to bitcoind chain tip at {}", chain_tip); - break; - } - } - - // Start chainhook event observer, we're at chain tip. let event_observer_config = config.event_observer.clone(); let context = if config.event_observer.display_logs { ctx.clone() @@ -121,14 +120,13 @@ pub async fn set_up_observer_sidecar_runloop( let _ = hiro_system_kit::thread_named("Observer Sidecar Runloop").spawn(move || { hiro_system_kit::nestable_block_on(async { - let mut pg_client = pg_connect(&config, false, &ctx).await; - let mut index_cache = IndexCache::new(&config, &mut pg_client, &ctx).await; + let mut index_cache = + IndexCache::new(&config, &mut pg_connect(&config, false, &ctx).await, &ctx).await; loop { select! { recv(block_mutator_in_rx) -> msg => { if let Ok((mut blocks_to_mutate, blocks_ids_to_rollback)) = msg { chainhook_sidecar_mutate_blocks( - &mut pg_client, &mut index_cache, &mut blocks_to_mutate, &blocks_ids_to_rollback, @@ -152,20 +150,20 @@ pub async fn set_up_observer_sidecar_runloop( } pub async fn chainhook_sidecar_mutate_blocks( - pg_client: &mut Client, index_cache: &mut IndexCache, blocks_to_mutate: &mut Vec, block_ids_to_rollback: &Vec, - _config: &Config, + config: &Config, ctx: &Context, ) { try_info!(ctx, "Received mutate blocks message from Chainhook SDK"); + let mut pg_client = pg_connect(&config, false, &ctx).await; for block_id in block_ids_to_rollback.iter() { - roll_back_block(pg_client, block_id.index, ctx).await; + roll_back_block(&mut pg_client, block_id.index, ctx).await; } for cache in blocks_to_mutate.iter_mut() { if !cache.processed_by_sidecar { - index_block(pg_client, index_cache, &mut cache.block, ctx).await; + index_block(&mut pg_client, index_cache, &mut cache.block, ctx).await; cache.processed_by_sidecar = true; } }