From 29b53b162c051a1702c8048e7ed4c14201a2d0ff Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Fri, 25 Oct 2024 12:14:23 -0400 Subject: [PATCH] perf: introduce moka cached state provider perf: add builders --- Cargo.lock | 33 +++ Cargo.toml | 3 +- crates/engine/tree/Cargo.toml | 1 + crates/engine/tree/src/tree/cached_state.rs | 277 ++++++++++++++++++++ crates/engine/tree/src/tree/mod.rs | 1 + 5 files changed, 314 insertions(+), 1 deletion(-) create mode 100644 crates/engine/tree/src/tree/cached_state.rs diff --git a/Cargo.lock b/Cargo.lock index 27581ba3a800..52cc6e80ad85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4983,6 +4983,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "moka" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "once_cell", + "parking_lot", + "quanta", + "rustc_version 0.4.1", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "more-asserts" version = "0.3.1" @@ -7176,6 +7196,7 @@ dependencies = [ "assert_matches", "futures", "metrics", + "moka", "reth-beacon-consensus", "reth-blockchain-tree", "reth-blockchain-tree-api", @@ -10403,6 +10424,12 @@ dependencies = [ "windows 0.57.0", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -11000,6 +11027,12 @@ dependencies = [ "rlp", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "trust-dns-proto" version = "0.23.2" diff --git a/Cargo.toml b/Cargo.toml index 8c5cf7b0cd5a..894590f3caf0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -525,6 +525,7 @@ tracing = "0.1.0" tracing-appender = "0.2" url = "2.3" zstd = "0.13" +moka = "0.12" # metrics metrics = "0.24.0" @@ -633,4 +634,4 @@ tracy-client = "0.17.3" #op-alloy-rpc-types = { git = "https://github.com/alloy-rs/op-alloy", rev = "6a042e7681b1" } #op-alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/op-alloy", rev = "6a042e7681b1" } #op-alloy-network = { git = "https://github.com/alloy-rs/op-alloy", rev = "6a042e7681b1" } -#op-alloy-consensus = { git = "https://github.com/alloy-rs/op-alloy", rev = "6a042e7681b1" } \ No newline at end of file +#op-alloy-consensus = { git = "https://github.com/alloy-rs/op-alloy", rev = "6a042e7681b1" } diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index dee0bcaf7ce1..2aa6a5ce3a95 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -43,6 +43,7 @@ alloy-rpc-types-engine.workspace = true futures.workspace = true tokio = { workspace = true, features = ["macros", "sync"] } thiserror.workspace = true +moka = { workspace = true, features = ["sync"] } # metrics metrics.workspace = true diff --git a/crates/engine/tree/src/tree/cached_state.rs b/crates/engine/tree/src/tree/cached_state.rs new file mode 100644 index 000000000000..900ed9cddbe1 --- /dev/null +++ b/crates/engine/tree/src/tree/cached_state.rs @@ -0,0 +1,277 @@ +//! Implements a state provider that has a shared cache in front of it. +#![allow(dead_code)] +use alloy_primitives::{Address, StorageKey, StorageValue, B256}; +use metrics::Gauge; +use moka::sync::Cache; +use reth_errors::ProviderResult; +use reth_metrics::Metrics; +use reth_primitives::{Account, Bytecode}; +use reth_provider::{ + AccountReader, BlockHashReader, StateProofProvider, StateProvider, StateRootProvider, + StorageRootProvider, +}; + +/// A wrapper of a state provider and a shared cache. +pub(crate) struct CachedStateProvider { + /// The state provider + state_provider: S, + + /// The caches used for the provider + caches: ProviderCaches, + + /// Metrics for the cached state provider + metrics: CachedStateMetrics, +} + +impl CachedStateProvider +where + S: StateProvider, +{ + /// Creates a new [`CachedStateProvider`] that contains the given state provider and caches. + pub(crate) const fn new( + state_provider: S, + code_cache: Cache>, + storage_cache: Cache<(Address, StorageKey), Option>, + account_cache: Cache>, + metrics: CachedStateMetrics, + ) -> Self { + let caches = ProviderCaches { code_cache, account_cache, storage_cache }; + + Self { state_provider, caches, metrics } + } + + /// Creates a new [`CachedStateProvider`] from a [`ProviderCaches`], state provider, and + /// [`CachedStateMetrics`]. + pub(crate) const fn new_with_caches( + state_provider: S, + caches: ProviderCaches, + metrics: CachedStateMetrics, + ) -> Self { + Self { state_provider, caches, metrics } + } +} + +/// Metrics for the cached state provider, showing hits / misses for each cache +#[derive(Metrics, Clone)] +#[metrics(scope = "sync.caching")] +pub(crate) struct CachedStateMetrics { + /// Code cache hits + code_cache_hits: Gauge, + + /// Code cache misses + code_cache_misses: Gauge, + + /// Storage cache hits + storage_cache_hits: Gauge, + + /// Storage cache misses + storage_cache_misses: Gauge, + + /// Account cache hits + account_cache_hits: Gauge, + + /// Account cache misses + account_cache_misses: Gauge, +} + +impl CachedStateMetrics { + /// Sets all values to zero, indicating that a new block is being executed. + pub(crate) fn reset(&self) { + // code cache + self.code_cache_hits.set(0); + self.code_cache_misses.set(0); + + // storage cache + self.storage_cache_hits.set(0); + self.storage_cache_misses.set(0); + + // account cache + self.account_cache_hits.set(0); + self.account_cache_misses.set(0); + } + + /// Returns a new zeroed-out instance of [`CachedStateMetrics`]. + pub(crate) fn zeroed() -> Self { + let zeroed = Self::default(); + zeroed.reset(); + zeroed + } +} + +impl AccountReader for CachedStateProvider { + fn basic_account(&self, address: Address) -> ProviderResult> { + if let Some(res) = self.caches.account_cache.get(&address) { + self.metrics.account_cache_hits.increment(1); + return Ok(res) + } + + self.metrics.account_cache_misses.increment(1); + + let res = self.state_provider.basic_account(address)?; + self.caches.account_cache.insert(address, res); + Ok(res) + } +} + +impl StateProvider for CachedStateProvider { + fn storage( + &self, + account: Address, + storage_key: StorageKey, + ) -> ProviderResult> { + if let Some(res) = self.caches.storage_cache.get(&(account, storage_key)) { + self.metrics.storage_cache_hits.increment(1); + return Ok(res) + } + + self.metrics.storage_cache_misses.increment(1); + + let final_res = self.state_provider.storage(account, storage_key)?; + self.caches.storage_cache.insert((account, storage_key), final_res); + Ok(final_res) + } + + fn bytecode_by_hash(&self, code_hash: B256) -> ProviderResult> { + if let Some(res) = self.caches.code_cache.get(&code_hash) { + self.metrics.code_cache_hits.increment(1); + return Ok(res) + } + + self.metrics.code_cache_misses.increment(1); + + let final_res = self.state_provider.bytecode_by_hash(code_hash)?; + self.caches.code_cache.insert(code_hash, final_res.clone()); + Ok(final_res) + } +} + +impl StateRootProvider for CachedStateProvider { + fn state_root(&self, hashed_state: reth_trie::HashedPostState) -> ProviderResult { + self.state_provider.state_root(hashed_state) + } + + fn state_root_from_nodes(&self, input: reth_trie::TrieInput) -> ProviderResult { + self.state_provider.state_root_from_nodes(input) + } + + fn state_root_from_nodes_with_updates( + &self, + input: reth_trie::TrieInput, + ) -> ProviderResult<(B256, reth_trie::updates::TrieUpdates)> { + self.state_provider.state_root_from_nodes_with_updates(input) + } + + fn state_root_with_updates( + &self, + hashed_state: reth_trie::HashedPostState, + ) -> ProviderResult<(B256, reth_trie::updates::TrieUpdates)> { + self.state_provider.state_root_with_updates(hashed_state) + } +} + +impl StateProofProvider for CachedStateProvider { + fn proof( + &self, + input: reth_trie::TrieInput, + address: Address, + slots: &[B256], + ) -> ProviderResult { + self.state_provider.proof(input, address, slots) + } + + fn multiproof( + &self, + input: reth_trie::TrieInput, + targets: alloy_primitives::map::HashMap>, + ) -> ProviderResult { + self.state_provider.multiproof(input, targets) + } + + fn witness( + &self, + input: reth_trie::TrieInput, + target: reth_trie::HashedPostState, + ) -> ProviderResult> { + self.state_provider.witness(input, target) + } +} + +impl StorageRootProvider for CachedStateProvider { + fn storage_root( + &self, + address: Address, + hashed_storage: reth_trie::HashedStorage, + ) -> ProviderResult { + self.state_provider.storage_root(address, hashed_storage) + } + + fn storage_proof( + &self, + address: Address, + slot: B256, + hashed_storage: reth_trie::HashedStorage, + ) -> ProviderResult { + self.state_provider.storage_proof(address, slot, hashed_storage) + } +} + +impl BlockHashReader for CachedStateProvider { + fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult> { + self.state_provider.block_hash(number) + } + + fn canonical_hashes_range( + &self, + start: alloy_primitives::BlockNumber, + end: alloy_primitives::BlockNumber, + ) -> ProviderResult> { + self.state_provider.canonical_hashes_range(start, end) + } +} + +/// The set of caches that are used in the [`CachedStateProvider`]. +#[derive(Debug, Clone)] +pub(crate) struct ProviderCaches { + /// The cache for bytecode + code_cache: Cache>, + + /// The cache for storage + storage_cache: Cache<(Address, StorageKey), Option>, + + /// The cache for basic accounts + account_cache: Cache>, +} + +/// A builder for [`ProviderCaches`]. +#[derive(Debug)] +pub(crate) struct ProviderCacheBuilder { + /// Code cache size + code_cache_size: u64, + + /// Storage cache size + storage_cache_size: u64, + + /// Account cache size + account_cache_size: u64, +} + +impl ProviderCacheBuilder { + /// Build a [`ProviderCaches`] struct, so that provider caches can be easily cloned. + pub(crate) fn build_caches(self) -> ProviderCaches { + ProviderCaches { + code_cache: Cache::new(self.code_cache_size), + storage_cache: Cache::new(self.storage_cache_size), + account_cache: Cache::new(self.account_cache_size), + } + } +} + +impl Default for ProviderCacheBuilder { + fn default() -> Self { + // moka caches have been benchmarked up to 800k entries, so we just use 1M, optimizing for + // hitrate over memory consumption. + // + // See: https://github.com/moka-rs/moka/wiki#admission-and-eviction-policies + Self { code_cache_size: 1000000, storage_cache_size: 1000000, account_cache_size: 1000000 } + } +} diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index bc070d87345e..48190d984da8 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -62,6 +62,7 @@ use tokio::sync::{ }; use tracing::*; +mod cached_state; pub mod config; mod invalid_block_hook; mod metrics;