Skip to content

Commit

Permalink
fix: improve rocksdb resiliency with retries
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Nov 16, 2023
1 parent 76e6ed1 commit d65cc22
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 31 deletions.
36 changes: 25 additions & 11 deletions components/ordhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ordhook::chainhook_sdk::chainhooks::types::{
ChainhookFullSpecification, HookAction, OrdinalOperations,
};
use ordhook::chainhook_sdk::indexer::bitcoin::{
download_and_parse_block_with_retry, retrieve_block_hash_with_retry, build_http_client,
build_http_client, download_and_parse_block_with_retry, retrieve_block_hash_with_retry,
};
use ordhook::chainhook_sdk::observer::BitcoinConfig;
use ordhook::chainhook_sdk::types::{BitcoinBlockData, TransactionIdentifier};
Expand All @@ -27,8 +27,8 @@ use ordhook::db::{
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
find_latest_inscription_block_height, find_lazy_block_at_block_height,
get_default_ordhook_db_file_path, initialize_ordhook_db, open_readonly_ordhook_db_conn,
open_readonly_ordhook_db_conn_rocks_db, open_readwrite_ordhook_db_conn,
open_readwrite_ordhook_db_conn_rocks_db,
open_readonly_ordhook_db_conn_rocks_db, open_readwrite_ordhook_db_conn, open_ordhook_db_conn_rocks_db_loop,

};
use ordhook::download::download_ordinals_dataset_if_required;
use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
Expand Down Expand Up @@ -76,7 +76,6 @@ enum ScanCommand {
/// Retrieve activities for a given inscription
#[clap(name = "transaction", bin_name = "transaction")]
Transaction(ScanTransactionCommand),

}

#[derive(Parser, PartialEq, Clone, Debug)]
Expand Down Expand Up @@ -660,20 +659,35 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let config: Config =
ConfigFile::default(cmd.regtest, cmd.testnet, cmd.mainnet, &cmd.config_path)?;
let http_client = build_http_client();
let block = fetch_and_standardize_block(&http_client, cmd.block_height, &config.get_event_observer_config().get_bitcoin_config(), ctx).await?;
let block = fetch_and_standardize_block(
&http_client,
cmd.block_height,
&config.get_event_observer_config().get_bitcoin_config(),
ctx,
)
.await?;
let transaction_identifier = TransactionIdentifier::new(&cmd.transaction_id);
let cache = new_traversals_lazy_cache(100);
let res = compute_satoshi_number(&config.get_ordhook_config().db_path, &block.block_identifier, &transaction_identifier, 0, 0, &Arc::new(cache), ctx)?;
let res = compute_satoshi_number(
&config.get_ordhook_config().db_path,
&block.block_identifier,
&transaction_identifier,
0,
0,
&Arc::new(cache),
ctx,
)?;
println!("{:?}", res);
}
Command::Service(subcmd) => match subcmd {
ServiceCommand::Start(cmd) => {
let maintenance_enabled = std::env::var("ORDHOOK_MAINTENANCE").unwrap_or("0".into());
let maintenance_enabled =
std::env::var("ORDHOOK_MAINTENANCE").unwrap_or("0".into());
if maintenance_enabled.eq("1") {
info!(ctx.expect_logger(), "Entering maintenance mode (default duration = 7 days). Unset ORDHOOK_MAINTENANCE and reboot to resume operations");
sleep(Duration::from_secs(3600 * 24 * 7))
}

let config =
ConfigFile::default(cmd.regtest, cmd.testnet, cmd.mainnet, &cmd.config_path)?;

Expand All @@ -685,7 +699,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let last_known_block =
find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?;
if last_known_block.is_none() {
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
}

let ordhook_config = config.get_ordhook_config();
Expand Down Expand Up @@ -742,7 +756,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::New(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
initialize_ordhook_db(&config.expected_cache_path(), ctx);
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
}
Command::Db(OrdhookDbCommand::Sync(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
Expand Down Expand Up @@ -848,7 +862,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
Command::Db(OrdhookDbCommand::Drop(cmd)) => {
let config = ConfigFile::default(false, false, false, &cmd.config_path)?;
let blocks_db =
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), ctx);
let inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&config.expected_cache_path(), ctx)?;

Expand Down
6 changes: 3 additions & 3 deletions components/ordhook-core/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use chainhook_sdk::{

use crate::{
config::{Config, LogConfig},
db::{find_lazy_block_at_block_height, open_readwrite_ordhook_db_conn_rocks_db},
db::{find_lazy_block_at_block_height, open_ordhook_db_conn_rocks_db_loop},
};

use crate::db::{
Expand Down Expand Up @@ -95,7 +95,7 @@ pub fn compute_next_satpoint_data(
}

pub fn should_sync_rocks_db(config: &Config, ctx: &Context) -> Result<Option<(u64, u64)>, String> {
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let inscriptions_db_conn = open_readonly_ordhook_db_conn(&config.expected_cache_path(), &ctx)?;
let last_compressed_block = find_last_block_inserted(&blocks_db) as u64;
let last_indexed_block = match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)?
Expand Down Expand Up @@ -128,7 +128,7 @@ pub fn should_sync_ordhook_db(
}
};

let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let blocks_db = open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let mut start_block = find_last_block_inserted(&blocks_db) as u64;

if start_block == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, open_readwrite_ordhook_db_conn_rocks_db, LazyBlock},
db::{insert_entry_in_blocks, LazyBlock, open_ordhook_db_conn_rocks_db_loop},
};

pub fn start_block_archiving_processor(
Expand All @@ -26,8 +26,7 @@ pub fn start_block_archiving_processor(
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
.spawn(move || {
let blocks_db_rw =
open_readwrite_ordhook_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
.unwrap();
open_ordhook_db_conn_rocks_db_loop(true, &config.expected_cache_path(), &ctx);
let mut processed_blocks = 0;

loop {
Expand Down
30 changes: 24 additions & 6 deletions components/ordhook-core/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::BTreeMap,
io::{Read, Write},
path::PathBuf,
path::PathBuf, thread::sleep, time::Duration,
};

use rand::{thread_rng, Rng};
Expand Down Expand Up @@ -277,7 +277,7 @@ pub fn open_readwrite_ordhook_dbs(
Ok((blocks_db, inscriptions_db))
}

pub fn open_readwrite_ordhook_db_conn_rocks_db(
fn open_readwrite_ordhook_db_conn_rocks_db(
base_dir: &PathBuf,
_ctx: &Context,
) -> Result<DB, String> {
Expand All @@ -293,12 +293,30 @@ pub fn insert_entry_in_blocks(
lazy_block: &LazyBlock,
update_tip: bool,
blocks_db_rw: &DB,
_ctx: &Context,
ctx: &Context,
) {
let block_height_bytes = block_height.to_be_bytes();
blocks_db_rw
.put(&block_height_bytes, &lazy_block.bytes)
.expect("unable to insert blocks");
let mut retries = 0;
loop {
let res = blocks_db_rw.put(&block_height_bytes, &lazy_block.bytes);
match res {
Ok(_) => break,
Err(e) => {
retries += 1;
if retries > 10 {
ctx.try_log(|logger| {
error!(
logger,
"unable to insert block {block_height} ({}). will retry in 5 secs",
e.to_string()
);
});
sleep(Duration::from_secs(5));
}
}
}
}

if update_tip {
blocks_db_rw
.put(b"metadata::last_insert", block_height_bytes)
Expand Down
7 changes: 4 additions & 3 deletions components/ordhook-core/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::core::protocol::inscription_sequencing::SequenceCursor;
use crate::core::{new_traversals_lazy_cache, should_sync_ordhook_db, should_sync_rocks_db};
use crate::db::{
delete_data_in_ordhook_db, insert_entry_in_blocks, open_readwrite_ordhook_db_conn,
open_readwrite_ordhook_db_conn_rocks_db, open_readwrite_ordhook_dbs,
open_ordhook_db_conn_rocks_db_loop, open_readwrite_ordhook_dbs,
update_inscriptions_with_block, update_locations_with_block, LazyBlock, LazyBlockTransaction,
};
use crate::scan::bitcoin::process_block_with_predicates;
Expand Down Expand Up @@ -442,10 +442,11 @@ impl Service {
event_observer_config: &EventObserverConfig,
) -> Result<(), String> {
if rebuild_from_scratch {
let blocks_db = open_readwrite_ordhook_db_conn_rocks_db(
let blocks_db = open_ordhook_db_conn_rocks_db_loop(
true,
&self.config.expected_cache_path(),
&self.ctx,
)?;
);
let inscriptions_db_conn_rw =
open_readwrite_ordhook_db_conn(&self.config.expected_cache_path(), &self.ctx)?;

Expand Down
5 changes: 0 additions & 5 deletions components/ordhook-sdk-js/src/ordinals_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ impl OrdinalsIndexingRunloop {
tracer: false,
};

// Initialize service
// {
// let _ = initialize_ordhook_db(&ordhook_config.expected_cache_path(), &ctx);
// let _ = open_readwrite_ordhook_db_conn_rocks_db(&ordhook_config.expected_cache_path(), &ctx);
// }
let mut service: Service = Service::new(ordhook_config, ctx);

// Set-up the observer sidecar - used for augmenting the bitcoin blocks with
Expand Down

0 comments on commit d65cc22

Please sign in to comment.