Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: storage optimizations #206

Closed
wants to merge 47 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
9855b77
chore: cargo fmt
Nov 2, 2023
f9e2ac4
fix: making db access more defensive
Nov 2, 2023
b101de5
chore: clippy
Nov 2, 2023
d1b5d68
fix: reopen conn on each iteration
Nov 3, 2023
319d956
fix: retry mechanism for statement prepare
Nov 3, 2023
6704a2b
refactor: sqlite requests retries
Nov 3, 2023
fa05163
feat: set explicit timeout value
Nov 3, 2023
4eed91a
fix: make logging optional
Nov 4, 2023
f5f1c33
chore: logger cleaning
Nov 4, 2023
4b5e9c4
fix: result looping
Nov 7, 2023
67a8943
feat: introduce ORDHOOK_MAINTENANCE mode, revisit cli options
Nov 14, 2023
c619fec
chore: re-qualify logs
Nov 14, 2023
f70fdba
chore: enable mode code site
Nov 14, 2023
04e13b9
chore: re-qualify log
Nov 14, 2023
76e6ed1
fix: keep trying opening conn
Nov 14, 2023
d65cc22
fix: improve rocksdb resiliency with retries
Nov 16, 2023
ff285e7
fix: baseline experiment
Nov 16, 2023
b39472f
chore: update chainhook-sdk
Nov 16, 2023
78cee50
fix: testnet support
Nov 17, 2023
6f5bd3d
Squashed commit of the following:
Nov 27, 2023
13faa1c
chore: update Cargo.lock
Nov 27, 2023
ea5cba6
chore: cargo fmt
Nov 27, 2023
8464d0d
Merge branch 'develop' into fix/database-optims
Nov 27, 2023
308b7f2
fix: merge artifact
Nov 27, 2023
8d21f39
Merge branch 'develop' into fix/database-optims
Nov 28, 2023
38bead9
fix: rocksdb fine tuning
Nov 28, 2023
c4bad08
fix: Cargo.toml warns
Nov 28, 2023
4410933
feat: auto-repair at boot
Nov 28, 2023
568a6c8
fix: rocksdb conn handling
Nov 28, 2023
638d3f7
fix: improve backoff strategy
Nov 28, 2023
413251e
Merge branch 'fix/database-optims' of github.com:hirosystems/hord int…
Nov 28, 2023
91dfc02
chore: refactor BlockBytesCursor usage
Nov 28, 2023
e1741a8
fix: update sequence_metadata when augmenting block
Nov 29, 2023
9b1a905
feat: revisit observers handling
Nov 30, 2023
5924097
fix: aborted loop
Nov 30, 2023
2584bfb
Merge branch 'fix/database-optims' of github.com:hirosystems/hord int…
Nov 30, 2023
ef6b060
fix: stateful observers
Dec 1, 2023
2d78c8c
fix: build warnings
Dec 1, 2023
0b476b5
chore: tweak db config
Dec 13, 2023
cc4f89b
fix: include coinbase output offset
Dec 13, 2023
ddbe675
chore: tweak cli
Dec 13, 2023
bdd1dda
fix: iterator usage
Dec 13, 2023
7520b42
chore: disable blobdb
Dec 14, 2023
3015e28
fix: broken test
Dec 15, 2023
4430922
Merge pull request #230 from hirosystems/fix/coinbase-outputs
Dec 18, 2023
a2823de
Merge branch 'develop' into fix/database-optims
Dec 18, 2023
e1415ec
chore: update dependencies
Dec 18, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
351 changes: 205 additions & 146 deletions Cargo.lock

Large diffs are not rendered by default.

32 changes: 15 additions & 17 deletions components/ordhook-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ use ordhook::core::protocol::inscription_parsing::parse_inscriptions_and_standar
use ordhook::core::protocol::satoshi_numbering::compute_satoshi_number;
use ordhook::db::{
delete_data_in_ordhook_db, find_all_inscription_transfers, find_all_inscriptions_in_block,
find_all_transfers_in_block, find_inscription_with_id, find_last_block_inserted,
find_latest_inscription_block_height, find_lazy_block_at_block_height,
find_all_transfers_in_block, find_block_bytes_at_block_height, find_inscription_with_id,
find_last_block_inserted, find_latest_inscription_block_height, find_missing_blocks,
get_default_ordhook_db_file_path, initialize_ordhook_db, open_ordhook_db_conn_rocks_db_loop,
open_readonly_ordhook_db_conn, open_readonly_ordhook_db_conn_rocks_db,
open_readwrite_ordhook_db_conn,
open_readwrite_ordhook_db_conn, BlockBytesCursor,
};
use ordhook::download::download_ordinals_dataset_if_required;
use ordhook::hex;
use ordhook::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use ordhook::service::{start_observer_forwarding, Service};
use reqwest::Client as HttpClient;
Expand Down Expand Up @@ -668,15 +669,20 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
.await?;
let transaction_identifier = TransactionIdentifier::new(&cmd.transaction_id);
let cache = new_traversals_lazy_cache(100);
let res = compute_satoshi_number(
let (res, mut back_trace) = compute_satoshi_number(
&config.get_ordhook_config().db_path,
&block.block_identifier,
&transaction_identifier,
0,
0,
&Arc::new(cache),
true,
ctx,
)?;
back_trace.reverse();
for (block_height, tx) in back_trace.iter() {
println!("{}\t{}", block_height, hex::encode(tx));
}
println!("{:?}", res);
}
Command::Service(subcmd) => match subcmd {
Expand Down Expand Up @@ -793,9 +799,10 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
ctx,
);
for i in cmd.get_blocks().into_iter() {
let block =
find_lazy_block_at_block_height(i as u32, 10, false, &blocks_db, ctx)
let block_bytes =
find_block_bytes_at_block_height(i as u32, 10, &blocks_db, ctx)
.expect("unable to retrieve block {i}");
let block = BlockBytesCursor::new(&block_bytes);
info!(ctx.expect_logger(), "--------------------");
info!(ctx.expect_logger(), "Block: {i}");
for tx in block.iter_tx() {
Expand Down Expand Up @@ -861,18 +868,9 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
{
let blocks_db =
open_readonly_ordhook_db_conn_rocks_db(&config.expected_cache_path(), ctx)?;
let tip = find_last_block_inserted(&blocks_db) as u64;
let tip = find_last_block_inserted(&blocks_db);
println!("Tip: {}", tip);

let mut missing_blocks = vec![];
for i in cmd.start_block..=cmd.end_block {
if find_lazy_block_at_block_height(i as u32, 0, false, &blocks_db, ctx)
.is_none()
{
println!("Missing block #{i}");
missing_blocks.push(i);
}
}
let missing_blocks = find_missing_blocks(&blocks_db, 1, tip, ctx);
println!("{:?}", missing_blocks);
}
}
Expand Down
1 change: 1 addition & 0 deletions components/ordhook-cli/src/config/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl ConfigFile {
"devnet" => (StacksNetwork::Devnet, BitcoinNetwork::Regtest),
"testnet" => (StacksNetwork::Testnet, BitcoinNetwork::Testnet),
"mainnet" => (StacksNetwork::Mainnet, BitcoinNetwork::Mainnet),
"signet" => (StacksNetwork::Testnet, BitcoinNetwork::Signet),
_ => return Err("network.mode not supported".to_string()),
};

Expand Down
3 changes: 2 additions & 1 deletion components/ordhook-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ serde_json = "1"
serde_derive = "1"
hex = "0.4.3"
rand = "0.8.5"
chainhook-sdk = { version = "0.11.0", features = ["zeromq"] }
chainhook-sdk = { version = "=0.11.0", features = ["zeromq"] }
# chainhook-sdk = { version = "=0.10.1", path = "../../../chainhook/components/chainhook-sdk", default-features = false, features = ["zeromq", "log"] }
hiro-system-kit = "0.3.1"
reqwest = { version = "0.11", default-features = false, features = ["stream", "json", "rustls-tls"] }
Expand All @@ -35,6 +35,7 @@ progressing = '3'
futures = "0.3.28"
rocksdb = { version = "0.21.0", default-features = false, features = ["snappy"] }
pprof = { version = "0.13.0", features = ["flamegraph"], optional = true }
hyper = { version = "=0.14.27" }

# [profile.release]
# debug = true
Expand Down
10 changes: 5 additions & 5 deletions components/ordhook-core/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ use chainhook_sdk::{

use crate::{
config::{Config, LogConfig},
db::{find_lazy_block_at_block_height, open_ordhook_db_conn_rocks_db_loop},
db::{find_pinned_block_bytes_at_block_height, open_ordhook_db_conn_rocks_db_loop},
};

use crate::db::{
find_last_block_inserted, find_latest_inscription_block_height, initialize_ordhook_db,
open_readonly_ordhook_db_conn,
};

use crate::db::LazyBlockTransaction;
use crate::db::TransactionBytesCursor;

#[derive(Clone, Debug)]
pub struct OrdhookConfig {
Expand All @@ -44,11 +44,11 @@ pub fn new_traversals_cache(

pub fn new_traversals_lazy_cache(
cache_size: usize,
) -> DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>> {
) -> DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>> {
let hasher = FxBuildHasher::default();
DashMap::with_capacity_and_hasher(
((cache_size.saturating_sub(500)) * 1000 * 1000)
.div(LazyBlockTransaction::get_average_bytes_size()),
.div(TransactionBytesCursor::get_average_bytes_size()),
hasher,
)
}
Expand Down Expand Up @@ -139,7 +139,7 @@ pub fn should_sync_ordhook_db(

match find_latest_inscription_block_height(&inscriptions_db_conn, ctx)? {
Some(height) => {
if find_lazy_block_at_block_height(height as u32, 3, false, &blocks_db, &ctx).is_none()
if find_pinned_block_bytes_at_block_height(height as u32, 3, &blocks_db, &ctx).is_none()
{
start_block = start_block.min(height);
} else {
Expand Down
12 changes: 6 additions & 6 deletions components/ordhook-core/src/core/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::time::Duration;
use tokio::task::JoinSet;

use crate::config::Config;
use crate::db::LazyBlock;
use crate::db::BlockBytesCursor;

use chainhook_sdk::indexer::bitcoin::{
build_http_client, parse_downloaded_block, try_download_block_bytes_with_retry,
Expand All @@ -19,7 +19,7 @@ use chainhook_sdk::indexer::bitcoin::{
use super::protocol::inscription_parsing::parse_inscriptions_and_standardize_block;

pub enum PostProcessorCommand {
ProcessBlocks(Vec<(u64, LazyBlock)>, Vec<BitcoinBlockData>),
ProcessBlocks(Vec<(u64, Vec<u8>)>, Vec<BitcoinBlockData>),
Terminate,
}

Expand Down Expand Up @@ -111,7 +111,7 @@ pub async fn download_and_pipeline_blocks(
while let Ok(Some(block_bytes)) = rx.recv() {
let raw_block_data =
parse_downloaded_block(block_bytes).expect("unable to parse block");
let compressed_block = LazyBlock::from_full_block(&raw_block_data)
let compressed_block = BlockBytesCursor::from_full_block(&raw_block_data)
.expect("unable to compress block");
let block_height = raw_block_data.height as u64;
let block_data = if block_height >= start_sequencing_blocks_at_height {
Expand Down Expand Up @@ -177,7 +177,7 @@ pub async fn download_and_pipeline_blocks(
}
}
None => {
stop_runloop = true;
break;
}
}
}
Expand All @@ -195,9 +195,9 @@ pub async fn download_and_pipeline_blocks(
let mut ooo_compacted_blocks = vec![];
for (block_height, block_opt, compacted_block) in new_blocks.into_iter() {
if let Some(block) = block_opt {
inbox.insert(block_height, (block, compacted_block));
inbox.insert(block_height, (block, compacted_block.to_vec()));
} else {
ooo_compacted_blocks.push((block_height, compacted_block));
ooo_compacted_blocks.push((block_height, compacted_block.to_vec()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, open_ordhook_db_conn_rocks_db_loop, LazyBlock},
db::{insert_entry_in_blocks, open_ordhook_db_conn_rocks_db_loop},
};

pub fn start_block_archiving_processor(
Expand Down Expand Up @@ -72,7 +72,7 @@ pub fn start_block_archiving_processor(
}

pub fn store_compacted_blocks(
mut compacted_blocks: Vec<(u64, LazyBlock)>,
mut compacted_blocks: Vec<(u64, Vec<u8>)>,
update_tip: bool,
blocks_db_rw: &DB,
ctx: &Context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
},
};

use crate::db::{LazyBlockTransaction, TraversalResult};
use crate::db::{TransactionBytesCursor, TraversalResult};

use crate::{
config::Config,
Expand Down Expand Up @@ -162,7 +162,7 @@ pub fn start_inscription_indexing_processor(
pub fn process_blocks(
next_blocks: &mut Vec<BitcoinBlockData>,
sequence_cursor: &mut SequenceCursor,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
inscriptions_db_conn_rw: &mut Connection,
ordhook_config: &OrdhookConfig,
post_processor: &Option<Sender<BitcoinBlockData>>,
Expand Down Expand Up @@ -259,7 +259,7 @@ pub fn process_block(
next_blocks: &Vec<BitcoinBlockData>,
sequence_cursor: &mut SequenceCursor,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
inscriptions_db_tx: &Transaction,
ordhook_config: &OrdhookConfig,
ctx: &Context,
Expand Down
39 changes: 20 additions & 19 deletions components/ordhook-core/src/core/protocol/inscription_sequencing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::{
find_blessed_inscription_with_ordinal_number,
find_latest_cursed_inscription_number_at_block_height,
find_latest_inscription_number_at_block_height, format_satpoint_to_watch,
update_inscriptions_with_block, update_sequence_metadata_with_block, LazyBlockTransaction,
TraversalResult,
update_inscriptions_with_block, update_sequence_metadata_with_block,
TransactionBytesCursor, TraversalResult,
},
ord::height::Height,
};
Expand Down Expand Up @@ -68,12 +68,18 @@ pub fn parallelize_inscription_data_computations(
block: &BitcoinBlockData,
next_blocks: &Vec<BitcoinBlockData>,
cache_l1: &mut BTreeMap<(TransactionIdentifier, usize), TraversalResult>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), LazyBlockTransaction, BuildHasherDefault<FxHasher>>>,
cache_l2: &Arc<DashMap<(u32, [u8; 8]), TransactionBytesCursor, BuildHasherDefault<FxHasher>>>,
inscriptions_db_tx: &Transaction,
ordhook_config: &OrdhookConfig,
ctx: &Context,
) -> Result<bool, String> {
ctx.try_log(|logger| {
let inner_ctx = if ordhook_config.logs.ordinals_internals {
ctx.clone()
} else {
Context::empty()
};

inner_ctx.try_log(|logger| {
info!(
logger,
"Inscriptions data computation for block #{} started", block.block_identifier.index
Expand All @@ -83,12 +89,6 @@ pub fn parallelize_inscription_data_computations(
let (mut transactions_ids, l1_cache_hits) =
get_transactions_to_process(block, cache_l1, inscriptions_db_tx, ctx);

let inner_ctx = if ordhook_config.logs.ordinals_internals {
ctx.clone()
} else {
Context::empty()
};

let has_transactions_to_process = !transactions_ids.is_empty() || !l1_cache_hits.is_empty();

let thread_max = ordhook_config.ingestion_thread_max;
Expand Down Expand Up @@ -118,13 +118,14 @@ pub fn parallelize_inscription_data_computations(
while let Ok(Some((transaction_id, block_identifier, input_index, prioritary))) =
rx.recv()
{
let traversal: Result<TraversalResult, String> = compute_satoshi_number(
let traversal: Result<(TraversalResult, _), String> = compute_satoshi_number(
&moved_ordhook_db_path,
&block_identifier,
&transaction_id,
input_index,
0,
&local_cache,
false,
&moved_ctx,
);
let _ = moved_traversal_tx.send((traversal, prioritary, thread_index));
Expand All @@ -138,7 +139,7 @@ pub fn parallelize_inscription_data_computations(
let mut thread_index = 0;
for key in l1_cache_hits.iter() {
if let Some(entry) = cache_l1.remove(key) {
let _ = traversal_tx.send((Ok(entry), true, thread_index));
let _ = traversal_tx.send((Ok((entry, vec![])), true, thread_index));
thread_index = (thread_index + 1) % thread_max;
}
}
Expand All @@ -148,7 +149,7 @@ pub fn parallelize_inscription_data_computations(
.map(|b| format!("{}", b.block_identifier.index))
.collect::<Vec<_>>();

ctx.try_log(|logger| {
inner_ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to process: {} (L1 cache hits: {}, queue: [{}], L1 cache len: {}, L2 cache len: {})",
Expand Down Expand Up @@ -190,7 +191,7 @@ pub fn parallelize_inscription_data_computations(
traversals_received += 1;
}
match traversal_result {
Ok(traversal) => {
Ok((traversal, _)) => {
inner_ctx.try_log(|logger| {
info!(
logger,
Expand Down Expand Up @@ -231,7 +232,7 @@ pub fn parallelize_inscription_data_computations(
let (mut transactions_ids, _) =
get_transactions_to_process(next_block, cache_l1, inscriptions_db_tx, ctx);

ctx.try_log(|logger| {
inner_ctx.try_log(|logger| {
info!(
logger,
"Number of inscriptions in block #{} to pre-process: {}",
Expand All @@ -254,7 +255,7 @@ pub fn parallelize_inscription_data_computations(
}
}
}
ctx.try_log(|logger| {
inner_ctx.try_log(|logger| {
info!(
logger,
"Inscriptions data computation for block #{} collected", block.block_identifier.index
Expand All @@ -265,7 +266,7 @@ pub fn parallelize_inscription_data_computations(
for tx in tx_thread_pool.iter() {
// Empty the queue
if let Ok((traversal_result, _prioritary, thread_index)) = traversal_rx.try_recv() {
if let Ok(traversal) = traversal_result {
if let Ok((traversal, _)) = traversal_result {
inner_ctx.try_log(|logger| {
info!(
logger,
Expand All @@ -289,7 +290,7 @@ pub fn parallelize_inscription_data_computations(
let _ = tx.send(None);
}

let ctx_moved = ctx.clone();
let ctx_moved = inner_ctx.clone();
let _ = hiro_system_kit::thread_named("Garbage collection").spawn(move || {
ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation started",));

Expand All @@ -299,7 +300,7 @@ pub fn parallelize_inscription_data_computations(
ctx_moved.try_log(|logger| info!(logger, "Cleanup: threadpool deallocation ended",));
});

ctx.try_log(|logger| {
inner_ctx.try_log(|logger| {
info!(
logger,
"Inscriptions data computation for block #{} ended", block.block_identifier.index
Expand Down
Loading
Loading