Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mempool syncing optimizations & fixups #89

Merged
merged 7 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&config.daemon_dir,
&config.blocks_dir,
config.daemon_rpc_addr,
config.daemon_parallelism,
config.cookie_getter(),
config.network_type,
signal.clone(),
Expand Down Expand Up @@ -81,14 +82,11 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&metrics,
Arc::clone(&config),
)));
loop {
match Mempool::update(&mempool, &daemon) {
Ok(_) => break,
Err(e) => {
warn!("Error performing initial mempool update, trying again in 5 seconds: {}", e.display_chain());
signal.wait(Duration::from_secs(5), false)?;
},
}

while !Mempool::update(&mempool, &daemon, &tip)? {
// Mempool syncing was aborted because the chain tip moved;
// Index the new block(s) and try again.
tip = indexer.update(&daemon)?;
}

#[cfg(feature = "liquid")]
Expand Down Expand Up @@ -117,7 +115,6 @@ fn run_server(config: Arc<Config>) -> Result<()> {
));

loop {

main_loop_count.inc();

if let Err(err) = signal.wait(Duration::from_secs(5), true) {
Expand All @@ -130,14 +127,12 @@ fn run_server(config: Arc<Config>) -> Result<()> {
// Index new blocks
let current_tip = daemon.getbestblockhash()?;
if current_tip != tip {
indexer.update(&daemon)?;
tip = current_tip;
tip = indexer.update(&daemon)?;
};

// Update mempool
if let Err(e) = Mempool::update(&mempool, &daemon) {
// Log the error if the result is an Err
warn!("Error updating mempool, skipping mempool update: {}", e.display_chain());
if !Mempool::update(&mempool, &daemon, &tip)? {
warn!("skipped failed mempool update, trying again in 5 seconds");
}

// Update subscribed clients
Expand Down
1 change: 1 addition & 0 deletions src/bin/tx-fingerprint-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn main() {
&config.daemon_dir,
&config.blocks_dir,
config.daemon_rpc_addr,
config.daemon_parallelism,
config.cookie_getter(),
config.network_type,
signal,
Expand Down
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Config {
pub daemon_dir: PathBuf,
pub blocks_dir: PathBuf,
pub daemon_rpc_addr: SocketAddr,
pub daemon_parallelism: usize,
pub cookie: Option<String>,
pub electrum_rpc_addr: SocketAddr,
pub http_addr: SocketAddr,
Expand Down Expand Up @@ -132,6 +133,12 @@ impl Config {
.help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet and 127.0.0.1:18443 for regtest)")
.takes_value(true),
)
.arg(
Arg::with_name("daemon_parallelism")
.long("daemon-parallelism")
.help("Number of JSONRPC requests to send in parallel")
.default_value("4")
)
.arg(
Arg::with_name("monitoring_addr")
.long("monitoring-addr")
Expand Down Expand Up @@ -386,6 +393,7 @@ impl Config {
daemon_dir,
blocks_dir,
daemon_rpc_addr,
daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize),
cookie,
utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
electrum_rpc_addr,
Expand Down
133 changes: 88 additions & 45 deletions src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cell::OnceCell;
use std::collections::{HashMap, HashSet};
use std::env;
use std::io::{BufRead, BufReader, Lines, Write};
Expand All @@ -8,8 +9,9 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use base64::prelude::{Engine, BASE64_STANDARD};
use error_chain::ChainedError;
use hex::FromHex;
use itertools::Itertools;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use serde_json::{from_str, from_value, Value};

#[cfg(not(feature = "liquid"))]
Expand Down Expand Up @@ -79,13 +81,13 @@ fn parse_error_code(err: &Value) -> Option<i64> {

fn parse_jsonrpc_reply(mut reply: Value, method: &str, expected_id: u64) -> Result<Value> {
if let Some(reply_obj) = reply.as_object_mut() {
if let Some(err) = reply_obj.get("error") {
if let Some(err) = reply_obj.get_mut("error") {
if !err.is_null() {
if let Some(code) = parse_error_code(&err) {
match code {
// RPC_IN_WARMUP -> retry by later reconnection
-28 => bail!(ErrorKind::Connection(err.to_string())),
_ => bail!("{} RPC error: {}", method, err),
code => bail!(ErrorKind::RpcError(code, err.take(), method.to_string())),
}
}
}
Expand Down Expand Up @@ -248,7 +250,7 @@ impl Connection {
Ok(if status == "HTTP/1.1 200 OK" {
contents
} else if status == "HTTP/1.1 500 Internal Server Error" {
warn!("HTTP status: {}", status);
debug!("RPC HTTP 500 error: {}", contents);
contents // the contents should have a JSONRPC error field
} else {
bail!(
Expand Down Expand Up @@ -287,6 +289,8 @@ pub struct Daemon {
message_id: Counter, // for monotonic JSONRPC 'id'
signal: Waiter,

rpc_threads: Arc<rayon::ThreadPool>,

// monitoring
latency: HistogramVec,
size: HistogramVec,
Expand All @@ -297,6 +301,7 @@ impl Daemon {
daemon_dir: &PathBuf,
blocks_dir: &PathBuf,
daemon_rpc_addr: SocketAddr,
daemon_parallelism: usize,
cookie_getter: Arc<dyn CookieGetter>,
network: Network,
signal: Waiter,
Expand All @@ -313,6 +318,13 @@ impl Daemon {
)?),
message_id: Counter::new(),
signal: signal.clone(),
rpc_threads: Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(daemon_parallelism)
.thread_name(|i| format!("rpc-requests-{}", i))
.build()
.unwrap(),
),
latency: metrics.histogram_vec(
HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"),
&["method"],
Expand Down Expand Up @@ -361,6 +373,7 @@ impl Daemon {
conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
message_id: Counter::new(),
signal: self.signal.clone(),
rpc_threads: self.rpc_threads.clone(),
latency: self.latency.clone(),
size: self.size.clone(),
})
Expand Down Expand Up @@ -398,33 +411,18 @@ impl Daemon {
Ok(result)
}

fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
let id = self.message_id.next();
let chunks = params_list
.iter()
.map(|params| json!({"method": method, "params": params, "id": id}))
.chunks(50_000); // Max Amount of batched requests
let mut results = vec![];
for chunk in &chunks {
let reqs = chunk.collect();
let mut replies = self.call_jsonrpc(method, &reqs)?;
if let Some(replies_vec) = replies.as_array_mut() {
for reply in replies_vec {
results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
}
} else {
bail!("non-array replies: {:?}", replies);
}
}

Ok(results)
let req = json!({"method": method, "params": params, "id": id});
let reply = self.call_jsonrpc(method, &req)?;
parse_jsonrpc_reply(reply, method, id)
}

fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
fn retry_request(&self, method: &str, params: &Value) -> Result<Value> {
loop {
match self.handle_request_batch(method, params_list) {
Err(Error(ErrorKind::Connection(msg), _)) => {
warn!("reconnecting to bitcoind: {}", msg);
match self.handle_request(method, &params) {
Err(e @ Error(ErrorKind::Connection(_), _)) => {
warn!("reconnecting to bitcoind: {}", e.display_chain());
self.signal.wait(Duration::from_secs(3), false)?;
let mut conn = self.conn.lock().unwrap();
*conn = conn.reconnect()?;
Expand All @@ -436,13 +434,47 @@ impl Daemon {
}

fn request(&self, method: &str, params: Value) -> Result<Value> {
let mut values = self.retry_request_batch(method, &[params])?;
assert_eq!(values.len(), 1);
Ok(values.remove(0))
self.retry_request(method, &params)
}

fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
self.retry_request_batch(method, params_list)
fn retry_reconnect(&self) -> Daemon {
// XXX add a max reconnection attempts limit?
philippem marked this conversation as resolved.
Show resolved Hide resolved
loop {
match self.reconnect() {
Ok(daemon) => break daemon,
Err(e) => {
warn!("failed connecting to RPC daemon: {}", e.display_chain());
}
}
}
}

// Send requests in parallel over multiple RPC connections as individual JSON-RPC requests (with no JSON-RPC batching),
// buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned.
fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
self.requests_iter(method, params_list).collect()
}

// Send requests in parallel over multiple RPC connections, iterating over the results without buffering them.
// Errors are included in the iterator and do not terminate other pending requests.
fn requests_iter<'a>(
&'a self,
method: &'a str,
params_list: Vec<Value>,
) -> impl ParallelIterator<Item = Result<Value>> + IndexedParallelIterator + 'a {
self.rpc_threads.install(move || {
params_list.into_par_iter().map(move |params| {
// Store a local per-thread Daemon, each with its own TCP connection. These will
// get initialized as necessary for the `rpc_threads` pool thread managed by rayon.
thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());

DAEMON_INSTANCE.with(|daemon| {
daemon
.get_or_init(|| self.retry_reconnect())
.retry_request(&method, &params)
})
})
})
}

// bitcoind JSONRPC API:
Expand All @@ -468,12 +500,12 @@ impl Daemon {
pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
let params_list: Vec<Value> = self
.requests("getblockhash", &heights)?
.requests("getblockhash", heights)?
.into_iter()
.map(|hash| json!([hash, /*verbose=*/ false]))
.collect();
let mut result = vec![];
for h in self.requests("getblockheader", &params_list)? {
for h in self.requests("getblockheader", params_list)? {
result.push(header_from_value(h)?);
}
Ok(result)
Expand All @@ -495,27 +527,38 @@ impl Daemon {
.iter()
.map(|hash| json!([hash, /*verbose=*/ false]))
.collect();
let values = self.requests("getblock", &params_list)?;
let values = self.requests("getblock", params_list)?;
let mut blocks = vec![];
for value in values {
blocks.push(block_from_value(value)?);
}
Ok(blocks)
}

pub fn gettransactions(&self, txhashes: &[&Txid]) -> Result<Vec<Transaction>> {
let params_list: Vec<Value> = txhashes
/// Fetch the given transactions in parallel over multiple threads and RPC connections,
/// ignoring any missing ones and returning whatever is available.
pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<(Txid, Transaction)>> {
const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;

let params_list: Vec<Value> = txids
.iter()
.map(|txhash| json!([txhash, /*verbose=*/ false]))
.collect();

let values = self.requests("getrawtransaction", &params_list)?;
let mut txs = vec![];
for value in values {
txs.push(tx_from_value(value)?);
}
assert_eq!(txhashes.len(), txs.len());
Ok(txs)
self.requests_iter("getrawtransaction", params_list)
.zip(txids)
.filter_map(|(res, txid)| match res {
Ok(val) => Some(tx_from_value(val).map(|tx| (**txid, tx))),
// Ignore 'tx not found' errors
Err(Error(ErrorKind::RpcError(code, _, _), _))
if code == RPC_INVALID_ADDRESS_OR_KEY =>
{
None
}
// Terminate iteration if any other errors are encountered
Err(e) => Some(Err(e)),
})
.collect()
}

pub fn gettransaction_raw(
Expand Down Expand Up @@ -556,7 +599,7 @@ impl Daemon {
let params_list: Vec<Value> = conf_targets.iter().map(|t| json!([t, "ECONOMICAL"])).collect();

Ok(self
.requests("estimatesmartfee", &params_list)?
.requests("estimatesmartfee", params_list)?
.iter()
.zip(conf_targets)
.filter_map(|(reply, target)| {
Expand Down
5 changes: 5 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ error_chain! {
display("Connection error: {}", msg)
}

RpcError(code: i64, error: serde_json::Value, method: String) {
description("RPC error")
display("{} RPC error {}: {}", method, code, error)
}

Interrupt(sig: i32) {
description("Interruption by external signal")
display("Iterrupted by signal {}", sig)
Expand Down
Loading
Loading