Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/improve fee history performance #5182

Merged
merged 43 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
d025d82
add structures which implements cache for base_fee_per_gas and used_r…
allnil Oct 23, 2023
a4b9195
prepare blueprint for cache integration for fees values
allnil Oct 24, 2023
558b3c4
add BlockFees type to collect fees data for particular block in cache…
allnil Oct 25, 2023
54204b4
format and clean
allnil Oct 25, 2023
2f40820
add comment
allnil Oct 25, 2023
acbbd76
leave comment regarding collected values
allnil Oct 25, 2023
43a4d4a
delete unused structs
allnil Oct 25, 2023
70dfb48
remove unused imports
allnil Oct 25, 2023
e690f42
remove fee history cache from eth_state, prepare separate type and ca…
allnil Oct 26, 2023
a9b0740
fix tests for single block
allnil Oct 27, 2023
dae2964
add parent_hash in mock data for tests
allnil Oct 27, 2023
52d4156
BTreeMap wrapped with RwLock, prepare task which listens to new canon…
allnil Oct 30, 2023
93ed179
add atomic bounds, refactor cache and provider
allnil Oct 31, 2023
1ae2aae
handle empty block intervals and fee history
allnil Nov 1, 2023
b3c2d8d
prepare ground for futher refactoring, remove provider from FeeHistor…
allnil Nov 3, 2023
e4ade22
refactor logic to use iterator over blocks in on_new_blocks function,…
allnil Nov 4, 2023
1d2f455
Merge branch 'main' of github.com:allnil/reth into feat/improve_fee_h…
allnil Nov 4, 2023
0cde2a1
merge
allnil Nov 4, 2023
912fd0f
prepare type to store fee data
allnil Nov 4, 2023
2805a7a
add annotation for clippy to ignore too many arguments on new fn
allnil Nov 4, 2023
30d0b4c
merge with main
allnil Nov 6, 2023
bbe4483
Merge branch 'main' of github.com:allnil/reth into feat/improve_fee_h…
allnil Nov 14, 2023
398e885
refactor, precalculate percentiles, rewards
allnil Nov 15, 2023
7d072b5
merge
allnil Nov 15, 2023
e213d13
add mock for block_range, update tests
allnil Nov 16, 2023
811a3c8
Merge branch 'main' of github.com:allnil/reth into feat/improve_fee_h…
allnil Nov 16, 2023
08a25fc
add simple test on rewards, calculate rewards on the fly
allnil Nov 16, 2023
ff3aed8
remove test because of transactions, for now
allnil Nov 16, 2023
1bc833a
merge with main
allnil Nov 16, 2023
2f61a8c
Update crates/rpc/rpc/src/eth/api/mod.rs
allnil Nov 20, 2023
66bb10d
merge and fmt
allnil Nov 20, 2023
30cd2ce
Merge branch 'feat/improve_fee_history_performance' of github.com:all…
allnil Nov 20, 2023
b131559
fix docs, add resolution in config, improve naming, move from blocks …
allnil Nov 20, 2023
b904b37
apply nightly rules
allnil Nov 20, 2023
3d4efe7
set proper and idiomatic semicolons
allnil Nov 20, 2023
8e9328c
move types adjacent to fee history to separate file
allnil Nov 22, 2023
9660e55
Merge branch 'main' into feat/improve_fee_history_performance
mattsse Nov 24, 2023
83f9c88
chore: touchups
mattsse Nov 24, 2023
3cbd463
refactor
mattsse Nov 24, 2023
7dc0e83
wip: caching
mattsse Nov 24, 2023
710540e
Merge branch 'main' into feat/improve_fee_history_performance
mattsse Nov 27, 2023
87bd429
cleanup
mattsse Nov 27, 2023
ef27a2f
cleanuo
mattsse Nov 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/net/downloaders/src/test_utils/bodies_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl BodiesClient for TestBodiesClient {

Box::pin(async move {
if should_respond_empty {
return Ok((PeerId::default(), vec![]).into());
return Ok((PeerId::default(), vec![]).into())
}

if should_delay {
Expand Down
2 changes: 1 addition & 1 deletion crates/net/eth-wire/src/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl SharedCapability {
/// Returns an error if the offset is equal or less than [`MAX_RESERVED_MESSAGE_ID`].
pub(crate) fn new(name: &str, version: u8, offset: u8) -> Result<Self, SharedCapabilityError> {
if offset <= MAX_RESERVED_MESSAGE_ID {
return Err(SharedCapabilityError::ReservedMessageIdOffset(offset));
return Err(SharedCapabilityError::ReservedMessageIdOffset(offset))
}

match name {
Expand Down
4 changes: 2 additions & 2 deletions crates/primitives/src/snapshot/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ impl SnapshotSegment {
) -> Option<(Self, RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)> {
let mut parts = name.to_str()?.split('_');
if parts.next() != Some("snapshot") {
return None;
return None
}

let segment = Self::from_str(parts.next()?).ok()?;
let (block_start, block_end) = (parts.next()?.parse().ok()?, parts.next()?.parse().ok()?);
let (tx_start, tx_end) = (parts.next()?.parse().ok()?, parts.next()?.parse().ok()?);

if block_start >= block_end || tx_start > tx_end {
return None;
return None
}

Some((segment, block_start..=block_end, tx_start..=tx_end))
Expand Down
10 changes: 9 additions & 1 deletion crates/rpc/rpc-builder/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use reth_provider::{
StateProviderFactory,
};
use reth_rpc::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig},
eth::{
cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig, FeeHistoryCache,
FeeHistoryCacheConfig,
},
AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter,
EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret,
};
Expand Down Expand Up @@ -57,7 +60,11 @@ where
// spawn a new cache task
let eth_cache =
EthStateCache::spawn_with(provider.clone(), Default::default(), executor.clone());

let gas_oracle = GasPriceOracle::new(provider.clone(), Default::default(), eth_cache.clone());

let fee_history_cache =
FeeHistoryCache::new(eth_cache.clone(), FeeHistoryCacheConfig::default());
let eth_api = EthApi::with_spawner(
provider.clone(),
pool.clone(),
Expand All @@ -67,6 +74,7 @@ where
EthConfig::default().rpc_gas_cap,
Box::new(executor.clone()),
BlockingTaskPool::build().expect("failed to build tracing pool"),
fee_history_cache,
);
let config = EthFilterConfig::default()
.max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE)
Expand Down
5 changes: 4 additions & 1 deletion crates/rpc/rpc-builder/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use reth_rpc::{
eth::{
cache::{EthStateCache, EthStateCacheConfig},
gas_oracle::GasPriceOracleConfig,
EthFilterConfig, RPC_DEFAULT_GAS_CAP,
EthFilterConfig, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP,
},
BlockingTaskPool, EthApi, EthFilter, EthPubSub,
};
Expand Down Expand Up @@ -46,6 +46,8 @@ pub struct EthConfig {
///
/// Sets TTL for stale filters
pub stale_filter_ttl: std::time::Duration,
/// Settings for the fee history cache
pub fee_history_cache: FeeHistoryCacheConfig,
}

impl EthConfig {
Expand All @@ -71,6 +73,7 @@ impl Default for EthConfig {
max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE,
rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(),
stale_filter_ttl: DEFAULT_STALE_FILTER_TTL,
fee_history_cache: FeeHistoryCacheConfig::default(),
}
}
}
Expand Down
18 changes: 17 additions & 1 deletion crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ use reth_provider::{
use reth_rpc::{
eth::{
cache::{cache_new_blocks_task, EthStateCache},
fee_history_cache_new_blocks_task,
gas_oracle::GasPriceOracle,
EthBundle,
EthBundle, FeeHistoryCache,
},
AdminApi, AuthLayer, BlockingTaskGuard, BlockingTaskPool, Claims, DebugApi, EngineEthApi,
EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, NetApi,
Expand Down Expand Up @@ -1107,13 +1108,27 @@ where
);
let new_canonical_blocks = self.events.canonical_state_stream();
let c = cache.clone();

self.executor.spawn_critical(
"cache canonical blocks task",
Box::pin(async move {
cache_new_blocks_task(c, new_canonical_blocks).await;
}),
);

let fee_history_cache =
FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone());
let new_canonical_blocks = self.events.canonical_state_stream();
let fhc = fee_history_cache.clone();
let provider_clone = self.provider.clone();
self.executor.spawn_critical(
"cache canonical blocks for fee history task",
Box::pin(async move {
fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone)
.await;
}),
);

let executor = Box::new(self.executor.clone());
let blocking_task_pool =
BlockingTaskPool::build().expect("failed to build tracing pool");
Expand All @@ -1126,6 +1141,7 @@ where
self.config.eth.rpc_gas_cap,
executor.clone(),
blocking_task_pool.clone(),
fee_history_cache,
);
let filter = EthFilter::new(
self.provider.clone(),
Expand Down
4 changes: 2 additions & 2 deletions crates/rpc/rpc-types/src/serde_helpers/json_u256.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ where
} else {
// We could try to convert to a u128 here but there would probably be loss of
// precision, so we just return an error.
return Err(Error::custom("Deserializing a large non-mainnet TTD is not supported"));
return Err(Error::custom("Deserializing a large non-mainnet TTD is not supported"))
}
} else {
// must be i64 - negative numbers are not supported
return Err(Error::custom("Negative TTD values are invalid and will not be deserialized"));
return Err(Error::custom("Negative TTD values are invalid and will not be deserialized"))
};

Ok(num)
Expand Down
140 changes: 55 additions & 85 deletions crates/rpc/rpc/src/eth/api/fees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ use crate::{
eth::error::{EthApiError, EthResult},
EthApi,
};

use reth_network_api::NetworkInfo;
use reth_primitives::{
basefee::calculate_next_block_base_fee, BlockNumberOrTag, SealedHeader, U256,
};
use reth_primitives::{basefee::calculate_next_block_base_fee, BlockNumberOrTag, U256};
use reth_provider::{BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderFactory};
use reth_rpc_types::{FeeHistory, TxGasAndReward};
use reth_rpc_types::FeeHistory;
use reth_transaction_pool::TransactionPool;

use tracing::debug;

use super::FeeHistoryEntry;

impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
where
Pool: TransactionPool + Clone + 'static,
Expand Down Expand Up @@ -85,9 +86,9 @@ where
block_count = end_block_plus;
}

// If reward percentiles were specified, we need to validate that they are monotonically
// If reward percentiles were specified, we
// need to validate that they are monotonically
// increasing and 0 <= p <= 100
//
// Note: The types used ensure that the percentiles are never < 0
if let Some(percentiles) = &reward_percentiles {
if percentiles.windows(2).any(|w| w[0] > w[1] || w[0] > 100.) {
Expand All @@ -100,110 +101,79 @@ where
// Treat a request for 1 block as a request for `newest_block..=newest_block`,
// otherwise `newest_block - 2
// SAFETY: We ensured that block count is capped

let start_block = end_block_plus - block_count;
let headers = self.provider().sealed_headers_range(start_block..=end_block)?;
if headers.len() != block_count as usize {
return Err(EthApiError::InvalidBlockRange)

let mut fee_entries = self.fee_history_cache().get_history(start_block, end_block).await?;

let mut fee_cache_flag = true;
if fee_entries.is_empty() {
for block in self.provider().block_range(start_block..=end_block)?.into_iter() {
let entry = FeeHistoryEntry::new(&block.seal_slow());
fee_entries.push(entry);
}
fee_cache_flag = false;
}

if fee_entries.len() != block_count as usize {
return Err(EthApiError::InvalidBlockRange)
}
// Collect base fees, gas usage ratios and (optionally) reward percentile data
let mut base_fee_per_gas: Vec<U256> = Vec::new();
let mut gas_used_ratio: Vec<f64> = Vec::new();
let mut rewards: Vec<Vec<U256>> = Vec::new();
for header in &headers {
base_fee_per_gas
.push(U256::try_from(header.base_fee_per_gas.unwrap_or_default()).unwrap());
gas_used_ratio.push(header.gas_used as f64 / header.gas_limit as f64);

// Percentiles were specified, so we need to collect reward percentile ino
if let Some(percentiles) = &reward_percentiles {
rewards.push(self.calculate_reward_percentiles(percentiles, header).await?);

for entry in &fee_entries {
base_fee_per_gas.push(U256::try_from(entry.base_fee_per_gas).unwrap());
gas_used_ratio.push(entry.gas_used_ratio);

if fee_cache_flag == true {
if let Some(percentiles) = &reward_percentiles {
let mut block_rewards = Vec::new();

for &percentile in percentiles.iter() {
block_rewards.push(self.approximate_percentile(entry, percentile));
}
rewards.push(block_rewards);
}
}
}

// The spec states that `base_fee_per_gas` "[..] includes the next block after the newest of
// the returned range, because this value can be derived from the newest block"
//
// The unwrap is safe since we checked earlier that we got at least 1 header.
let last_header = headers.last().unwrap();

let last_entry = fee_entries.last().unwrap();

let chain_spec = self.provider().chain_spec();

base_fee_per_gas.push(U256::from(calculate_next_block_base_fee(
last_header.gas_used,
last_header.gas_limit,
last_header.base_fee_per_gas.unwrap_or_default(),
last_entry.gas_used,
last_entry.gas_limit,
last_entry.base_fee_per_gas,
chain_spec.base_fee_params,
)));

Ok(FeeHistory {
base_fee_per_gas,
gas_used_ratio,
oldest_block: U256::from(start_block),
reward: reward_percentiles.map(|_| rewards),
reward: if rewards.is_empty() { None } else { Some(rewards) },
})
}

/// Calculates reward percentiles for transactions in a block header.
/// Given a list of percentiles and a sealed block header, this function computes
/// the corresponding rewards for the transactions at each percentile.
///
/// The results are returned as a vector of U256 values.
async fn calculate_reward_percentiles(
&self,
percentiles: &[f64],
header: &SealedHeader,
) -> Result<Vec<U256>, EthApiError> {
let (transactions, receipts) = self
.cache()
.get_transactions_and_receipts(header.hash)
.await?
.ok_or(EthApiError::InvalidBlockRange)?;

let mut transactions = transactions
.into_iter()
.zip(receipts)
.scan(0, |previous_gas, (tx, receipt)| {
// Convert the cumulative gas used in the receipts
// to the gas usage by the transaction
//
// While we will sum up the gas again later, it is worth
// noting that the order of the transactions will be different,
// so the sum will also be different for each receipt.
let gas_used = receipt.cumulative_gas_used - *previous_gas;
*previous_gas = receipt.cumulative_gas_used;

Some(TxGasAndReward {
gas_used,
reward: tx.effective_tip_per_gas(header.base_fee_per_gas).unwrap_or_default(),
})
})
.collect::<Vec<_>>();

// Sort the transactions by their rewards in ascending order
transactions.sort_by_key(|tx| tx.reward);

// Find the transaction that corresponds to the given percentile
//
// We use a `tx_index` here that is shared across all percentiles, since we know
// the percentiles are monotonically increasing.
let mut tx_index = 0;
let mut cumulative_gas_used =
transactions.first().map(|tx| tx.gas_used).unwrap_or_default();
let mut rewards_in_block = Vec::new();
for percentile in percentiles {
// Empty blocks should return in a zero row
if transactions.is_empty() {
rewards_in_block.push(U256::ZERO);
continue
}

let threshold = (header.gas_used as f64 * percentile / 100.) as u64;
while cumulative_gas_used < threshold && tx_index < transactions.len() - 1 {
tx_index += 1;
cumulative_gas_used += transactions[tx_index].gas_used;
}
rewards_in_block.push(U256::from(transactions[tx_index].reward));
}

Ok(rewards_in_block)
/// Approximates reward at a given percentile for a specific block
/// Based on the configured resolution
fn approximate_percentile(&self, entry: &FeeHistoryEntry, requested_percentile: f64) -> U256 {
let resolution = self.fee_history_cache().config.resolution;
let rounded_percentile =
(requested_percentile * resolution as f64).round() / resolution as f64;
let clamped_percentile = rounded_percentile.clamp(0.0, 100.0);

// Calculate the index in the precomputed rewards array
let index = (clamped_percentile / (1.0 / resolution as f64)).round() as usize;
// Fetch the reward from the FeeHistoryEntry
entry.rewards.get(index).cloned().unwrap_or(U256::ZERO)
}
}
Loading
Loading