From 5b073575103a463ed41c254be3c4e8fac1a4cfb8 Mon Sep 17 00:00:00 2001
From: Nadav Ivgi
Date: Thu, 23 May 2024 21:30:15 +0300
Subject: [PATCH 1/7] Don't use RPC batching with bitcoind

This actually hurts performance, because the batched response has to be
buffered on the bitcoind side, as @TheBlueMatt explains at
https://github.com/romanz/electrs/issues/373#issuecomment-785533444

Instead, send multiple individual RPC requests in parallel using a
thread pool, with a separate RPC TCP connection for each thread.

Also see https://github.com/romanz/electrs/pull/374
---
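Note: the technique this patch lands on, one lazily-opened RPC connection per
worker thread in a fixed-size rayon pool, can be sketched in isolation. The
sketch below is illustrative only and is not part of the patch: the plain
`TcpStream` and the line-oriented `fetch` helper are hypothetical stand-ins
for electrs' `Connection` type and `Daemon::retry_request`.

    use std::cell::OnceCell;
    use std::io::{BufRead, BufReader, Write};
    use std::net::TcpStream;

    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

    // One lazily-opened TCP connection per rayon worker thread.
    thread_local!(static CONN: OnceCell<TcpStream> = OnceCell::new());

    // Write one newline-terminated request and read back a single reply line.
    fn fetch(mut conn: &TcpStream, req: &str) -> std::io::Result<String> {
        conn.write_all(req.as_bytes())?;
        let mut reply = String::new();
        BufReader::new(conn).read_line(&mut reply)?;
        Ok(reply)
    }

    // Issue all requests in parallel, over at most `num_threads` connections.
    fn requests(addr: &str, reqs: &[String]) -> std::io::Result<Vec<String>> {
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(4)
            .build()
            .expect("failed to build thread pool");
        pool.install(|| {
            reqs.par_iter()
                .map(|req| {
                    CONN.with(|cell| {
                        // Opened on first use by this worker thread, then reused
                        // for every request scheduled onto the same thread (a
                        // real implementation would retry instead of panicking).
                        let conn = cell
                            .get_or_init(|| TcpStream::connect(addr).expect("connect failed"));
                        fetch(conn, req)
                    })
                })
                .collect() // Result<Vec<_>>: fails on the first error
        })
    }

Because the connections live in thread-local storage, they are torn down
together with the pool's threads; keeping them alive between sync runs is what
patch 6 of this series addresses.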
 src/bin/electrs.rs              |  1 +
 src/bin/tx-fingerprint-stats.rs |  1 +
 src/config.rs                   |  8 ++++
 src/daemon.rs                   | 82 ++++++++++++++++++++-------------
 tests/common.rs                 |  2 +
 5 files changed, 62 insertions(+), 32 deletions(-)

diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs
index fb25e68a8..ee92d7e0d 100644
--- a/src/bin/electrs.rs
+++ b/src/bin/electrs.rs
@@ -49,6 +49,7 @@ fn run_server(config: Arc<Config>) -> Result<()> {
         &config.daemon_dir,
         &config.blocks_dir,
         config.daemon_rpc_addr,
+        config.daemon_parallelism,
         config.cookie_getter(),
         config.network_type,
         signal.clone(),
diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs
index afe980f8c..94a3821ab 100644
--- a/src/bin/tx-fingerprint-stats.rs
+++ b/src/bin/tx-fingerprint-stats.rs
@@ -33,6 +33,7 @@ fn main() {
         &config.daemon_dir,
         &config.blocks_dir,
         config.daemon_rpc_addr,
+        config.daemon_parallelism,
         config.cookie_getter(),
         config.network_type,
         signal,
diff --git a/src/config.rs b/src/config.rs
index 8696ecf8f..9cf07d89b 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -25,6 +25,7 @@ pub struct Config {
     pub daemon_dir: PathBuf,
     pub blocks_dir: PathBuf,
     pub daemon_rpc_addr: SocketAddr,
+    pub daemon_parallelism: usize,
    pub cookie: Option<String>,
    pub electrum_rpc_addr: SocketAddr,
    pub http_addr: SocketAddr,
@@ -132,6 +133,12 @@ impl Config {
                .help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet and 127.0.0.1:18443 for regtest)")
                .takes_value(true),
        )
+       .arg(
+           Arg::with_name("daemon_parallelism")
+               .long("daemon-parallelism")
+               .help("Number of JSONRPC requests to send in parallel")
+               .default_value("4")
+       )
        .arg(
            Arg::with_name("monitoring_addr")
                .long("monitoring-addr")
@@ -386,6 +393,7 @@ impl Config {
            daemon_dir,
            blocks_dir,
            daemon_rpc_addr,
+           daemon_parallelism: value_t_or_exit!(m, "daemon_parallelism", usize),
            cookie,
            utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
            electrum_rpc_addr,
diff --git a/src/daemon.rs b/src/daemon.rs
index 457bf4230..cd4f81cd6 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -1,3 +1,4 @@
+use std::cell::OnceCell;
 use std::collections::{HashMap, HashSet};
 use std::env;
 use std::io::{BufRead, BufReader, Lines, Write};
@@ -8,8 +9,9 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use base64::prelude::{Engine, BASE64_STANDARD};
+use error_chain::ChainedError;
 use hex::FromHex;
-use itertools::Itertools;
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use serde_json::{from_str, from_value, Value};
 
 #[cfg(not(feature = "liquid"))]
@@ -281,6 +283,7 @@ impl Counter {
 
 pub struct Daemon {
     daemon_dir: PathBuf,
+    daemon_parallelism: usize,
     blocks_dir: PathBuf,
     network: Network,
     conn: Mutex<Connection>,
@@ -297,6 +300,7 @@ impl Daemon {
         daemon_dir: &PathBuf,
         blocks_dir: &PathBuf,
         daemon_rpc_addr: SocketAddr,
+        daemon_parallelism: usize,
         cookie_getter: Arc<dyn CookieGetter>,
         network: Network,
         signal: Waiter,
@@ -304,6 +308,7 @@ impl Daemon {
     ) -> Result<Daemon> {
         let daemon = Daemon {
             daemon_dir: daemon_dir.clone(),
+            daemon_parallelism,
             blocks_dir: blocks_dir.clone(),
             network,
             conn: Mutex::new(Connection::new(
@@ -356,6 +361,7 @@ impl Daemon {
     pub fn reconnect(&self) -> Result<Daemon> {
         Ok(Daemon {
             daemon_dir: self.daemon_dir.clone(),
+            daemon_parallelism: self.daemon_parallelism,
             blocks_dir: self.blocks_dir.clone(),
             network: self.network,
             conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
@@ -398,31 +404,16 @@ impl Daemon {
         Ok(result)
     }
 
-    fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    fn handle_request(&self, method: &str, params: &Value) -> Result<Value> {
         let id = self.message_id.next();
-        let chunks = params_list
-            .iter()
-            .map(|params| json!({"method": method, "params": params, "id": id}))
-            .chunks(50_000); // Max Amount of batched requests
-        let mut results = vec![];
-        for chunk in &chunks {
-            let reqs = chunk.collect();
-            let mut replies = self.call_jsonrpc(method, &reqs)?;
-            if let Some(replies_vec) = replies.as_array_mut() {
-                for reply in replies_vec {
-                    results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
-                }
-            } else {
-                bail!("non-array replies: {:?}", replies);
-            }
-        }
-
-        Ok(results)
+        let req = json!({"method": method, "params": params, "id": id});
+        let reply = self.call_jsonrpc(method, &req)?;
+        parse_jsonrpc_reply(reply, method, id)
     }
 
-    fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    fn retry_request(&self, method: &str, params: &Value) -> Result<Value> {
         loop {
-            match self.handle_request_batch(method, params_list) {
+            match self.handle_request(method, &params) {
                 Err(Error(ErrorKind::Connection(msg), _)) => {
                     warn!("reconnecting to bitcoind: {}", msg);
                     self.signal.wait(Duration::from_secs(3), false)?;
@@ -436,13 +427,45 @@ impl Daemon {
     }
 
     fn request(&self, method: &str, params: Value) -> Result<Value> {
-        let mut values = self.retry_request_batch(method, &[params])?;
-        assert_eq!(values.len(), 1);
-        Ok(values.remove(0))
+        self.retry_request(method, &params)
     }
 
+    fn retry_reconnect(&self) -> Daemon {
+        // XXX add a max reconnection attempts limit?
+        loop {
+            match self.reconnect() {
+                Ok(daemon) => break daemon,
+                Err(e) => {
+                    warn!("failed connecting to RPC daemon: {}", e.display_chain());
+                }
+            }
+        }
+    }
+
+    // Send requests in parallel over multiple connections as individual JSON-RPC requests (with no JSON-RPC batching)
     fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
-        self.retry_request_batch(method, params_list)
+        let thread_pool = rayon::ThreadPoolBuilder::new()
+            .num_threads(self.daemon_parallelism)
+            .thread_name(|i| format!("rpc-requests-{}", i))
+            .build()
+            .unwrap();
+
+        thread_pool.install(|| {
+            params_list
+                .par_iter()
+                .map(|params| {
+                    // Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as
+                    // necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped.
+                    thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());
+
+                    DAEMON_INSTANCE.with(|daemon| {
+                        daemon
+                            .get_or_init(|| self.retry_reconnect())
+                            .retry_request(method, params)
+                    })
+                })
+                .collect()
+        })
     }
 
     // bitcoind JSONRPC API:
@@ -510,12 +533,7 @@ impl Daemon {
             .collect();
 
         let values = self.requests("getrawtransaction", &params_list)?;
-        let mut txs = vec![];
-        for value in values {
-            txs.push(tx_from_value(value)?);
-        }
-        assert_eq!(txhashes.len(), txs.len());
-        Ok(txs)
+        values.into_iter().map(tx_from_value).collect()
     }
 
     pub fn gettransaction_raw(
diff --git a/tests/common.rs b/tests/common.rs
index c85ebf7d9..f08f850cc 100644
--- a/tests/common.rs
+++ b/tests/common.rs
@@ -89,6 +89,7 @@ impl TestRunner {
         network_type,
         db_path: electrsdb.path().to_path_buf(),
         daemon_dir: daemon_subdir.clone(),
+        daemon_parallelism: 3,
         blocks_dir: daemon_subdir.join("blocks"),
         daemon_rpc_addr: params.rpc_socket.into(),
         cookie: None,
@@ -127,6 +128,7 @@ impl TestRunner {
         &config.daemon_dir,
         &config.blocks_dir,
         config.daemon_rpc_addr,
+        config.daemon_parallelism,
         config.cookie_getter(),
         config.network_type,
         signal.clone(),
From 5c8c785406c111a53c0ff340ea7a8e40b4ae99e9 Mon Sep 17 00:00:00 2001
From: Nadav Ivgi
Date: Thu, 30 May 2024 17:33:58 +0300
Subject: [PATCH 2/7] Don't store mempool txs before all prevouts are
 available

The indexing process was adding transactions into the store so that
prevouts funded & spent within the same batch could be looked up via
Mempool::lookup_txos(). If the indexing process later failed for any
reason, these transactions would remain in the store.

With this change, we instead explicitly look for prevouts funded within
the same batch, then look for the rest in the chain/mempool indexes and
fail if any are missing, without keeping the transactions in the store.
---
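Note: the two-phase prevout resolution described above can be sketched with
simplified stand-in types. `Txid`, `OutPoint`, `TxOut` and `Tx` below are
hypothetical simplifications of the electrs types, and `lookup_indexed`
stands in for `Mempool::lookup_txos` (which consults the mempool and
on-chain indexes):

    use std::collections::{BTreeSet, HashMap};

    type Txid = [u8; 32];
    type OutPoint = (Txid, u32); // (funding txid, output index)

    #[derive(Clone, Debug)]
    struct TxOut {
        value: u64,
    }

    struct Tx {
        output: Vec<TxOut>,
    }

    // Resolve every prevout spent by `batch`: prefer outputs funded inside
    // the batch itself, fall back to the pre-existing indexes, and fail
    // (with no side effects) if anything is still missing.
    fn resolve_prevouts(
        batch: &HashMap<Txid, Tx>,
        spent: BTreeSet<OutPoint>,
        lookup_indexed: impl Fn(&OutPoint) -> Option<TxOut>,
    ) -> Result<HashMap<OutPoint, TxOut>, String> {
        let mut txos = HashMap::new();
        for prevout in spent {
            let found = batch
                .get(&prevout.0)
                .and_then(|tx| tx.output.get(prevout.1 as usize).cloned())
                .or_else(|| lookup_indexed(&prevout));
            match found {
                Some(txout) => {
                    txos.insert(prevout, txout);
                }
                None => return Err(format!("missing prevout {:?}", prevout)),
            }
        }
        Ok(txos)
    }

Only after this resolution succeeds does the patch insert the batch into the
store, so a failed lookup no longer leaves stray transactions behind.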
 src/new_index/mempool.rs | 81 ++++++++++++++++++++--------------------
 src/new_index/query.rs   |  3 +-
 src/util/mod.rs          |  4 +-
 src/util/transaction.rs  | 13 ++++++-
 4 files changed, 57 insertions(+), 44 deletions(-)

diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs
index 94ba7a41d..770c328da 100644
--- a/src/new_index/mempool.rs
+++ b/src/new_index/mempool.rs
@@ -21,7 +21,7 @@ use crate::new_index::{
     SpendingInfo, SpendingInput, TxHistoryInfo, Utxo,
 };
 use crate::util::fees::{make_fee_histogram, TxFeeInfo};
-use crate::util::{extract_tx_prevouts, full_hash, has_prevout, is_spendable, Bytes};
+use crate::util::{extract_tx_prevouts, full_hash, get_prev_outpoints, is_spendable, Bytes};
 
 #[cfg(feature = "liquid")]
 use crate::elements::asset;
@@ -288,40 +288,57 @@ impl Mempool {
         self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now());
     }
 
-    pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) {
+    pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
         if self.txstore.get(txid).is_none() {
             if let Ok(tx) = daemon.getmempooltx(&txid) {
                 self.add(vec![tx])
+            } else {
+                bail!("add_by_txid cannot find {}", txid);
             }
+        } else {
+            Ok(())
         }
     }
 
-    fn add(&mut self, txs: Vec<Transaction>) {
+    fn add(&mut self, txs: Vec<Transaction>) -> Result<()> {
         self.delta
             .with_label_values(&["add"])
             .observe(txs.len() as f64);
         let _timer = self.latency.with_label_values(&["add"]).start_timer();
 
-        let mut txids = vec![];
-        // Phase 1: add to txstore
-        for tx in txs {
-            let txid = tx.txid();
-            txids.push(txid);
+        let spent_prevouts = get_prev_outpoints(&txs);
+        let txs_map = txs
+            .into_iter()
+            .map(|tx| (tx.txid(), tx))
+            .collect::<HashMap<_, _>>();
+
+        // Lookup spent prevouts that were funded within the same `add` batch
+        let mut txos = HashMap::new();
+        let remain_prevouts = spent_prevouts
+            .into_iter()
+            .filter(|prevout| {
+                if let Some(prevtx) = txs_map.get(&prevout.txid) {
+                    if let Some(out) = prevtx.output.get(prevout.vout as usize) {
+                        txos.insert(prevout.clone(), out.clone());
+                        // remove from the list of remaining `prevouts`
+                        return false;
+                    }
+                }
+                true
+            })
+            .collect();
+
+        // Lookup remaining spent prevouts in mempool & on-chain.
+        // Fails if any are missing.
+        txos.extend(self.lookup_txos(remain_prevouts)?);
+
+        // Add to txstore and indexes
+        for (txid, tx) in txs_map {
             self.txstore.insert(txid, tx);
-        }
-        // Phase 2: index history and spend edges (can fail if some txos cannot be found)
-        let txos = match self.lookup_txos(self.get_prevouts(&txids)) {
-            Ok(txos) => txos,
-            Err(err) => {
-                warn!("lookup txouts failed: {}", err);
-                // TODO: should we remove txids from txstore?
-                return;
-            }
-        };
-        for txid in txids {
-            let tx = self.txstore.get(&txid).expect("missing mempool tx");
-            let txid_bytes = full_hash(&txid[..]);
+            let tx = self.txstore.get(&txid).expect("was just added");
+
+            let prevouts = extract_tx_prevouts(&tx, &txos, false);
+            let txid_bytes = full_hash(&txid[..]);
 
             // Get feeinfo for caching and recent tx overview
             let feeinfo = TxFeeInfo::new(&tx, &prevouts, self.config.network_type);
@@ -395,6 +412,8 @@ impl Mempool {
                 &mut self.asset_issuance,
             );
         }
+
+        Ok(())
     }
 
     fn lookup_txo(&self, outpoint: &OutPoint) -> Option<TxOut> {
@@ -423,24 +442,6 @@ impl Mempool {
         Ok(txos)
     }
 
-    fn get_prevouts(&self, txids: &[Txid]) -> BTreeSet<OutPoint> {
-        let _timer = self
-            .latency
-            .with_label_values(&["get_prevouts"])
-            .start_timer();
-
-        txids
-            .iter()
-            .map(|txid| self.txstore.get(txid).expect("missing mempool tx"))
-            .flat_map(|tx| {
-                tx.input
-                    .iter()
-                    .filter(|txin| has_prevout(txin))
-                    .map(|txin| txin.previous_output)
-            })
-            .collect()
-    }
-
     fn remove(&mut self, to_remove: HashSet<&Txid>) {
         self.delta
             .with_label_values(&["remove"])
@@ -510,7 +511,7 @@ impl Mempool {
         {
             let mut mempool = mempool.write().unwrap();
             // Add new transactions
-            mempool.add(txs_to_add);
+            mempool.add(txs_to_add)?;
 
             mempool
                 .count
diff --git a/src/new_index/query.rs b/src/new_index/query.rs
index 60d7510ab..e178bec49 100644
--- a/src/new_index/query.rs
+++ b/src/new_index/query.rs
@@ -71,7 +71,8 @@ impl Query {
 
     pub fn broadcast_raw(&self, txhex: &str) -> Result<Txid> {
         let txid = self.daemon.broadcast_raw(txhex)?;
-        self.mempool
+        let _ = self
+            .mempool
             .write()
             .unwrap()
             .add_by_txid(&self.daemon, &txid);
diff --git a/src/util/mod.rs b/src/util/mod.rs
index 3b03d41ce..0a9701b8e 100644
--- a/src/util/mod.rs
+++ b/src/util/mod.rs
@@ -12,8 +12,8 @@ pub use self::block::{
 pub use self::fees::get_tx_fee;
 pub use self::script::{get_innerscripts, ScriptToAddr, ScriptToAsm};
 pub use self::transaction::{
-    extract_tx_prevouts, has_prevout, is_coinbase, is_spendable, serialize_outpoint,
-    TransactionStatus, TxInput,
+    extract_tx_prevouts, get_prev_outpoints, has_prevout, is_coinbase, is_spendable,
+    serialize_outpoint, TransactionStatus, TxInput,
 };
 
 use std::collections::HashMap;
diff --git a/src/util/transaction.rs b/src/util/transaction.rs
index dda5e4f7c..9becde94b 100644
--- a/src/util/transaction.rs
+++ b/src/util/transaction.rs
@@ -1,7 +1,7 @@
 use crate::chain::{BlockHash, OutPoint, Transaction, TxIn, TxOut, Txid};
 use crate::util::BlockId;
 
-use std::collections::HashMap;
+use std::collections::{BTreeSet, HashMap};
 
 #[cfg(feature = "liquid")]
 lazy_static! {
@@ -96,6 +96,17 @@ pub fn extract_tx_prevouts<'a>(
         .collect()
 }
 
+pub fn get_prev_outpoints(txs: &[Transaction]) -> BTreeSet<OutPoint> {
+    txs.iter()
+        .flat_map(|tx| {
+            tx.input
+                .iter()
+                .filter(|txin| has_prevout(txin))
+                .map(|txin| txin.previous_output)
+        })
+        .collect()
+}
+
 pub fn serialize_outpoint<S>(outpoint: &OutPoint, serializer: S) -> Result<S::Ok, S::Error>
 where
     S: serde::ser::Serializer,
From 4197301c744407947f053bc3af8ec600eff722b0 Mon Sep 17 00:00:00 2001
From: Nadav Ivgi
Date: Thu, 30 May 2024 01:10:55 +0300
Subject: [PATCH 3/7] Continuously attempt to fetch mempool transactions

Previously, if any of the mempool transactions were unavailable because
they were evicted between getting the list of mempool txids and getting
the txs themselves, the mempool syncing operation would be aborted and
tried again from scratch.

With this change, we instead keep whatever transactions we were able to
fetch, then get the updated list of mempool txids and re-try fetching
the missing ones continuously until we're able to get a full snapshot.
---
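Note: the convergence loop can be sketched independently of the electrs types.
`list_txids` and `fetch_available` below are hypothetical stand-ins for
`Daemon::getmempooltxids` and this patch's `gettransactions_available`, and a
raw `Vec<u8>` stands in for a deserialized transaction:

    use std::collections::{HashMap, HashSet};

    type Txid = u64; // stand-in

    // Keep fetching until every txid currently in the mempool has been
    // retrieved; earlier rounds' results are kept (or dropped on eviction),
    // so each round only has to fetch the delta.
    fn snapshot(
        list_txids: impl Fn() -> HashSet<Txid>,
        fetch_available: impl Fn(&[Txid]) -> HashMap<Txid, Vec<u8>>,
    ) -> HashMap<Txid, Vec<u8>> {
        let mut fetched: HashMap<Txid, Vec<u8>> = HashMap::new();
        loop {
            let current = list_txids();
            // Drop anything evicted since the previous round
            fetched.retain(|txid, _| current.contains(txid));

            let missing: Vec<Txid> = current
                .iter()
                .filter(|txid| !fetched.contains_key(*txid))
                .copied()
                .collect();
            if missing.is_empty() {
                return fetched; // a full, consistent snapshot
            }
            // Txs evicted in between are simply absent from the reply; the
            // next round's txid listing will no longer include them.
            fetched.extend(fetch_available(&missing));
        }
    }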
 src/daemon.rs            | 77 ++++++++++++++++++++++++++--------------
 src/errors.rs            |  5 +++
 src/new_index/mempool.rs | 74 ++++++++++++++++++++++++--------------
 src/util/transaction.rs  | 17 +++++----
 4 files changed, 111 insertions(+), 62 deletions(-)

diff --git a/src/daemon.rs b/src/daemon.rs
index cd4f81cd6..5db81dd96 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -11,7 +11,7 @@ use std::time::Duration;
 use base64::prelude::{Engine, BASE64_STANDARD};
 use error_chain::ChainedError;
 use hex::FromHex;
-use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use serde_json::{from_str, from_value, Value};
 
 #[cfg(not(feature = "liquid"))]
@@ -81,13 +81,13 @@ fn parse_error_code(err: &Value) -> Option<i64> {
 
 fn parse_jsonrpc_reply(mut reply: Value, method: &str, expected_id: u64) -> Result<Value> {
     if let Some(reply_obj) = reply.as_object_mut() {
-        if let Some(err) = reply_obj.get("error") {
+        if let Some(err) = reply_obj.get_mut("error") {
             if !err.is_null() {
                 if let Some(code) = parse_error_code(&err) {
                     match code {
                         // RPC_IN_WARMUP -> retry by later reconnection
                         -28 => bail!(ErrorKind::Connection(err.to_string())),
-                        _ => bail!("{} RPC error: {}", method, err),
+                        code => bail!(ErrorKind::RpcError(code, err.take(), method.to_string())),
                     }
                 }
             }
@@ -442,29 +442,37 @@ impl Daemon {
         }
     }
 
-    // Send requests in parallel over multiple connections as individual JSON-RPC requests (with no JSON-RPC batching)
-    fn requests(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
+    // Send requests in parallel over multiple RPC connections as individual JSON-RPC requests
+    // (with no JSON-RPC batching), buffering the replies into a vector. If any of the requests
+    // fail, processing is terminated and an Err is returned.
+    fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
+        self.requests_iter(method, params_list).collect()
+    }
+
+    // Send requests in parallel over multiple RPC connections, iterating over the results
+    // without buffering them. Errors are included in the iterator and do not terminate other
+    // pending requests.
+    fn requests_iter<'a>(
+        &'a self,
+        method: &'a str,
+        params_list: Vec<Value>,
+    ) -> impl ParallelIterator<Item = Result<Value>> + 'a {
         let thread_pool = rayon::ThreadPoolBuilder::new()
             .num_threads(self.daemon_parallelism)
             .thread_name(|i| format!("rpc-requests-{}", i))
             .build()
             .unwrap();
 
-        thread_pool.install(|| {
-            params_list
-                .par_iter()
-                .map(|params| {
-                    // Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as
-                    // necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped.
-                    thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());
-
-                    DAEMON_INSTANCE.with(|daemon| {
-                        daemon
-                            .get_or_init(|| self.retry_reconnect())
-                            .retry_request(method, params)
-                    })
+        thread_pool.install(move || {
+            params_list.into_par_iter().map(move |params| {
+                // Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as
+                // necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped.
+                thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());
+
+                DAEMON_INSTANCE.with(|daemon| {
+                    daemon
+                        .get_or_init(|| self.retry_reconnect())
+                        .retry_request(&method, &params)
                 })
-                .collect()
+            })
         })
     }
 
@@ -491,12 +499,12 @@ impl Daemon {
     pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
         let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
         let params_list: Vec<Value> = self
-            .requests("getblockhash", &heights)?
+            .requests("getblockhash", heights)?
             .into_iter()
             .map(|hash| json!([hash, /*verbose=*/ false]))
             .collect();
         let mut result = vec![];
-        for h in self.requests("getblockheader", &params_list)? {
+        for h in self.requests("getblockheader", params_list)? {
             result.push(header_from_value(h)?);
         }
         Ok(result)
@@ -518,7 +526,7 @@ impl Daemon {
             .iter()
             .map(|hash| json!([hash, /*verbose=*/ false]))
             .collect();
-        let values = self.requests("getblock", &params_list)?;
+        let values = self.requests("getblock", params_list)?;
         let mut blocks = vec![];
         for value in values {
             blocks.push(block_from_value(value)?);
         }
         Ok(blocks)
     }
 
-    pub fn gettransactions(&self, txhashes: &[&Txid]) -> Result<Vec<Transaction>> {
-        let params_list: Vec<Value> = txhashes
+    /// Fetch the given transactions in parallel over multiple threads and RPC connections,
+    /// ignoring any missing ones and returning whatever is available.
+    pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<Transaction>> {
+        const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;
+
+        let params_list: Vec<Value> = txids
             .iter()
             .map(|txhash| json!([txhash, /*verbose=*/ false]))
             .collect();
 
-        let values = self.requests("getrawtransaction", &params_list)?;
-        values.into_iter().map(tx_from_value).collect()
+        self.requests_iter("getrawtransaction", params_list)
+            .filter_map(|res| match res {
+                Ok(val) => Some(tx_from_value(val)),
+                // Ignore 'tx not found' errors
+                Err(Error(ErrorKind::RpcError(code, _, _), _))
+                    if code == RPC_INVALID_ADDRESS_OR_KEY =>
+                {
+                    None
+                }
+                // Terminate iteration if any other errors are encountered
+                Err(e) => Some(Err(e)),
+            })
+            .collect()
     }
 
     pub fn gettransaction_raw(
@@ -574,7 +597,7 @@ impl Daemon {
         let params_list: Vec<Value> =
             conf_targets.iter().map(|t| json!([t, "ECONOMICAL"])).collect();
 
         Ok(self
-            .requests("estimatesmartfee", &params_list)?
+            .requests("estimatesmartfee", params_list)?
             .iter()
             .zip(conf_targets)
             .filter_map(|(reply, target)| {
diff --git a/src/errors.rs b/src/errors.rs
index cec50ccef..c708d7dda 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -9,6 +9,11 @@ error_chain! {
             display("Connection error: {}", msg)
         }
 
+        RpcError(code: i64, error: serde_json::Value, method: String) {
+            description("RPC error")
+            display("{} RPC error {}: {}", method, code, error)
+        }
+
         Interrupt(sig: i32) {
             description("Interruption by external signal")
             display("Iterrupted by signal {}", sig)
{ display("Connection error: {}", msg) } + RpcError(code: i64, error: serde_json::Value, method: String) { + description("RPC error") + display("{} RPC error {}: {}", method, code, error) + } + Interrupt(sig: i32) { description("Interruption by external signal") display("Iterrupted by signal {}", sig) diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 770c328da..b260c09af 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -276,7 +276,7 @@ impl Mempool { &self.backlog_stats.0 } - pub fn old_txids(&self) -> HashSet { + pub fn txids_set(&self) -> HashSet { return HashSet::from_iter(self.txstore.keys().cloned()); } @@ -291,7 +291,11 @@ impl Mempool { pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> { if self.txstore.get(txid).is_none() { if let Ok(tx) = daemon.getmempooltx(&txid) { - self.add(vec![tx]) + self.add({ + let mut txs_map = HashMap::new(); + txs_map.insert(tx.txid(), tx); + txs_map + }) } else { bail!("add_by_txid cannot find {}", txid); } @@ -300,17 +304,13 @@ impl Mempool { } } - fn add(&mut self, txs: Vec) -> Result<()> { + fn add(&mut self, txs_map: HashMap) -> Result<()> { self.delta .with_label_values(&["add"]) - .observe(txs.len() as f64); + .observe(txs_map.len() as f64); let _timer = self.latency.with_label_values(&["add"]).start_timer(); - let spent_prevouts = get_prev_outpoints(&txs); - let txs_map = txs - .into_iter() - .map(|tx| (tx.txid(), tx)) - .collect::>(); + let spent_prevouts = get_prev_outpoints(txs_map.values()); // Lookup spent prevouts that were funded within the same `add` batch let mut txos = HashMap::new(); @@ -487,31 +487,53 @@ impl Mempool { .map_or_else(|| vec![], |entries| self._history(entries, limit)) } + /// Sync our local view of the mempool with the bitcoind RPC. pub fn update(mempool: &Arc>, daemon: &Daemon) -> Result<()> { let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer(); - // 1. Determine which transactions are no longer in the daemon's mempool and which ones have newly entered it - let old_txids = mempool.read().unwrap().old_txids(); - let all_txids = daemon - .getmempooltxids() - .chain_err(|| "failed to update mempool from daemon")?; - let txids_to_remove: HashSet<&Txid> = old_txids.difference(&all_txids).collect(); + // Continuously attempt to fetch mempool transactions until we're able to get them in full + let mut fetched_txs = HashMap::::new(); + let mut indexed_txids = mempool.read().unwrap().txids_set(); + loop { + // Get bitcoind's current list of mempool txids + let all_txids = daemon + .getmempooltxids() + .chain_err(|| "failed to update mempool from daemon")?; + + // Remove evicted mempool transactions + mempool + .write() + .unwrap() + .remove(indexed_txids.difference(&all_txids).collect()); - // 2. Remove missing transactions. Even if we are unable to download new transactions from - // the daemon, we still want to remove the transactions that are no longer in the mempool. - mempool.write().unwrap().remove(txids_to_remove); + indexed_txids.retain(|txid| all_txids.contains(txid)); + fetched_txs.retain(|txid, _| all_txids.contains(txid)); - // 3. 
diff --git a/src/util/transaction.rs b/src/util/transaction.rs
index 9becde94b..4daa8547b 100644
--- a/src/util/transaction.rs
+++ b/src/util/transaction.rs
@@ -96,15 +96,14 @@ pub fn extract_tx_prevouts<'a>(
         .collect()
 }
 
-pub fn get_prev_outpoints(txs: &[Transaction]) -> BTreeSet<OutPoint> {
-    txs.iter()
-        .flat_map(|tx| {
-            tx.input
-                .iter()
-                .filter(|txin| has_prevout(txin))
-                .map(|txin| txin.previous_output)
-        })
-        .collect()
+pub fn get_prev_outpoints<'a>(txs: impl Iterator<Item = &'a Transaction>) -> BTreeSet<OutPoint> {
+    txs.flat_map(|tx| {
+        tx.input
+            .iter()
+            .filter(|txin| has_prevout(txin))
+            .map(|txin| txin.previous_output)
+    })
+    .collect()
 }
 
 pub fn serialize_outpoint<S>(outpoint: &OutPoint, serializer: S) -> Result<S::Ok, S::Error>

From e3481c76477d7a44100f819bb99c26bc8cf53910 Mon Sep 17 00:00:00 2001
From: Nadav Ivgi
Date: Sat, 1 Jun 2024 16:33:45 +0300
Subject: [PATCH 4/7] Make sure the chain tip doesn't move while fetching the
 mempool

---
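Note: this is an optimistic-concurrency guard: record the tip, do the slow
mempool fetch, then re-check the tip and discard the result if a block arrived
in between. A minimal sketch, with `best_block_hash` as a hypothetical
stand-in for `Daemon::getbestblockhash`:

    // Runs `fetch` against a presumed-stable chain tip; returns None when the
    // tip moved, signalling the caller to re-index blocks and retry.
    fn fetch_against_tip<T>(
        tip: &str,
        best_block_hash: impl Fn() -> String,
        fetch: impl FnOnce() -> T,
    ) -> Option<T> {
        let result = fetch(); // slow enough that a new block may arrive
        if best_block_hash() != *tip {
            // The fetched mempool view may mix pre- and post-block states
            // (e.g. contain txs that were confirmed meanwhile), so discard it.
            return None;
        }
        Some(result)
    }

This mirrors the patch's control flow: `Mempool::update` returns `Ok(false)`
when the tip moves, and `run_server` indexes the new block(s) before retrying.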
 src/bin/electrs.rs       | 22 ++++++++--------------
 src/new_index/mempool.rs | 20 ++++++++++++++++----
 tests/common.rs          |  8 ++++----
 3 files changed, 28 insertions(+), 22 deletions(-)

diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs
index ee92d7e0d..4d4eb1c17 100644
--- a/src/bin/electrs.rs
+++ b/src/bin/electrs.rs
@@ -82,14 +82,11 @@ fn run_server(config: Arc<Config>) -> Result<()> {
         &metrics,
         Arc::clone(&config),
     )));
-    loop {
-        match Mempool::update(&mempool, &daemon) {
-            Ok(_) => break,
-            Err(e) => {
-                warn!("Error performing initial mempool update, trying again in 5 seconds: {}", e.display_chain());
-                signal.wait(Duration::from_secs(5), false)?;
-            },
-        }
+
+    while !Mempool::update(&mempool, &daemon, &tip)? {
+        // Mempool syncing was aborted because the chain tip moved;
+        // index the new block(s) and try again.
+        tip = indexer.update(&daemon)?;
     }
 
     #[cfg(feature = "liquid")]
@@ -118,7 +115,6 @@ fn run_server(config: Arc<Config>) -> Result<()> {
     ));
 
     loop {
-
         main_loop_count.inc();
 
         if let Err(err) = signal.wait(Duration::from_secs(5), true) {
@@ -131,14 +127,12 @@ fn run_server(config: Arc<Config>) -> Result<()> {
         // Index new blocks
         let current_tip = daemon.getbestblockhash()?;
         if current_tip != tip {
-            indexer.update(&daemon)?;
-            tip = current_tip;
+            tip = indexer.update(&daemon)?;
         };
 
         // Update mempool
-        if let Err(e) = Mempool::update(&mempool, &daemon) {
-            // Log the error if the result is an Err
-            warn!("Error updating mempool, skipping mempool update: {}", e.display_chain());
+        if !Mempool::update(&mempool, &daemon, &tip)? {
+            warn!("skipped failed mempool update, trying again in 5 seconds");
         }
 
         // Update subscribed clients
diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs
index b260c09af..8dae5bc62 100644
--- a/src/new_index/mempool.rs
+++ b/src/new_index/mempool.rs
@@ -11,7 +11,7 @@ use std::iter::FromIterator;
 use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
 
-use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid};
+use crate::chain::{deserialize, BlockHash, Network, OutPoint, Transaction, TxOut, Txid};
 use crate::config::Config;
 use crate::daemon::Daemon;
 use crate::errors::*;
@@ -487,8 +487,13 @@ impl Mempool {
             .map_or_else(|| vec![], |entries| self._history(entries, limit))
     }
 
-    /// Sync our local view of the mempool with the bitcoind RPC.
-    pub fn update(mempool: &Arc<RwLock<Mempool>>, daemon: &Daemon) -> Result<()> {
+    /// Sync our local view of the mempool with the bitcoind Daemon RPC. If the chain tip moves
+    /// before the mempool is fetched in full, syncing is aborted and an Ok(false) is returned.
+    pub fn update(
+        mempool: &Arc<RwLock<Mempool>>,
+        daemon: &Daemon,
+        tip: &BlockHash,
+    ) -> Result<bool> {
         let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer();
 
         // Continuously attempt to fetch mempool transactions until we're able to get them in full
@@ -515,6 +520,13 @@ impl Mempool {
                 .filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid))
                 .collect::<Vec<_>>();
             let new_txs = daemon.gettransactions_available(&new_txids)?;
+
+            // Abort if the chain tip moved while fetching transactions
+            if daemon.getbestblockhash()? != *tip {
+                warn!("chain tip moved while updating mempool");
+                return Ok(false);
+            }
+
             let fetched_count = new_txs.len();
             fetched_txs.extend(&mut new_txs.into_iter().map(|tx| (tx.txid(), tx)));
 
@@ -546,7 +558,7 @@ impl Mempool {
             }
         }
 
-        Ok(())
+        Ok(true)
     }
 }
diff --git a/tests/common.rs b/tests/common.rs
index f08f850cc..1c06c9ed7 100644
--- a/tests/common.rs
+++ b/tests/common.rs
@@ -149,7 +149,7 @@ impl TestRunner {
         };
 
         let mut indexer = Indexer::open(Arc::clone(&store), fetch_from, &config, &metrics);
-        indexer.update(&daemon)?;
+        let tip = indexer.update(&daemon)?;
         indexer.fetch_from(FetchFrom::Bitcoind);
 
         let chain = Arc::new(ChainQuery::new(
@@ -164,7 +164,7 @@ impl TestRunner {
             &metrics,
             Arc::clone(&config),
         )));
-        Mempool::update(&mempool, &daemon)?;
+        assert!(Mempool::update(&mempool, &daemon, &tip)?);
 
         let query = Arc::new(Query::new(
             Arc::clone(&chain),
@@ -195,8 +195,8 @@ impl TestRunner {
     }
 
     pub fn sync(&mut self) -> Result<()> {
-        self.indexer.update(&self.daemon)?;
-        Mempool::update(&self.mempool, &self.daemon)?;
+        let tip = self.indexer.update(&self.daemon)?;
+        assert!(Mempool::update(&self.mempool, &self.daemon, &tip)?);
         // force an update for the mempool stats, which are normally cached
         self.mempool.write().unwrap().update_backlog_stats();
         Ok(())

From ebcd7371efcdf612ba2016f69bee723f9a04c166 Mon Sep 17 00:00:00 2001
From: Nadav Ivgi
Date: Tue, 6 Aug 2024 05:15:59 +0300
Subject: [PATCH 5/7] Update logging verbosity

- Reduce logging level for bitcoind's JSONRPC response errors

  These can happen pretty often for missing mempool txs and do not
  warrant warn-level logging. Unexpected RPC errors will bubble up and
  be handled appropriately.

- Add more verbose logging for mempool syncing
---
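Note: the severity routing can be sketched as follows. The `RpcIssue` enum is
a hypothetical simplification; electrs itself matches on its error-chain
`ErrorKind::RpcError` code (-5, RPC_INVALID_ADDRESS_OR_KEY, for missing txs):

    use log::{debug, warn};

    enum RpcIssue {
        // Routine while syncing the mempool (the tx was evicted in between)
        TxNotFound,
        // Anything unexpected; callers are expected to bubble this up
        Other(i64, String),
    }

    fn log_rpc_issue(method: &str, issue: &RpcIssue) {
        match issue {
            RpcIssue::TxNotFound => debug!("{}: tx not found, likely evicted", method),
            RpcIssue::Other(code, msg) => warn!("{} failed with code {}: {}", method, code, msg),
        }
    }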
 src/daemon.rs            |  6 +++---
 src/new_index/mempool.rs | 11 +++++++++++
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/src/daemon.rs b/src/daemon.rs
index 5db81dd96..ca5e8a018 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -250,7 +250,7 @@ impl Connection {
         Ok(if status == "HTTP/1.1 200 OK" {
             contents
         } else if status == "HTTP/1.1 500 Internal Server Error" {
-            warn!("HTTP status: {}", status);
+            debug!("RPC HTTP 500 error: {}", contents);
             contents // the contents should have a JSONRPC error field
         } else {
             bail!(
@@ -414,8 +414,8 @@ impl Daemon {
     fn retry_request(&self, method: &str, params: &Value) -> Result<Value> {
         loop {
             match self.handle_request(method, &params) {
-                Err(Error(ErrorKind::Connection(msg), _)) => {
-                    warn!("reconnecting to bitcoind: {}", msg);
+                Err(e @ Error(ErrorKind::Connection(_), _)) => {
+                    warn!("reconnecting to bitcoind: {}", e.display_chain());
                     self.signal.wait(Duration::from_secs(3), false)?;
                     let mut conn = self.conn.lock().unwrap();
                     *conn = conn.reconnect()?;
diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs
index 8dae5bc62..864155aba 100644
--- a/src/new_index/mempool.rs
+++ b/src/new_index/mempool.rs
@@ -519,6 +519,15 @@ impl Mempool {
                 .iter()
                 .filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid))
                 .collect::<Vec<_>>();
+            if new_txids.is_empty() {
+                break;
+            }
+            debug!(
+                "mempool with total {} txs, {} fetched, {} missing",
+                all_txids.len(),
+                indexed_txids.len() + fetched_txs.len(),
+                new_txids.len()
+            );
             let new_txs = daemon.gettransactions_available(&new_txids)?;
 
             // Abort if the chain tip moved while fetching transactions
@@ -558,6 +567,8 @@ impl Mempool {
             }
         }
 
+        trace!("mempool is synced");
+
         Ok(true)
     }
 }

From aee32b618fa74163a84dd2a61c4a1c79401d1137 Mon Sep 17 00:00:00 2001
From: Nadav Ivgi
Date: Tue, 6 Aug 2024 05:16:07 +0300
Subject: [PATCH 6/7] Reuse RPC threads and TCP connections

Keep RPC TCP connections open between sync runs and reuse them, to
reduce TCP connection initialization overhead.
---
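Note: the build-once, share-via-Arc pool pattern generalizes beyond RPC. A
minimal sketch, with types unrelated to electrs:

    use std::sync::Arc;

    use rayon::prelude::*;

    #[derive(Clone)]
    struct Client {
        // Built once; clones share the same pool, so its worker threads (and
        // any thread-local state they own, such as open TCP connections)
        // survive across batches instead of being torn down after each one.
        pool: Arc<rayon::ThreadPool>,
    }

    impl Client {
        fn new(parallelism: usize) -> Self {
            let pool = rayon::ThreadPoolBuilder::new()
                .num_threads(parallelism)
                .build()
                .expect("failed to build rayon pool");
            Client { pool: Arc::new(pool) }
        }

        fn run_batch(&self, inputs: Vec<u32>) -> Vec<u32> {
            self.pool.install(|| inputs.into_par_iter().map(|x| x * 2).collect())
        }
    }

In the patch, `Daemon::reconnect` clones the `Arc` rather than rebuilding the
pool, which is what lets the per-thread `DAEMON_INSTANCE` connections persist
between sync runs.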
 src/daemon.rs | 25 +++++++++++++------------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/src/daemon.rs b/src/daemon.rs
index ca5e8a018..67b3c0631 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -283,13 +283,14 @@ impl Counter {
 
 pub struct Daemon {
     daemon_dir: PathBuf,
-    daemon_parallelism: usize,
     blocks_dir: PathBuf,
     network: Network,
     conn: Mutex<Connection>,
     message_id: Counter, // for monotonic JSONRPC 'id'
     signal: Waiter,
 
+    rpc_threads: Arc<rayon::ThreadPool>,
+
     // monitoring
     latency: HistogramVec,
     size: HistogramVec,
@@ -308,7 +309,6 @@ impl Daemon {
     ) -> Result<Daemon> {
         let daemon = Daemon {
             daemon_dir: daemon_dir.clone(),
-            daemon_parallelism,
             blocks_dir: blocks_dir.clone(),
             network,
             conn: Mutex::new(Connection::new(
@@ -318,6 +318,13 @@ impl Daemon {
             )?),
             message_id: Counter::new(),
             signal: signal.clone(),
+            rpc_threads: Arc::new(
+                rayon::ThreadPoolBuilder::new()
+                    .num_threads(daemon_parallelism)
+                    .thread_name(|i| format!("rpc-requests-{}", i))
+                    .build()
+                    .unwrap(),
+            ),
             latency: metrics.histogram_vec(
                 HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"),
                 &["method"],
@@ -361,12 +368,12 @@ impl Daemon {
     pub fn reconnect(&self) -> Result<Daemon> {
         Ok(Daemon {
             daemon_dir: self.daemon_dir.clone(),
-            daemon_parallelism: self.daemon_parallelism,
             blocks_dir: self.blocks_dir.clone(),
             network: self.network,
             conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
             message_id: Counter::new(),
             signal: self.signal.clone(),
+            rpc_threads: self.rpc_threads.clone(),
             latency: self.latency.clone(),
             size: self.size.clone(),
         })
@@ -455,16 +462,10 @@ impl Daemon {
         method: &'a str,
         params_list: Vec<Value>,
     ) -> impl ParallelIterator<Item = Result<Value>> + 'a {
-        let thread_pool = rayon::ThreadPoolBuilder::new()
-            .num_threads(self.daemon_parallelism)
-            .thread_name(|i| format!("rpc-requests-{}", i))
-            .build()
-            .unwrap();
-
-        thread_pool.install(move || {
+        self.rpc_threads.install(move || {
             params_list.into_par_iter().map(move |params| {
-                // Store a local per-thread Daemon, each with its own TCP connection. These will get initialized as
-                // necessary for the threads managed by rayon, and get destroyed when the thread pool is dropped.
+                // Store a local per-thread Daemon, each with its own TCP connection. These will
+                // get initialized as necessary for the `rpc_threads` pool threads managed by rayon.
                 thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());
 
                 DAEMON_INSTANCE.with(|daemon| {
From 7a068bf0e0dc3c063b0f661bc178fd8b4c3e2290 Mon Sep 17 00:00:00 2001
From: Nadav Ivgi
Date: Tue, 6 Aug 2024 05:16:10 +0300
Subject: [PATCH 7/7] Avoid recomputing txids when possible

Following
https://github.com/Blockstream/electrs/pull/89#discussion_r1632848608 and
https://github.com/Blockstream/electrs/pull/89#discussion_r1632919266
---
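Note: recomputing a txid means re-hashing the serialized transaction (a double
SHA-256), so carrying the already-known txid alongside each reply is cheaper.
The zip relies on `map` over an indexed parallel iterator preserving length
and order. A standalone sketch, with `u64` keys as hypothetical stand-ins for
txids:

    use rayon::prelude::*;

    // Pair each result with the key it was requested for, instead of
    // re-deriving the key from the response.
    fn fetch_keyed(
        keys: &[u64],
        fetch: impl Fn(u64) -> Option<String> + Sync,
    ) -> Vec<(u64, String)> {
        keys.par_iter()
            .map(|&k| fetch(k))
            // Valid because `map` preserves the indexed, length-keeping order
            .zip(keys)
            .filter_map(|(res, &k)| res.map(|v| (k, v)))
            .collect()
    }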
 src/daemon.rs            | 11 ++++++-----
 src/new_index/mempool.rs | 14 ++++++--------
 src/new_index/query.rs   |  2 +-
 3 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/src/daemon.rs b/src/daemon.rs
index 67b3c0631..c9c0a835f 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -11,7 +11,7 @@ use std::time::Duration;
 use base64::prelude::{Engine, BASE64_STANDARD};
 use error_chain::ChainedError;
 use hex::FromHex;
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
 use serde_json::{from_str, from_value, Value};
 
 #[cfg(not(feature = "liquid"))]
@@ -461,7 +461,7 @@ impl Daemon {
         &'a self,
         method: &'a str,
         params_list: Vec<Value>,
-    ) -> impl ParallelIterator<Item = Result<Value>> + 'a {
+    ) -> impl ParallelIterator<Item = Result<Value>> + IndexedParallelIterator + 'a {
         self.rpc_threads.install(move || {
             params_list.into_par_iter().map(move |params| {
                 // Store a local per-thread Daemon, each with its own TCP connection. These will
@@ -537,7 +537,7 @@ impl Daemon {
 
     /// Fetch the given transactions in parallel over multiple threads and RPC connections,
     /// ignoring any missing ones and returning whatever is available.
-    pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<Transaction>> {
+    pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<(Txid, Transaction)>> {
         const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5;
 
         let params_list: Vec<Value> = txids
@@ -546,8 +546,9 @@ impl Daemon {
             .collect();
 
         self.requests_iter("getrawtransaction", params_list)
-            .filter_map(|res| match res {
-                Ok(val) => Some(tx_from_value(val)),
+            .zip(txids)
+            .filter_map(|(res, txid)| match res {
+                Ok(val) => Some(tx_from_value(val).map(|tx| (**txid, tx))),
                 // Ignore 'tx not found' errors
                 Err(Error(ErrorKind::RpcError(code, _, _), _))
                     if code == RPC_INVALID_ADDRESS_OR_KEY =>
diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs
index 864155aba..4ccd4cd59 100644
--- a/src/new_index/mempool.rs
+++ b/src/new_index/mempool.rs
@@ -288,14 +288,12 @@ impl Mempool {
         self.backlog_stats = (BacklogStats::new(&self.feeinfo), Instant::now());
     }
 
-    pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
-        if self.txstore.get(txid).is_none() {
+    pub fn add_by_txid(&mut self, daemon: &Daemon, txid: Txid) -> Result<()> {
+        if self.txstore.get(&txid).is_none() {
             if let Ok(tx) = daemon.getmempooltx(&txid) {
-                self.add({
-                    let mut txs_map = HashMap::new();
-                    txs_map.insert(tx.txid(), tx);
-                    txs_map
-                })
+                let mut txs_map = HashMap::new();
+                txs_map.insert(txid, tx);
+                self.add(txs_map)
             } else {
                 bail!("add_by_txid cannot find {}", txid);
             }
@@ -537,7 +535,7 @@ impl Mempool {
             }
 
             let fetched_count = new_txs.len();
-            fetched_txs.extend(&mut new_txs.into_iter().map(|tx| (tx.txid(), tx)));
+            fetched_txs.extend(new_txs);
 
             // Retry if any transactions were evicted from the mempool before we managed to get them
             if fetched_count != new_txids.len() {
diff --git a/src/new_index/query.rs b/src/new_index/query.rs
index e178bec49..26d983734 100644
--- a/src/new_index/query.rs
+++ b/src/new_index/query.rs
@@ -75,7 +75,7 @@ impl Query {
             .mempool
             .write()
             .unwrap()
-            .add_by_txid(&self.daemon, &txid);
+            .add_by_txid(&self.daemon, txid);
 
         Ok(txid)
     }