diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 5eafcfac7..0d78bf001 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -5,12 +5,9 @@ extern crate log; extern crate electrs; use error_chain::ChainedError; -use std::collections::HashSet; use std::process; use std::sync::{Arc, RwLock}; -use std::time::{Duration, Instant}; -use bitcoin::Txid; -use serde_json::json; +use std::time::Duration; use electrs::{ config::{Config, get_num_threads}, @@ -88,7 +85,7 @@ fn run_server(config: Arc) -> Result<()> { &metrics, Arc::clone(&config), ))); - mempool.write().unwrap().update(&daemon)?; + Mempool::update(&mempool, &daemon)?; #[cfg(feature = "liquid")] let asset_db = config.asset_db_path.as_ref().map(|db_dir| { @@ -125,29 +122,8 @@ fn run_server(config: Arc) -> Result<()> { tip = current_tip; }; - // FIXME(jamesdorfman): couldn't figure out how to import it from util - let log_fn_duration = |fn_name: &str, duration: u128| { - let log = json!({ - "fn_name": fn_name, - "duration_micros": duration, - }); - println!("{}", log); - }; - // Update mempool - - let t = Instant::now(); - - let old_txids = mempool.read().unwrap().old_txids(); - let new_txids = daemon - .getmempooltxids() - .chain_err(|| "failed to update mempool from daemon")?; - let old_mempool_txs: HashSet<&Txid> = old_txids.difference(&new_txids).collect(); - - log_fn_duration("mempool::paratial_tx_fetch", t.elapsed().as_micros()); - - let new_mempool_txs = Mempool::download_new_mempool_txs(&daemon, &old_txids, &new_txids); - mempool.write().unwrap().update_quick( &new_mempool_txs, &old_mempool_txs)?; + Mempool::update(&mempool, &daemon)?; // Update subscribed clients electrum_server.notify(); diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 7717a8e81..9ecdc924e 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -8,7 +8,7 @@ use elements::{encode::serialize, AssetId}; use std::collections::{BTreeSet, HashMap, HashSet}; use std::iter::FromIterator; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid}; @@ -289,80 +289,6 @@ impl Mempool { return HashSet::from_iter(self.txstore.keys().cloned()); } - pub fn download_new_mempool_txs(daemon: &Daemon, old_txids: &HashSet, new_txids: &HashSet) -> Vec { - let t = Instant::now(); - - let txids: Vec<&Txid> = (*new_txids).difference(old_txids).collect(); - let tranactions = match daemon.gettransactions(&txids) { - Ok(txs) => txs, - Err(err) => { - warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF - vec![] // return an empty vector if there's an error - } - }; - - log_fn_duration("mempool::download_new_mempool_txs", t.elapsed().as_micros()); - return tranactions; - } - - pub fn update_quick(&mut self, to_add: &Vec, to_remove: &HashSet<&Txid>) -> Result<()> { - let t = Instant::now(); - let _timer = self.latency.with_label_values(&["update"]).start_timer(); - - // Add new transactions - self.add(to_add.clone()); - // Remove missing transactions - self.remove(to_remove.clone()); - - self.count - .with_label_values(&["txs"]) - .set(self.txstore.len() as f64); - - // Update cached backlog stats (if expired) - if self.backlog_stats.1.elapsed() > Duration::from_secs(BACKLOG_STATS_TTL) { - self.update_backlog_stats(); - } - - log_fn_duration("mempool::update_quick", t.elapsed().as_micros()); - Ok(()) - } - - pub fn update(&mut self, daemon: &Daemon) -> Result<()> { - let t = Instant::now(); - let _timer = self.latency.with_label_values(&["update"]).start_timer(); - let new_txids = daemon - .getmempooltxids() - .chain_err(|| "failed to update mempool from daemon")?; - let old_txids = HashSet::from_iter(self.txstore.keys().cloned()); - let to_remove: HashSet<&Txid> = old_txids.difference(&new_txids).collect(); - - // Download and add new transactions from bitcoind's mempool - let txids: Vec<&Txid> = new_txids.difference(&old_txids).collect(); - let to_add = match daemon.gettransactions(&txids) { - Ok(txs) => txs, - Err(err) => { - warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF - return Ok(()); // keep the mempool until next update() - } - }; - // Add new transactions - self.add(to_add); - // Remove missing transactions - self.remove(to_remove); - - self.count - .with_label_values(&["txs"]) - .set(self.txstore.len() as f64); - - // Update cached backlog stats (if expired) - if self.backlog_stats.1.elapsed() > Duration::from_secs(BACKLOG_STATS_TTL) { - self.update_backlog_stats(); - } - - log_fn_duration("mempool::update", t.elapsed().as_micros()); - Ok(()) - } - pub fn update_backlog_stats(&mut self) { let _timer = self .latency @@ -577,6 +503,68 @@ impl Mempool { .get(asset_id) .map_or_else(|| vec![], |entries| self._history(entries, limit)) } + + pub fn download_new_mempool_txs( + daemon: &Daemon, + old_txids: &HashSet, + new_txids: &HashSet + ) -> Result>{ + let t = Instant::now(); + + let txids: Vec<&Txid> = (*new_txids).difference(old_txids).collect(); + let transactions = match daemon.gettransactions(&txids) { + Ok(txs) => txs, + Err(err) => { + warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF + return Err(err); + } + }; + + log_fn_duration("mempool::download_new_mempool_txs", t.elapsed().as_micros()); + return Ok(transactions); + } + + pub fn update(mempool: &Arc>, daemon: &Daemon) -> Result<()> { + let t = Instant::now(); + + // 1. Determine which transactions are no longer in the daemon's mempool + let old_txids = mempool.read().unwrap().old_txids(); + let new_txids = daemon + .getmempooltxids() + .chain_err(|| "failed to update mempool from daemon")?; + let old_mempool_txs: HashSet<&Txid> = old_txids.difference(&new_txids).collect(); + log_fn_duration("mempool::paratial_tx_fetch", t.elapsed().as_micros()); + + // 2. Download new transactions from the daemon's mempool + let new_mempool_txs = match Mempool::download_new_mempool_txs(&daemon, &old_txids, &new_txids) { + Ok(txs) => txs, + Err(_) => { + warn!("Failed to get new mempool txs, skipping mempool update"); + return Ok(()); + } + }; + + // 3. Update local mempool to match daemon's state + { + let mut mempool_guard = mempool.write().unwrap(); + // Add new transactions + mempool_guard.add(new_mempool_txs.clone()); + // Remove missing transactions + mempool_guard.remove(old_mempool_txs.clone()); + + mempool_guard.count + .with_label_values(&["txs"]) + .set(mempool_guard.txstore.len() as f64); + + // Update cached backlog stats (if expired) + if mempool_guard.backlog_stats.1.elapsed() > Duration::from_secs(BACKLOG_STATS_TTL) { + mempool_guard.update_backlog_stats(); + } + } + + log_fn_duration("mempool::update", t.elapsed().as_micros()); + Ok(()) + } } #[derive(Serialize)]