Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split up mempool fetch and mempool write #77

Merged
merged 5 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,15 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&metrics,
Arc::clone(&config),
)));
mempool.write().unwrap().update(&daemon)?;
loop {
match Mempool::update(&mempool, &daemon) {
Ok(_) => break,
Err(e) => {
warn!("Error performing initial mempool update, trying again in 5 seconds: {}", e.display_chain());
signal.wait(Duration::from_secs(5), false)?;
},
}
}

#[cfg(feature = "liquid")]
let asset_db = config.asset_db_path.as_ref().map(|db_dir| {
Expand Down Expand Up @@ -118,7 +126,10 @@ fn run_server(config: Arc<Config>) -> Result<()> {
};

// Update mempool
mempool.write().unwrap().update(&daemon)?;
if let Err(e) = Mempool::update(&mempool, &daemon) {
// Log the error if the result is an Err
warn!("Error updating mempool, skipping mempool update: {}", e.display_chain());
}

// Update subscribed clients
electrum_server.notify();
Expand Down
76 changes: 43 additions & 33 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use elements::{encode::serialize, AssetId};

use std::collections::{BTreeSet, HashMap, HashSet};
use std::iter::FromIterator;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use crate::chain::{deserialize, Network, OutPoint, Transaction, TxOut, Txid};
Expand Down Expand Up @@ -276,38 +276,8 @@ impl Mempool {
&self.backlog_stats.0
}

pub fn update(&mut self, daemon: &Daemon) -> Result<()> {
let _timer = self.latency.with_label_values(&["update"]).start_timer();
let new_txids = daemon
.getmempooltxids()
.chain_err(|| "failed to update mempool from daemon")?;
let old_txids = HashSet::from_iter(self.txstore.keys().cloned());
let to_remove: HashSet<&Txid> = old_txids.difference(&new_txids).collect();

// Download and add new transactions from bitcoind's mempool
let txids: Vec<&Txid> = new_txids.difference(&old_txids).collect();
let to_add = match daemon.gettransactions(&txids) {
Ok(txs) => txs,
Err(err) => {
warn!("failed to get {} transactions: {}", txids.len(), err); // e.g. new block or RBF
return Ok(()); // keep the mempool until next update()
}
};
// Add new transactions
self.add(to_add);
// Remove missing transactions
self.remove(to_remove);

self.count
.with_label_values(&["txs"])
.set(self.txstore.len() as f64);

// Update cached backlog stats (if expired)
if self.backlog_stats.1.elapsed() > Duration::from_secs(BACKLOG_STATS_TTL) {
self.update_backlog_stats();
}

Ok(())
/// Returns the set of txids currently held in the local mempool store.
///
/// Used by [`Mempool::update`] to diff the local state against the
/// daemon's mempool without holding a lock during the comparison.
pub fn old_txids(&self) -> HashSet<Txid> {
    // Idiomatic collect() instead of explicit `return HashSet::from_iter(..)`.
    self.txstore.keys().cloned().collect()
}

pub fn update_backlog_stats(&mut self) {
Expand Down Expand Up @@ -520,6 +490,46 @@ impl Mempool {
.get(asset_id)
.map_or_else(|| vec![], |entries| self._history(entries, limit))
}

pub fn update(mempool: &Arc<RwLock<Mempool>>, daemon: &Daemon) -> Result<()> {
let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer();

// 1. Determine which transactions are no longer in the daemon's mempool and which ones have newly entered it
let old_txids = mempool.read().unwrap().old_txids();
let all_txids = daemon
.getmempooltxids()
.chain_err(|| "failed to update mempool from daemon")?;
let txids_to_remove: HashSet<&Txid> = old_txids.difference(&all_txids).collect();

// 2. Remove missing transactions. Even if we are unable to download new transactions from
// the daemon, we still want to remove the transactions that are no longer in the mempool.
mempool.write().unwrap().remove(txids_to_remove);

// 3. Download the new transactions from the daemon's mempool
let new_txids: Vec<&Txid> = all_txids.difference(&old_txids).collect();
let txs_to_add = daemon
.gettransactions(&new_txids)
.chain_err(|| format!("failed to get {} transactions", new_txids.len()))?;

// 4. Update local mempool to match daemon's state
{
shesek marked this conversation as resolved.
Show resolved Hide resolved
let mut mempool = mempool.write().unwrap();
// Add new transactions
mempool.add(txs_to_add);

mempool
.count
.with_label_values(&["txs"])
.set(mempool.txstore.len() as f64);

// Update cached backlog stats (if expired)
if mempool.backlog_stats.1.elapsed() > Duration::from_secs(BACKLOG_STATS_TTL) {
mempool.update_backlog_stats();
}
}

Ok(())
}
}

#[derive(Serialize)]
Expand Down
7 changes: 3 additions & 4 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl TestRunner {
&metrics,
Arc::clone(&config),
)));
mempool.write().unwrap().update(&daemon)?;
Mempool::update(&mempool, &daemon)?;

let query = Arc::new(Query::new(
Arc::clone(&chain),
Expand Down Expand Up @@ -193,10 +193,9 @@ impl TestRunner {

/// Re-indexes the chain and refreshes the mempool, for use between test steps.
///
/// Uses the associated-function form `Mempool::update(&Arc<RwLock<..>>, ..)`
/// rather than a long-lived write guard, matching the split fetch/write API.
pub fn sync(&mut self) -> Result<()> {
    self.indexer.update(&self.daemon)?;
    Mempool::update(&self.mempool, &self.daemon)?;
    // force an update for the mempool stats, which are normally cached
    self.mempool.write().unwrap().update_backlog_stats();
    Ok(())
}

Expand Down
Loading