Skip to content

Commit

Permalink
feat: improve queue management
Browse files Browse the repository at this point in the history
  • Loading branch information
lgalabru committed Jan 12, 2024
1 parent 5292875 commit fd73567
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 14 deletions.
11 changes: 8 additions & 3 deletions components/ordhook-cli/src/config/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use ordhook::chainhook_sdk::types::{
};
use ordhook::config::{
Config, LogConfig, PredicatesApi, PredicatesApiConfig, ResourcesConfig, SnapshotConfig,
StorageConfig, DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE, DEFAULT_CONTROL_PORT,
DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
StorageConfig, DEFAULT_BITCOIND_RPC_THREADS, DEFAULT_BITCOIND_RPC_TIMEOUT,
DEFAULT_CONTROL_PORT, DEFAULT_MEMORY_AVAILABLE, DEFAULT_ULIMIT,
};
use std::fs::File;
use std::io::{BufReader, Read};
Expand Down Expand Up @@ -85,7 +85,11 @@ impl ConfigFile {
bitcoind_rpc_threads: config_file
.resources
.bitcoind_rpc_threads
.unwrap_or(DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE),
.unwrap_or(DEFAULT_BITCOIND_RPC_THREADS),
bitcoind_rpc_timeout: config_file
.resources
.bitcoind_rpc_timeout
.unwrap_or(DEFAULT_BITCOIND_RPC_TIMEOUT),
expected_observers_count: config_file
.resources
.expected_observers_count
Expand Down Expand Up @@ -170,6 +174,7 @@ pub struct ResourcesConfigFile {
pub cpu_core_available: Option<usize>,
pub memory_available: Option<usize>,
pub bitcoind_rpc_threads: Option<usize>,
pub bitcoind_rpc_timeout: Option<u32>,
pub expected_observers_count: Option<usize>,
}

Expand Down
3 changes: 2 additions & 1 deletion components/ordhook-cli/src/config/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ bitcoind_zmq_url = "tcp://0.0.0.0:18543"
ulimit = 2048
cpu_core_available = 16
memory_available = 32
bitcoind_rpc_threads = 8
bitcoind_rpc_threads = 4
bitcoind_rpc_timeout = 15
expected_observers_count = 1
# Disable the following section if the state
Expand Down
13 changes: 9 additions & 4 deletions components/ordhook-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ pub const DEFAULT_INGESTION_PORT: u16 = 20455;
pub const DEFAULT_CONTROL_PORT: u16 = 20456;
pub const DEFAULT_ULIMIT: usize = 2048;
pub const DEFAULT_MEMORY_AVAILABLE: usize = 8;
pub const DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE: usize = 4;
pub const DEFAULT_BITCOIND_RPC_THREADS: usize = 4;
pub const DEFAULT_BITCOIND_RPC_TIMEOUT: u32 = 15;

#[derive(Clone, Debug)]
pub struct Config {
Expand Down Expand Up @@ -70,6 +71,7 @@ pub struct ResourcesConfig {
pub cpu_core_available: usize,
pub memory_available: usize,
pub bitcoind_rpc_threads: usize,
pub bitcoind_rpc_timeout: u32,
pub expected_observers_count: usize,
}

Expand Down Expand Up @@ -167,7 +169,8 @@ impl Config {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
expected_observers_count: 1,
},
network: IndexerConfig {
Expand Down Expand Up @@ -198,7 +201,8 @@ impl Config {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
expected_observers_count: 1,
},
network: IndexerConfig {
Expand Down Expand Up @@ -229,7 +233,8 @@ impl Config {
cpu_core_available: num_cpus::get(),
memory_available: DEFAULT_MEMORY_AVAILABLE,
ulimit: DEFAULT_ULIMIT,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS_AVAILABLE,
bitcoind_rpc_threads: DEFAULT_BITCOIND_RPC_THREADS,
bitcoind_rpc_timeout: DEFAULT_BITCOIND_RPC_TIMEOUT,
expected_observers_count: 1,
},
network: IndexerConfig {
Expand Down
21 changes: 15 additions & 6 deletions components/ordhook-core/src/core/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ pub async fn download_and_pipeline_blocks(
// For each worker in that pool, we want to bound the size of the queue to avoid OOM
// Blocks size can range from 1 to 4Mb (when packed with witness data).
// Start blocking networking when each worker has a backlog of 8 blocks seems reasonable.
let worker_queue_size = 8;
let worker_queue_size = 2;

for _ in 0..thread_pool_network_response_processing_capacity {
for _ in 0..ordhook_config.resources.bitcoind_rpc_threads {
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = moved_ctx.clone();
Expand Down Expand Up @@ -262,9 +262,20 @@ pub async fn download_and_pipeline_blocks(

let mut round_robin_worker_thread_index = 0;
while let Some(res) = set.join_next().await {
let block = res.unwrap().unwrap();
let block = res
.expect("unable to retrieve block")
.expect("unable to deserialize block");

loop {
let res = tx_thread_pool[round_robin_worker_thread_index].send(Some(block.clone()));
round_robin_worker_thread_index = (round_robin_worker_thread_index + 1)
% thread_pool_network_response_processing_capacity;
if res.is_ok() {
break;
}
sleep(Duration::from_millis(500));
}

let _ = tx_thread_pool[round_robin_worker_thread_index].send(Some(block));
if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
let ctx = ctx.clone();
Expand All @@ -276,8 +287,6 @@ pub async fn download_and_pipeline_blocks(
ctx,
));
}
round_robin_worker_thread_index = (round_robin_worker_thread_index + 1)
% thread_pool_network_response_processing_capacity;
}

ctx.try_log(|logger| {
Expand Down

0 comments on commit fd73567

Please sign in to comment.