Skip to content

Commit

Permalink
fix: open and close pg connections for each chainhook message (#15)
Browse files Browse the repository at this point in the history
* fix: open and close pg connections for each chainhook message

* chore: flush db cache on input miss

* fix: revert
  • Loading branch information
rafaelcr authored Jul 5, 2024
1 parent e797fc3 commit 3c93671
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 61 deletions.
6 changes: 3 additions & 3 deletions src/db/cache/index_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
db_ledger_operation::DbLedgerOperation, db_rune::DbRune,
db_supply_change::DbSupplyChange,
},
pg_get_max_rune_number, pg_get_missed_input_rune_balances, pg_get_rune_by_id,
pg_get_input_rune_balances, pg_get_max_rune_number, pg_get_rune_by_id,
pg_get_rune_total_mints,
},
try_debug, try_info, try_warn,
Expand Down Expand Up @@ -320,10 +320,10 @@ impl IndexCache {
}
}

// Look for misses in database.
// Look for cache misses in database.
if cache_misses.len() > 0 {
// self.db_cache.flush(db_tx, ctx).await;
let output_balances = pg_get_missed_input_rune_balances(cache_misses, db_tx, ctx).await;
let output_balances = pg_get_input_rune_balances(cache_misses, db_tx, ctx).await;
indexed_input_runes.extend(output_balances);
}

Expand Down
28 changes: 16 additions & 12 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, str::FromStr};
use std::{collections::HashMap, process, str::FromStr};

use cache::transaction_cache::InputRuneBalance;
use chainhook_sdk::utils::Context;
Expand Down Expand Up @@ -45,7 +45,8 @@ pub async fn pg_connect(config: &Config, run_migrations: bool, ctx: &Context) ->
Ok((client, connection)) => {
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
eprintln!("Postgres connection error: {}", e.to_string());
process::exit(1);
}
});
pg_client = client;
Expand All @@ -67,8 +68,8 @@ pub async fn pg_connect(config: &Config, run_migrations: bool, ctx: &Context) ->
{
Ok(_) => {}
Err(e) => {
try_error!(ctx, "error running pg migrations: {}", e.to_string());
panic!()
try_error!(ctx, "Error running pg migrations: {}", e.to_string());
process::exit(1);
}
};
try_info!(ctx, "Postgres migrations complete");
Expand Down Expand Up @@ -143,7 +144,7 @@ pub async fn pg_insert_runes(
Ok(_) => {}
Err(e) => {
try_error!(ctx, "Error inserting runes: {:?}", e);
panic!()
process::exit(1);
}
};
}
Expand Down Expand Up @@ -220,7 +221,7 @@ pub async fn pg_insert_supply_changes(
Ok(_) => {}
Err(e) => {
try_error!(ctx, "Error inserting supply changes: {:?}", e);
panic!()
process::exit(1);
}
};
}
Expand Down Expand Up @@ -285,7 +286,7 @@ pub async fn pg_insert_balance_changes(
Ok(_) => {}
Err(e) => {
try_error!(ctx, "Error inserting balance changes: {:?}", e);
panic!()
process::exit(1);
}
};
}
Expand Down Expand Up @@ -336,7 +337,7 @@ pub async fn pg_insert_ledger_entries(
Ok(_) => {}
Err(e) => {
try_error!(ctx, "Error inserting ledger entries: {:?}", e);
panic!()
process::exit(1);
}
};
}
Expand Down Expand Up @@ -411,7 +412,7 @@ pub async fn pg_get_rune_by_id(
Ok(row) => row,
Err(e) => {
try_error!(ctx, "error retrieving rune: {}", e.to_string());
panic!();
process::exit(1);
}
};
let Some(row) = row else {
Expand Down Expand Up @@ -439,7 +440,7 @@ pub async fn pg_get_rune_total_mints(
"error retrieving rune minted total: {}",
e.to_string()
);
panic!();
process::exit(1);
}
};
let Some(row) = row else {
Expand All @@ -449,7 +450,10 @@ pub async fn pg_get_rune_total_mints(
Some(minted.0)
}

pub async fn pg_get_missed_input_rune_balances(
/// Retrieves the rune balance for an array of transaction inputs represented by `(vin, tx_id, vout)` where `vin` is the index of
/// this transaction input, `tx_id` is the transaction ID that produced this input and `vout` is the output index of this previous
/// tx.
pub async fn pg_get_input_rune_balances(
outputs: Vec<(u32, String, u32)>,
db_tx: &mut Transaction<'_>,
ctx: &Context,
Expand Down Expand Up @@ -500,7 +504,7 @@ pub async fn pg_get_missed_input_rune_balances(
"error retrieving output rune balances: {}",
e.to_string()
);
panic!();
process::exit(1);
}
};
let mut results: HashMap<u32, HashMap<RuneId, Vec<InputRuneBalance>>> = HashMap::new();
Expand Down
90 changes: 44 additions & 46 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,52 @@ use chainhook_sdk::{
utils::Context,
};
use crossbeam_channel::select;
use tokio_postgres::Client;

pub async fn start_service(config: &Config, ctx: &Context) -> Result<(), String> {
let mut pg_client = pg_connect(&config, true, ctx).await;
let mut index_cache = IndexCache::new(config, &mut pg_client, ctx).await;
{
let mut pg_client = pg_connect(&config, true, ctx).await;
let mut index_cache = IndexCache::new(config, &mut pg_client, ctx).await;
loop {
let chain_tip = pg_get_block_height(&mut pg_client, ctx)
.await
.unwrap_or(get_rune_genesis_block_height(config.get_bitcoin_network()) - 1);
let bitcoind_chain_tip = bitcoind_get_block_height(config, ctx);
if bitcoind_chain_tip < chain_tip {
try_info!(
ctx,
"Waiting for bitcoind to reach height {}, currently at {}",
chain_tip,
bitcoind_chain_tip
);
std::thread::sleep(std::time::Duration::from_secs(10));
} else if bitcoind_chain_tip > chain_tip {
try_info!(
ctx,
"Block height is behind bitcoind, scanning block range {} to {}",
chain_tip + 1,
bitcoind_chain_tip
);
scan_blocks(
((chain_tip + 1)..=bitcoind_chain_tip).collect(),
config,
&mut pg_client,
&mut index_cache,
ctx,
)
.await?;
} else {
try_info!(ctx, "Caught up to bitcoind chain tip at {}", chain_tip);
break;
}
}
}

// Start chainhook event observer, we're at chain tip.
let (observer_cmd_tx, observer_cmd_rx) = channel();
let (observer_event_tx, observer_event_rx) = crossbeam_channel::unbounded();
let observer_sidecar = set_up_observer_sidecar_runloop(config, ctx)
.await
.expect("unable to set up observer sidecar");

loop {
let chain_tip = pg_get_block_height(&mut pg_client, ctx)
.await
.unwrap_or(get_rune_genesis_block_height(config.get_bitcoin_network()) - 1);
let bitcoind_chain_tip = bitcoind_get_block_height(config, ctx);
if bitcoind_chain_tip < chain_tip {
try_info!(
ctx,
"Waiting for bitcoind to reach height {}, currently at {}",
chain_tip,
bitcoind_chain_tip
);
std::thread::sleep(std::time::Duration::from_secs(10));
} else if bitcoind_chain_tip > chain_tip {
try_info!(
ctx,
"Block height is behind bitcoind, scanning block range {} to {}",
chain_tip + 1,
bitcoind_chain_tip
);
scan_blocks(
((chain_tip + 1)..=bitcoind_chain_tip).collect(),
config,
&mut pg_client,
&mut index_cache,
ctx,
)
.await?;
} else {
try_info!(ctx, "Caught up to bitcoind chain tip at {}", chain_tip);
break;
}
}

// Start chainhook event observer, we're at chain tip.
let event_observer_config = config.event_observer.clone();
let context = if config.event_observer.display_logs {
ctx.clone()
Expand Down Expand Up @@ -121,14 +120,13 @@ pub async fn set_up_observer_sidecar_runloop(

let _ = hiro_system_kit::thread_named("Observer Sidecar Runloop").spawn(move || {
hiro_system_kit::nestable_block_on(async {
let mut pg_client = pg_connect(&config, false, &ctx).await;
let mut index_cache = IndexCache::new(&config, &mut pg_client, &ctx).await;
let mut index_cache =
IndexCache::new(&config, &mut pg_connect(&config, false, &ctx).await, &ctx).await;
loop {
select! {
recv(block_mutator_in_rx) -> msg => {
if let Ok((mut blocks_to_mutate, blocks_ids_to_rollback)) = msg {
chainhook_sidecar_mutate_blocks(
&mut pg_client,
&mut index_cache,
&mut blocks_to_mutate,
&blocks_ids_to_rollback,
Expand All @@ -152,20 +150,20 @@ pub async fn set_up_observer_sidecar_runloop(
}

pub async fn chainhook_sidecar_mutate_blocks(
pg_client: &mut Client,
index_cache: &mut IndexCache,
blocks_to_mutate: &mut Vec<BitcoinBlockDataCached>,
block_ids_to_rollback: &Vec<BlockIdentifier>,
_config: &Config,
config: &Config,
ctx: &Context,
) {
try_info!(ctx, "Received mutate blocks message from Chainhook SDK");
let mut pg_client = pg_connect(&config, false, &ctx).await;
for block_id in block_ids_to_rollback.iter() {
roll_back_block(pg_client, block_id.index, ctx).await;
roll_back_block(&mut pg_client, block_id.index, ctx).await;
}
for cache in blocks_to_mutate.iter_mut() {
if !cache.processed_by_sidecar {
index_block(pg_client, index_cache, &mut cache.block, ctx).await;
index_block(&mut pg_client, index_cache, &mut cache.block, ctx).await;
cache.processed_by_sidecar = true;
}
}
Expand Down

0 comments on commit 3c93671

Please sign in to comment.