Skip to content

Commit

Permalink
address review comments from shesek
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesdorfman committed Mar 26, 2024
1 parent cc2aaef commit 28d3c6b
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 102 deletions.
30 changes: 3 additions & 27 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@ extern crate log;
extern crate electrs;

use error_chain::ChainedError;
use std::collections::HashSet;
use std::process;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use bitcoin::Txid;
use serde_json::json;
use std::time::Duration;

use electrs::{
config::{Config, get_num_threads},
Expand Down Expand Up @@ -88,7 +85,7 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&metrics,
Arc::clone(&config),
)));
mempool.write().unwrap().update(&daemon)?;
Mempool::update(&mempool, &daemon)?;

#[cfg(feature = "liquid")]
let asset_db = config.asset_db_path.as_ref().map(|db_dir| {
Expand Down Expand Up @@ -125,29 +122,8 @@ fn run_server(config: Arc<Config>) -> Result<()> {
tip = current_tip;
};

// FIXME(jamesdorfman): couldn't figure out how to import it from util
let log_fn_duration = |fn_name: &str, duration: u128| {
let log = json!({
"fn_name": fn_name,
"duration_micros": duration,
});
println!("{}", log);
};

// Update mempool

let t = Instant::now();

let old_txids = mempool.read().unwrap().old_txids();
let new_txids = daemon
.getmempooltxids()
.chain_err(|| "failed to update mempool from daemon")?;
let old_mempool_txs: HashSet<&Txid> = old_txids.difference(&new_txids).collect();

log_fn_duration("mempool::paratial_tx_fetch", t.elapsed().as_micros());

let new_mempool_txs = Mempool::download_new_mempool_txs(&daemon, &old_txids, &new_txids);
mempool.write().unwrap().update_quick( &new_mempool_txs, &old_mempool_txs)?;
Mempool::update(&mempool, &daemon)?;

// Update subscribed clients
electrum_server.notify();
Expand Down
138 changes: 63 additions & 75 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use elements::{encode::serialize, AssetId};

use std::collections::{BTreeSet, HashMap, HashSet};
use std::iter::FromIterator;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid};
Expand Down Expand Up @@ -289,80 +289,6 @@ impl Mempool {
return HashSet::from_iter(self.txstore.keys().cloned());
}

/// Downloads from the daemon every mempool transaction we don't have yet.
///
/// `old_txids` is our current view of the mempool, `new_txids` the daemon's;
/// only the difference (txids the daemon has but we don't) is fetched.
/// Best-effort: on a daemon error (e.g. a new block or RBF evicted txs
/// between the txid listing and the fetch) a warning is logged and an
/// empty vector is returned.
pub fn download_new_mempool_txs(daemon: &Daemon, old_txids: &HashSet<Txid>, new_txids: &HashSet<Txid>) -> Vec<Transaction> {
    let t = Instant::now();

    let txids: Vec<&Txid> = new_txids.difference(old_txids).collect();
    let transactions = match daemon.gettransactions(&txids) {
        Ok(txs) => txs,
        Err(err) => {
            warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF
            vec![] // return an empty vector if there's an error
        }
    };

    log_fn_duration("mempool::download_new_mempool_txs", t.elapsed().as_micros());
    transactions
}

/// Applies a precomputed mempool delta: adds `to_add` and evicts `to_remove`,
/// then refreshes the size gauge and (if their TTL expired) the cached
/// backlog stats. Unlike `update`, this performs no daemon RPCs itself.
pub fn update_quick(&mut self, to_add: &Vec<Transaction>, to_remove: &HashSet<&Txid>) -> Result<()> {
    let started = Instant::now();
    let _timer = self.latency.with_label_values(&["update"]).start_timer();

    // Apply the delta (the borrowed params force owned copies here).
    self.add(to_add.clone());
    self.remove(to_remove.clone());

    // Expose the current mempool size as a gauge.
    let tx_count = self.txstore.len() as f64;
    self.count.with_label_values(&["txs"]).set(tx_count);

    // Recompute the cached backlog stats only once their TTL has passed.
    let stats_expired = self.backlog_stats.1.elapsed() > Duration::from_secs(BACKLOG_STATS_TTL);
    if stats_expired {
        self.update_backlog_stats();
    }

    log_fn_duration("mempool::update_quick", started.elapsed().as_micros());
    Ok(())
}

/// Synchronizes this mempool with the daemon's: diffs the txid sets,
/// downloads newly-appeared transactions, evicts missing ones, and
/// refreshes the size gauge and (TTL-gated) backlog stats.
/// A failed download is logged and the update skipped, keeping the
/// previous mempool view until the next poll.
pub fn update(&mut self, daemon: &Daemon) -> Result<()> {
    let started = Instant::now();
    let _timer = self.latency.with_label_values(&["update"]).start_timer();

    // Diff our view of the mempool against the daemon's.
    let new_txids = daemon
        .getmempooltxids()
        .chain_err(|| "failed to update mempool from daemon")?;
    let old_txids = HashSet::from_iter(self.txstore.keys().cloned());
    let to_remove: HashSet<&Txid> = old_txids.difference(&new_txids).collect();
    let to_fetch: Vec<&Txid> = new_txids.difference(&old_txids).collect();

    // Download transactions that appeared since the last poll.
    let to_add = match daemon.gettransactions(&to_fetch) {
        Ok(txs) => txs,
        Err(err) => {
            // e.g. new block or RBF — keep the mempool until next update()
            warn!("failed to get {} transactions: {}", to_fetch.len(), err);
            return Ok(());
        }
    };

    self.add(to_add);
    self.remove(to_remove);

    // Expose the current mempool size as a gauge.
    self.count
        .with_label_values(&["txs"])
        .set(self.txstore.len() as f64);

    // Recompute the cached backlog stats only once their TTL has passed.
    if self.backlog_stats.1.elapsed() > Duration::from_secs(BACKLOG_STATS_TTL) {
        self.update_backlog_stats();
    }

    log_fn_duration("mempool::update", started.elapsed().as_micros());
    Ok(())
}

pub fn update_backlog_stats(&mut self) {
let _timer = self
.latency
Expand Down Expand Up @@ -577,6 +503,68 @@ impl Mempool {
.get(asset_id)
.map_or_else(|| vec![], |entries| self._history(entries, limit))
}

/// Downloads from the daemon every mempool transaction we don't have yet.
///
/// `old_txids` is our current view of the mempool, `new_txids` the daemon's;
/// only the difference (txids the daemon has but we don't) is fetched.
///
/// # Errors
/// Propagates the daemon error (e.g. a new block or RBF evicted txs between
/// the txid listing and the fetch) after logging a warning.
pub fn download_new_mempool_txs(
    daemon: &Daemon,
    old_txids: &HashSet<Txid>,
    new_txids: &HashSet<Txid>,
) -> Result<Vec<Transaction>> {
    let t = Instant::now();

    let txids: Vec<&Txid> = new_txids.difference(old_txids).collect();
    let transactions = match daemon.gettransactions(&txids) {
        Ok(txs) => txs,
        Err(err) => {
            warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF
            return Err(err);
        }
    };

    log_fn_duration("mempool::download_new_mempool_txs", t.elapsed().as_micros());
    Ok(transactions)
}

/// Synchronizes the shared mempool with the daemon's current mempool.
///
/// Takes the read lock only to snapshot the known txids, performs the slow
/// daemon RPCs *without* holding any lock, then takes the write lock briefly
/// to apply the delta — so readers are not blocked during the downloads.
/// A failed download is logged and the update skipped (returning `Ok`),
/// keeping the previous mempool view until the next poll.
pub fn update(mempool: &Arc<RwLock<Mempool>>, daemon: &Daemon) -> Result<()> {
    let t = Instant::now();

    // 1. Determine which transactions are no longer in the daemon's mempool
    let old_txids = mempool.read().unwrap().old_txids();
    let new_txids = daemon
        .getmempooltxids()
        .chain_err(|| "failed to update mempool from daemon")?;
    let old_mempool_txs: HashSet<&Txid> = old_txids.difference(&new_txids).collect();
    // NOTE(review): fixed misspelled log label ("paratial" -> "partial");
    // confirm no log consumer still matches the old spelling.
    log_fn_duration("mempool::partial_tx_fetch", t.elapsed().as_micros());

    // 2. Download new transactions from the daemon's mempool (no lock held)
    let new_mempool_txs = match Mempool::download_new_mempool_txs(daemon, &old_txids, &new_txids) {
        Ok(txs) => txs,
        Err(_) => {
            // Best-effort: keep the current mempool until the next update()
            warn!("Failed to get new mempool txs, skipping mempool update");
            return Ok(());
        }
    };

    // 3. Update local mempool to match daemon's state
    {
        let mut mempool_guard = mempool.write().unwrap();
        // Move the delta in rather than cloning it — neither value is used again.
        mempool_guard.add(new_mempool_txs);
        mempool_guard.remove(old_mempool_txs);

        mempool_guard
            .count
            .with_label_values(&["txs"])
            .set(mempool_guard.txstore.len() as f64);

        // Update cached backlog stats (if expired)
        if mempool_guard.backlog_stats.1.elapsed() > Duration::from_secs(BACKLOG_STATS_TTL) {
            mempool_guard.update_backlog_stats();
        }
    }

    log_fn_duration("mempool::update", t.elapsed().as_micros());
    Ok(())
}
}

#[derive(Serialize)]
Expand Down

0 comments on commit 28d3c6b

Please sign in to comment.