Skip to content

Commit

Permalink
Root hash prefetcher + small things (#204)
Browse files Browse the repository at this point in the history
## 📝 Summary

1. Adds sparse trie prefetcher
2. Fixes deser. error for validation request
3. Adds root hash metrics because finalize adds up to 60% to the root
hash with sparse trie and its separates these two.

## 💡 Motivation and Context

<!--- (Optional) Why is this change required? What problem does it
solve? Remove this section if not applicable. -->

---

## ✅ I have completed the following steps:

* [ ] Run `make lint`
* [ ] Run `make test`
* [ ] Added tests (if applicable)
  • Loading branch information
dvush authored Oct 8, 2024
1 parent 6cf6b28 commit ad5117e
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ mockall = "0.12.1"
shellexpand = "3.1.0"
async-trait = "0.1.80"

eth-sparse-mpt = { git = "https://github.com/flashbots/eth-sparse-mpt", rev = "664759b" }
eth-sparse-mpt = { git = "https://github.com/flashbots/eth-sparse-mpt", rev = "5d0da73" }

[build-dependencies]
built = { version = "0.7.1", features = ["git2", "chrono"] }
Expand Down
1 change: 1 addition & 0 deletions crates/rbuilder/src/bin/dummy-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ async fn main() -> eyre::Result<()> {
extra_rpc: RpcModule::new(()),
sink_factory: Box::new(TraceBlockSinkFactory {}),
builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))],
run_sparse_trie_prefetcher: false,
};

let ctrlc = tokio::spawn(async move {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl<DB: Database + Clone + 'static> BlockBuildingHelperFromDB<DB> {
telemetry::add_built_block_metrics(
built_block_trace.fill_time,
built_block_trace.finalize_time,
built_block_trace.root_hash_time,
txs,
blobs,
gas_used,
Expand Down Expand Up @@ -363,6 +364,7 @@ impl<DB: Database + Clone + 'static> BlockBuildingHelper for BlockBuildingHelper
}
};
self.built_block_trace.update_orders_sealed_at();
self.built_block_trace.root_hash_time = finalized_block.root_hash_time;

self.built_block_trace.finalize_time = start_time.elapsed();

Expand Down
2 changes: 2 additions & 0 deletions crates/rbuilder/src/building/built_block_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct BuiltBlockTrace {
pub orders_sealed_at: OffsetDateTime,
pub fill_time: Duration,
pub finalize_time: Duration,
pub root_hash_time: Duration,
}

impl Default for BuiltBlockTrace {
Expand Down Expand Up @@ -55,6 +56,7 @@ impl BuiltBlockTrace {
orders_sealed_at: OffsetDateTime::now_utc(),
fill_time: Duration::from_secs(0),
finalize_time: Duration::from_secs(0),
root_hash_time: Duration::from_secs(0),
}
}

Expand Down
11 changes: 10 additions & 1 deletion crates/rbuilder/src/building/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ use revm::{
primitives::{BlobExcessGasAndPrice, BlockEnv, CfgEnvWithHandlerCfg, SpecId},
};
use serde::Deserialize;
use std::{hash::Hash, str::FromStr, sync::Arc};
use std::{
hash::Hash,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
use thiserror::Error;
use time::OffsetDateTime;

Expand Down Expand Up @@ -407,6 +412,7 @@ pub struct FinalizeResult {
pub cached_reads: CachedReads,
// sidecars for all txs in SealedBlock
pub txs_blob_sidecars: Vec<Arc<BlobTransactionSidecar>>,
pub root_hash_time: Duration,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -654,6 +660,7 @@ impl<Tracer: SimulationTracer> PartialBlock<Tracer> {
.block_logs_bloom(block_number)
.expect("Number is in range");

let start = Instant::now();
let state_root = calculate_state_root(
provider_factory,
ctx.attributes.parent,
Expand All @@ -662,6 +669,7 @@ impl<Tracer: SimulationTracer> PartialBlock<Tracer> {
ctx.shared_sparse_mpt_cache.clone(),
root_hash_config,
)?;
let root_hash_time = start.elapsed();

// create the block header
let transactions_root = proofs::calculate_transaction_root(&self.executed_tx);
Expand Down Expand Up @@ -735,6 +743,7 @@ impl<Tracer: SimulationTracer> PartialBlock<Tracer> {
sealed_block: block.seal_slow(),
cached_reads,
txs_blob_sidecars,
root_hash_time,
})
}

Expand Down
2 changes: 2 additions & 0 deletions crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ impl BaseConfig {
extra_rpc: RpcModule::new(()),
sink_factory,
builders: Vec::new(),

run_sparse_trie_prefetcher: self.root_hash_use_sparse_trie,
})
}

Expand Down
18 changes: 18 additions & 0 deletions crates/rbuilder/src/live_builder/building/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
BlockBuildingContext,
},
live_builder::{payload_events::MevBoostSlotData, simulation::SlotOrderSimResults},
roothash::run_trie_prefetcher,
utils::ProviderFactoryReopener,
};
use reth_db::database::Database;
Expand All @@ -30,6 +31,7 @@ pub struct BlockBuildingPool<DB> {
sink_factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
orderpool_subscriber: order_input::OrderPoolSubscriber,
order_simulation_pool: OrderSimulationPool<DB>,
run_sparse_trie_prefetcher: bool,
}

impl<DB: Database + Clone + 'static> BlockBuildingPool<DB> {
Expand All @@ -39,13 +41,15 @@ impl<DB: Database + Clone + 'static> BlockBuildingPool<DB> {
sink_factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
orderpool_subscriber: order_input::OrderPoolSubscriber,
order_simulation_pool: OrderSimulationPool<DB>,
run_sparse_trie_prefetcher: bool,
) -> Self {
BlockBuildingPool {
provider_factory,
builders,
sink_factory,
orderpool_subscriber,
order_simulation_pool,
run_sparse_trie_prefetcher,
}
}

Expand Down Expand Up @@ -127,6 +131,20 @@ impl<DB: Database + Clone + 'static> BlockBuildingPool<DB> {
});
}

if self.run_sparse_trie_prefetcher {
let input = broadcast_input.subscribe();
tokio::task::spawn_blocking(move || {
run_trie_prefetcher(
ctx.attributes.parent,
ctx.shared_sparse_mpt_cache,
provider_factory,
input,
cancel.clone(),
);
debug!(block = block_number, "Stopped trie prefetcher job");
});
}

tokio::spawn(multiplex_job(input.orders, broadcast_input));
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/rbuilder/src/live_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct LiveBuilder<DB, BlocksSourceType: SlotSource> {
pub simulation_threads: usize,
pub order_input_config: OrderInputConfig,
pub blocks_source: BlocksSourceType,
pub run_sparse_trie_prefetcher: bool,

pub chain_chain_spec: Arc<ChainSpec>,
pub provider_factory: ProviderFactoryReopener<DB>,
Expand Down Expand Up @@ -131,6 +132,7 @@ impl<DB: Database + Clone + 'static, BuilderSourceType: SlotSource>
self.sink_factory,
orderpool_subscriber,
order_simulation_pool,
self.run_sparse_trie_prefetcher,
);

let watchdog_sender = spawn_watchdog_thread(self.watchdog_timeout)?;
Expand Down
4 changes: 4 additions & 0 deletions crates/rbuilder/src/roothash/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod prefetcher;

use alloy_primitives::B256;
use eth_sparse_mpt::reth_sparse_trie::{
calculate_root_hash_with_sparse_trie, trie_fetcher::FetchNodeError, SparseTrieError,
Expand All @@ -12,6 +14,8 @@ use reth_errors::ProviderError;
use reth_trie_parallel::async_root::{AsyncStateRoot, AsyncStateRootError};
use tracing::trace;

pub use prefetcher::run_trie_prefetcher;

#[derive(Debug, Clone, Copy)]
pub enum RootHashMode {
/// Makes correct root hash calculation on the correct parent state.
Expand Down
158 changes: 158 additions & 0 deletions crates/rbuilder/src/roothash/prefetcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use std::{iter, time::Instant};

use ahash::{HashMap, HashSet};
use alloy_primitives::{Address, B256};
use eth_sparse_mpt::{
prefetch_tries_for_accounts,
reth_sparse_trie::{trie_fetcher::FetchNodeError, SparseTrieError, SparseTrieSharedCache},
ChangedAccountData,
};
use reth::providers::{providers::ConsistentDbView, ProviderFactory};
use reth_db::database::Database;
use reth_errors::ProviderError;
use tokio::sync::broadcast::{self, error::RecvError, error::TryRecvError};
use tokio_util::sync::CancellationToken;
use tracing::{error, trace, warn};

use crate::{
building::evm_inspector::SlotKey, live_builder::simulation::SimulatedOrderCommand,
primitives::SimulatedOrder,
};

const CONSUME_SIM_ORDERS_BATCH: usize = 128;

/// Runs a process that prefetches pieces of the trie based on the slots used by the order in simulation
/// Its a blocking call so it should be spawned on the separate thread.
pub fn run_trie_prefetcher<DB: Database + Clone + 'static>(
parent_hash: B256,
shared_sparse_mpt_cache: SparseTrieSharedCache,
provider_factory: ProviderFactory<DB>,
mut simulated_orders: broadcast::Receiver<SimulatedOrderCommand>,
cancel: CancellationToken,
) {
let consistent_db_view = ConsistentDbView::new(provider_factory, Some(parent_hash));

// here we mark data that was fetched for this slot before
let mut fetched_accounts: HashSet<Address> = HashSet::default();
let mut fetched_slots: HashSet<SlotKey> = HashSet::default();

// loop local variables
let mut used_state_traces = Vec::new();
let mut fetch_request: HashMap<Address, ChangedAccountData> = HashMap::default();
loop {
used_state_traces.clear();
fetch_request.clear();

if cancel.is_cancelled() {
return;
}

for _ in 0..CONSUME_SIM_ORDERS_BATCH {
match simulated_orders.try_recv() {
Ok(SimulatedOrderCommand::Simulation(SimulatedOrder {
used_state_trace: Some(used_state_trace),
..
})) => {
used_state_traces.push(used_state_trace);
}
Ok(_) => continue,
Err(TryRecvError::Empty) => {
if !used_state_traces.is_empty() {
break;
}
// block so thread can sleep if there are no inputs
match simulated_orders.blocking_recv() {
Ok(SimulatedOrderCommand::Simulation(SimulatedOrder {
used_state_trace: Some(used_state_trace),
..
})) => {
used_state_traces.push(used_state_trace);
}
Ok(_) => continue,
Err(RecvError::Closed) => return,
Err(RecvError::Lagged(msg)) => {
warn!(
"State trie prefetching thread lagging on sim orders channel: {}",
msg
);
break;
}
}
}
Err(TryRecvError::Closed) => {
return;
}
Err(TryRecvError::Lagged(msg)) => {
warn!(
"State trie prefetching thread lagging on sim orders channel: {}",
msg
);
break;
}
};
}

for used_state_trace in used_state_traces.drain(..) {
let changed_accounts_iter = used_state_trace
.received_amount
.keys()
.chain(used_state_trace.sent_amount.keys())
.zip(iter::repeat(false))
.chain(
used_state_trace
.destructed_contracts
.iter()
.zip(iter::repeat(true)),
);

for (address, destroyed) in changed_accounts_iter {
if fetched_accounts.contains(address) {
continue;
}
fetched_accounts.insert(*address);
fetch_request
.entry(*address)
.or_insert_with(|| ChangedAccountData::new(*address, destroyed));
}

for (written_slot, value) in &used_state_trace.written_slot_values {
if fetched_slots.contains(written_slot) {
continue;
}
fetched_slots.insert(written_slot.clone());
let account_request = fetch_request
.entry(written_slot.address)
.or_insert_with(|| ChangedAccountData::new(written_slot.address, false));
account_request
.slots
.push((written_slot.key, value.is_zero()));
}
}

if fetch_request.is_empty() {
continue;
}

let start = Instant::now();
match prefetch_tries_for_accounts(
consistent_db_view.clone(),
shared_sparse_mpt_cache.clone(),
fetch_request.values(),
) {
Ok(()) => {}
Err(SparseTrieError::FetchNode(FetchNodeError::Provider(
ProviderError::ConsistentView(_),
))) => {
return;
}
Err(err) => {
error!(?err, "Error while prefetching trie nodes");
}
}
trace!(
time_ms = start.elapsed().as_millis(),
accounts = fetch_request.len(),
"Prefetched trie nodes"
);
}
}
10 changes: 10 additions & 0 deletions crates/rbuilder/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ register_metrics! {
&["builder_name"]
)
.unwrap();
pub static BLOCK_ROOT_HASH_TIME: HistogramVec = HistogramVec::new(
HistogramOpts::new("block_root_hash_time", "Block Root Hash Time (ms)")
.buckets(exponential_buckets_range(1.0, 2000.0, 100)),
&["builder_name"]
)
.unwrap();
pub static BLOCK_BUILT_TXS: HistogramVec = HistogramVec::new(
HistogramOpts::new("block_built_txs", "Transactions in the built block")
.buckets(linear_buckets_range(1.0, 1000.0, 100)),
Expand Down Expand Up @@ -270,6 +276,7 @@ pub fn set_ordepool_count(txs: usize, bundles: usize) {
pub fn add_built_block_metrics(
build_time: Duration,
finalize_time: Duration,
root_hash_time: Duration,
txs: usize,
blobs: usize,
gas_used: u64,
Expand All @@ -290,6 +297,9 @@ pub fn add_built_block_metrics(
BLOCK_FINALIZE_TIME
.with_label_values(&[builder_name])
.observe(finalize_time.as_millis() as f64);
BLOCK_ROOT_HASH_TIME
.with_label_values(&[builder_name])
.observe(root_hash_time.as_millis() as f64);
BLOCK_BUILT_TXS
.with_label_values(&[builder_name])
.observe(txs as f64);
Expand Down
Loading

0 comments on commit ad5117e

Please sign in to comment.