diff --git a/Cargo.lock b/Cargo.lock index 15338145..cd7ef9d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,7 +346,7 @@ dependencies = [ "async-stream", "async-trait", "auto_impl", - "dashmap 6.0.1", + "dashmap 6.1.0", "futures", "futures-utils-wasm", "lru", @@ -2035,6 +2035,19 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -2308,9 +2321,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.0.1" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", @@ -4827,7 +4840,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -5663,7 +5676,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 1.1.3", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.72", @@ -7022,9 +7035,11 @@ dependencies = [ "built", "clap", "criterion", + "crossbeam", "crossbeam-queue", "csv", "ctor", + "dashmap 6.1.0", "derivative", "eth-sparse-mpt", "ethereum-consensus", @@ -7049,6 +7064,7 @@ dependencies = [ "mev-share-sse", "mockall", "once_cell", + "parking_lot 0.12.3", "primitive-types", "priority-queue", "prometheus", @@ -8332,7 
+8348,7 @@ source = "git+https://github.com/paradigmxyz/reth?tag=v1.0.6#c228fe15808c3acbf18 dependencies = [ "bitflags 2.6.0", "byteorder", - "dashmap 6.0.1", + "dashmap 6.1.0", "derive_more 1.0.0", "indexmap 2.2.6", "parking_lot 0.12.3", @@ -8891,7 +8907,7 @@ source = "git+https://github.com/paradigmxyz/reth?tag=v1.0.6#c228fe15808c3acbf18 dependencies = [ "alloy-rpc-types-engine", "auto_impl", - "dashmap 6.0.1", + "dashmap 6.1.0", "itertools 0.13.0", "metrics", "once_cell", diff --git a/config-backtest-example.toml b/config-backtest-example.toml index 1fa7f6fd..987abf7d 100644 --- a/config-backtest-example.toml +++ b/config-backtest-example.toml @@ -29,8 +29,7 @@ failed_order_retries = 1 drop_failed_orders = true [[builders]] -name = "merging" -algo = "merging-builder" +name = "parallel" +algo = "parallel-builder" discard_txs = true -num_threads = 5 -merge_wait_time_ms = 100 \ No newline at end of file +num_threads = 25 \ No newline at end of file diff --git a/config-live-example.toml b/config-live-example.toml index c06ec060..b49a4661 100644 --- a/config-live-example.toml +++ b/config-live-example.toml @@ -58,8 +58,7 @@ failed_order_retries = 1 drop_failed_orders = true [[builders]] -name = "merging" -algo = "merging-builder" +name = "parallel" +algo = "parallel-builder" discard_txs = true -num_threads = 5 -merge_wait_time_ms = 100 \ No newline at end of file +num_threads = 25 \ No newline at end of file diff --git a/crates/rbuilder/Cargo.toml b/crates/rbuilder/Cargo.toml index 4d12d3d2..bc4bd979 100644 --- a/crates/rbuilder/Cargo.toml +++ b/crates/rbuilder/Cargo.toml @@ -125,6 +125,9 @@ shellexpand = "3.1.0" async-trait = "0.1.80" eth-sparse-mpt = { git = "https://github.com/flashbots/eth-sparse-mpt", rev = "5d0da73" } +crossbeam = "0.8.4" +parking_lot = "0.12.3" +dashmap = "6.1.0" [build-dependencies] built = { version = "0.7.1", features = ["git2", "chrono"] } diff --git a/crates/rbuilder/src/building/builders/merging_builder/combinator.rs 
b/crates/rbuilder/src/building/builders/merging_builder/combinator.rs deleted file mode 100644 index d2238bc4..00000000 --- a/crates/rbuilder/src/building/builders/merging_builder/combinator.rs +++ /dev/null @@ -1,174 +0,0 @@ -use crate::{ - building::{ - builders::block_building_helper::{BlockBuildingHelper, BlockBuildingHelperFromProvider}, - BlockBuildingContext, - }, - roothash::RootHashConfig, -}; - -use reth::tasks::pool::BlockingTaskPool; -use reth_db::Database; -use reth_payload_builder::database::CachedReads; -use reth_provider::{DatabaseProviderFactory, StateProviderFactory}; -use std::{marker::PhantomData, time::Instant}; -use time::OffsetDateTime; -use tokio_util::sync::CancellationToken; -use tracing::{info_span, trace}; - -use super::{GroupOrdering, OrderGroup}; - -/// CombinatorContext is used for merging the best ordering from groups into final block. -#[derive(Debug)] -pub struct CombinatorContext { - provider: P, - root_hash_task_pool: BlockingTaskPool, - ctx: BlockBuildingContext, - groups: Vec, - cancellation_token: CancellationToken, - cached_reads: Option, - discard_txs: bool, - coinbase_payment: bool, - root_hash_config: RootHashConfig, - builder_name: String, - phantom: PhantomData, -} - -impl CombinatorContext -where - DB: Database + Clone + 'static, - P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, -{ - #[allow(clippy::too_many_arguments)] - pub fn new( - provider: P, - root_hash_task_pool: BlockingTaskPool, - ctx: BlockBuildingContext, - groups: Vec, - cancellation_token: CancellationToken, - cached_reads: Option, - discard_txs: bool, - coinbase_payment: bool, - root_hash_config: RootHashConfig, - builder_name: String, - ) -> Self { - CombinatorContext { - provider, - root_hash_task_pool, - ctx, - groups, - cancellation_token, - cached_reads, - discard_txs, - coinbase_payment, - root_hash_config, - builder_name, - phantom: PhantomData, - } - } - - pub fn set_groups(&mut self, groups: Vec) { - self.groups = groups; - } 
- - /// Checks for simulated bundles that generated kickbacks. - /// orderings MUST be the same size as self.groups - fn contains_kickbacks(&self, orderings: &[GroupOrdering]) -> bool { - orderings.iter().enumerate().any(|(group_idx, ordering)| { - ordering.orders.iter().any(|(order_idx, _)| { - !self.groups[group_idx].orders[*order_idx] - .sim_value - .paid_kickbacks - .is_empty() - }) - }) - } - - pub fn combine_best_groups_mergings( - &mut self, - orders_closed_at: OffsetDateTime, - can_use_suggested_fee_recipient_as_coinbase: bool, - ) -> eyre::Result> { - let build_attempt_id: u32 = rand::random(); - let span = info_span!("build_run", build_attempt_id); - let _guard = span.enter(); - let build_start = Instant::now(); - - let mut best_orderings: Vec = self - .groups - .iter() - .map(|g| g.best_ordering.lock().unwrap().clone()) - .collect(); - - let use_suggested_fee_recipient_as_coinbase = self.coinbase_payment - && !self.contains_kickbacks(&best_orderings) - && can_use_suggested_fee_recipient_as_coinbase; - // Create a new ctx to remove builder_signer if necessary - let mut ctx = self.ctx.clone(); - if use_suggested_fee_recipient_as_coinbase { - ctx.modify_use_suggested_fee_recipient_as_coinbase(); - } - - let mut block_building_helper = BlockBuildingHelperFromProvider::new( - self.provider.clone(), - self.root_hash_task_pool.clone(), - self.root_hash_config.clone(), - ctx, - // @Perf cached reads / cursor caches - None, - self.builder_name.clone(), - self.discard_txs, - None, - self.cancellation_token.clone(), - )?; - block_building_helper.set_trace_orders_closed_at(orders_closed_at); - // loop until we insert all orders into the block - loop { - if self.cancellation_token.is_cancelled() { - break; - } - - let sim_order = if let Some((group_idx, order_idx, _)) = best_orderings - .iter() - .enumerate() - .filter_map(|(group_idx, ordering)| { - ordering - .orders - .first() - .map(|(order_idx, order_profit)| (group_idx, *order_idx, *order_profit)) - }) - 
.max_by_key(|(_, _, p)| *p) - { - best_orderings[group_idx].orders.remove(0); - &self.groups[group_idx].orders[order_idx] - } else { - // no order left in the groups - break; - }; - - let start_time = Instant::now(); - let commit_result = block_building_helper.commit_order(sim_order)?; - let order_commit_time = start_time.elapsed(); - - let mut gas_used = 0; - let mut execution_error = None; - let success = commit_result.is_ok(); - match commit_result { - Ok(res) => { - gas_used = res.gas_used; - } - Err(err) => execution_error = Some(err), - } - trace!( - order_id = ?sim_order.id(), - success, - order_commit_time_mus = order_commit_time.as_micros(), - gas_used, - ?execution_error, - "Executed order" - ); - } - block_building_helper.set_trace_fill_time(build_start.elapsed()); - self.cached_reads = Some(block_building_helper.clone_cached_reads()); - Ok(Box::new(block_building_helper)) - } -} diff --git a/crates/rbuilder/src/building/builders/merging_builder/merger.rs b/crates/rbuilder/src/building/builders/merging_builder/merger.rs deleted file mode 100644 index ab164a5d..00000000 --- a/crates/rbuilder/src/building/builders/merging_builder/merger.rs +++ /dev/null @@ -1,215 +0,0 @@ -use crate::building::{BlockBuildingContext, BlockState, PartialBlock}; -use ahash::HashMap; -use alloy_primitives::{Address, U256}; -use itertools::Itertools; -use rand::{seq::SliceRandom, SeedableRng}; -use reth::providers::StateProvider; -use reth_payload_builder::database::CachedReads; -use reth_provider::StateProviderFactory; -use std::sync::Arc; -use tokio_util::sync::CancellationToken; - -use super::OrderGroup; - -/// MergeTask describes some ordering that should be tried for the given group. 
-#[derive(Debug, Clone)] -pub struct MergeTask { - pub group_idx: usize, - pub command: MergeTaskCommand, -} - -#[derive(Debug, Clone)] -pub enum MergeTaskCommand { - /// `StaticOrdering` checks the following ordrerings: map profit first / last, mev gas price first / last - StaticOrdering { - /// if false reverse gas price and reverse profit orderings are skipped - extra_orderings: bool, - }, - /// `AllPermutations` checks all possible permutations of the group. - AllPermutations, - /// `RandomPermutations` checks random permutations of the group. - RandomPermutations { seed: u64, count: usize }, -} - -#[derive(Debug)] -pub struct MergingContext

-where - P: StateProviderFactory, -{ - pub provider: P, - pub ctx: BlockBuildingContext, - pub groups: Vec, - pub cancellation_token: CancellationToken, - pub cache: Option, -} - -impl

MergingContext

-where - P: StateProviderFactory, -{ - pub fn new( - provider: P, - ctx: BlockBuildingContext, - groups: Vec, - cancellation_token: CancellationToken, - cache: Option, - ) -> Self { - MergingContext { - provider, - ctx, - groups, - cancellation_token, - cache, - } - } - - pub fn run_merging_task(&mut self, task: MergeTask) -> eyre::Result<()> { - let state_provider = self - .provider - .history_by_block_hash(self.ctx.attributes.parent)?; - let state_provider: Arc = Arc::from(state_provider); - - let orderings_to_try = match task.command { - MergeTaskCommand::StaticOrdering { extra_orderings } => { - // order by / reverse gas price - // order by / reverse max profit - - let mut orderings = vec![]; - let orders = &self.groups[task.group_idx]; - { - let mut ids_and_value = orders - .orders - .iter() - .enumerate() - .map(|(idx, o)| (idx, o.sim_value.mev_gas_price)) - .collect::>(); - - if extra_orderings { - ids_and_value.sort_by(|a, b| a.1.cmp(&b.1)); - orderings.push( - ids_and_value - .iter() - .map(|(idx, _)| *idx) - .collect::>(), - ); - } - ids_and_value.sort_by(|a, b| a.1.cmp(&b.1).reverse()); - orderings.push( - ids_and_value - .iter() - .map(|(idx, _)| *idx) - .collect::>(), - ); - - let mut ids_and_value = orders - .orders - .iter() - .enumerate() - .map(|(idx, o)| (idx, o.sim_value.coinbase_profit)) - .collect::>(); - - if extra_orderings { - ids_and_value.sort_by(|a, b| a.1.cmp(&b.1)); - orderings.push( - ids_and_value - .iter() - .map(|(idx, _)| *idx) - .collect::>(), - ); - } - - ids_and_value.sort_by(|a, b| a.1.cmp(&b.1).reverse()); - orderings.push( - ids_and_value - .iter() - .map(|(idx, _)| *idx) - .collect::>(), - ); - } - orderings - } - MergeTaskCommand::AllPermutations => { - let orders = &self.groups[task.group_idx]; - let orderings = (0..orders.orders.len()).collect::>(); - orderings - .into_iter() - .permutations(orders.orders.len()) - .collect() - } - MergeTaskCommand::RandomPermutations { seed, count } => { - let mut orderings = vec![]; - - 
let orders = &self.groups[task.group_idx]; - let mut indexes = (0..orders.orders.len()).collect::>(); - let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); - for _ in 0..count { - indexes.shuffle(&mut rng); - orderings.push(indexes.clone()); - } - - orderings - } - }; - - for ordering in orderings_to_try { - let mut partial_block = PartialBlock::new(true, None); - let mut state = BlockState::new_arc(state_provider.clone()) - .with_cached_reads(self.cache.take().unwrap_or_default()); - partial_block.pre_block_call(&self.ctx, &mut state)?; - - let mut ordering = ordering; - ordering.reverse(); // we will use it as a queue: pop() and push() - let mut result_ordering = Vec::new(); - let mut total_profit = U256::ZERO; - let mut pending_orders: HashMap<(Address, u64), usize> = HashMap::default(); - loop { - if self.cancellation_token.is_cancelled() { - let (cached_reads, _, _) = state.into_parts(); - self.cache = Some(cached_reads); - return Ok(()); - } - - let order_idx = if let Some(order_idx) = ordering.pop() { - order_idx - } else { - break; - }; - let sim_order = &self.groups[task.group_idx].orders[order_idx]; - let commit_result = partial_block.commit_order(sim_order, &self.ctx, &mut state)?; - match commit_result { - Ok(res) => { - for (address, nonce) in res.nonces_updated { - if let Some(pending_order) = pending_orders.remove(&(address, nonce)) { - ordering.push(pending_order); - } - } - total_profit += res.coinbase_profit; - result_ordering.push((order_idx, res.coinbase_profit)); - } - Err(err) => { - if let Some((address, nonce)) = - err.try_get_tx_too_high_error(&sim_order.order) - { - pending_orders.insert((address, nonce), order_idx); - } - } - } - } - - let mut best_ordering = self.groups[task.group_idx].best_ordering.lock().unwrap(); - if best_ordering.total_profit < total_profit { - best_ordering.total_profit = total_profit; - best_ordering.orders = result_ordering; - } - - let (new_cached_reads, _, _) = state.into_parts(); - self.cache = 
Some(new_cached_reads); - } - - Ok(()) - } - - pub fn into_cached_reads(self) -> CachedReads { - self.cache.unwrap_or_default() - } -} diff --git a/crates/rbuilder/src/building/builders/merging_builder/merging_pool.rs b/crates/rbuilder/src/building/builders/merging_builder/merging_pool.rs deleted file mode 100644 index 4be99d88..00000000 --- a/crates/rbuilder/src/building/builders/merging_builder/merging_pool.rs +++ /dev/null @@ -1,132 +0,0 @@ -use crate::building::BlockBuildingContext; -use reth_payload_builder::database::CachedReads; -use reth_provider::StateProviderFactory; -use std::{sync::Arc, time::Instant}; -use tokio_util::sync::CancellationToken; -use tracing::{trace, warn}; - -use super::{ - merger::{MergeTask, MergeTaskCommand, MergingContext}, - OrderGroup, -}; - -/// `MergingPool` is a set of threads that try ordering for the given groups of orders. -pub struct MergingPool

{ - provider: P, - ctx: BlockBuildingContext, - num_threads: usize, - global_cancellation_token: CancellationToken, - cached_reads: Vec, - - current_task_cancellaton_token: CancellationToken, - thread_handles: Vec>, -} - -impl

MergingPool

-where - P: StateProviderFactory + Clone + 'static, -{ - pub fn new( - provider: P, - ctx: BlockBuildingContext, - num_threads: usize, - global_cancellation_token: CancellationToken, - ) -> Self { - Self { - provider, - ctx, - num_threads, - global_cancellation_token, - cached_reads: Vec::new(), - current_task_cancellaton_token: CancellationToken::new(), - thread_handles: Vec::new(), - } - } - - pub fn start_merging_tasks(&mut self, groups: Vec) -> eyre::Result<()> { - self.current_task_cancellaton_token = self.global_cancellation_token.child_token(); - - let queue = Arc::new(crossbeam_queue::ArrayQueue::new(10_000)); - for (group_idx, group) in groups.iter().enumerate() { - if group.orders.len() == 1 { - continue; - } else if group.orders.len() <= 3 { - let _ = queue.push(MergeTask { - group_idx, - command: MergeTaskCommand::AllPermutations, - }); - } else { - let _ = queue.push(MergeTask { - group_idx, - command: MergeTaskCommand::StaticOrdering { - extra_orderings: true, - }, - }); - } - } - // Here we fill queue of tasks with random ordering tasks for big groups. 
- for i in 0..150 { - for (group_idx, group) in groups.iter().enumerate() { - if group.orders.len() > 3 { - let _ = queue.push(MergeTask { - group_idx, - command: MergeTaskCommand::RandomPermutations { - seed: group_idx as u64 + i, - count: 2, - }, - }); - } - } - } - - for idx in 0..self.num_threads { - let mut merging_context = MergingContext::new( - self.provider.clone(), - self.ctx.clone(), - groups.clone(), - self.current_task_cancellaton_token.clone(), - self.cached_reads.pop(), - ); - - let cancellation = self.current_task_cancellaton_token.clone(); - let queue = queue.clone(); - let handle = std::thread::Builder::new() - .name(format!("merging-thread-{}", idx)) - .spawn(move || { - while let Some(task) = queue.pop() { - if cancellation.is_cancelled() { - break; - } - - let start = Instant::now(); - if let Err(err) = merging_context.run_merging_task(task.clone()) { - warn!("Error running merging task: {:?}", err); - } - trace!( - "Finished merging task: {:?}, elapsed: {:?}, len: {}", - task, - start.elapsed(), - merging_context.groups[task.group_idx].orders.len(), - ); - } - - merging_context.into_cached_reads() - })?; - - self.thread_handles.push(handle); - } - - Ok(()) - } - - pub fn stop_merging_threads(&mut self) -> eyre::Result<()> { - self.current_task_cancellaton_token.cancel(); - for join_handle in self.thread_handles.drain(..) 
{ - let reads = join_handle - .join() - .map_err(|_err| eyre::eyre!("Error joining merging thread"))?; - self.cached_reads.push(reads); - } - Ok(()) - } -} diff --git a/crates/rbuilder/src/building/builders/merging_builder/mod.rs b/crates/rbuilder/src/building/builders/merging_builder/mod.rs deleted file mode 100644 index 75b4398e..00000000 --- a/crates/rbuilder/src/building/builders/merging_builder/mod.rs +++ /dev/null @@ -1,292 +0,0 @@ -pub mod combinator; -pub mod groups; -pub mod merger; -pub mod merging_pool; -pub mod order_intake_store; -use self::order_intake_store::OrderIntakeStore; -use crate::{ - building::builders::{ - handle_building_error, BacktestSimulateBlockInput, Block, BlockBuildingAlgorithm, - BlockBuildingAlgorithmInput, LiveBuilderInput, - }, - roothash::RootHashConfig, -}; -use alloy_primitives::{utils::format_ether, Address}; -use combinator::CombinatorContext; -pub use groups::*; -use merger::{MergeTask, MergeTaskCommand, MergingContext}; -use merging_pool::MergingPool; -use reth::tasks::pool::BlockingTaskPool; -use reth_db::database::Database; -use reth_payload_builder::database::CachedReads; -use reth_provider::{DatabaseProviderFactory, StateProviderFactory}; -use serde::Deserialize; -use std::time::{Duration, Instant}; -use time::OffsetDateTime; -use tokio_util::sync::CancellationToken; -use tracing::{error, trace}; - -/// MergingBuilderConfig configures merging builder. -/// * `num_threads` - number of threads to use for merging. -/// * `merge_wait_time_ms` - time to wait for merging to finish before consuming new orders. 
-#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -#[serde(deny_unknown_fields)] -pub struct MergingBuilderConfig { - pub discard_txs: bool, - pub num_threads: usize, - pub merge_wait_time_ms: u64, - #[serde(default)] - pub coinbase_payment: bool, -} - -impl MergingBuilderConfig { - fn merge_wait_time(&self) -> Duration { - Duration::from_millis(self.merge_wait_time_ms) - } -} - -pub fn run_merging_builder(input: LiveBuilderInput, config: &MergingBuilderConfig) -where - DB: Database + Clone + 'static, - P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, -{ - let block_number = input.ctx.block_env.number.to::(); - let mut order_intake_consumer = - OrderIntakeStore::new(input.input, &input.sbundle_mergeabe_signers); - - let mut merging_combinator = CombinatorContext::new( - input.provider.clone(), - input.root_hash_task_pool, - input.ctx.clone(), - vec![], - input.cancel.clone(), - None, - config.discard_txs, - config.coinbase_payment, - input.root_hash_config, - input.builder_name, - ); - - let mut merging_pool = MergingPool::new( - input.provider.clone(), - input.ctx.clone(), - config.num_threads, - input.cancel.clone(), - ); - - let mut cached_groups = CachedGroups::new(); - - 'building: loop { - if input.cancel.is_cancelled() { - break 'building; - } - - match order_intake_consumer.consume_next_batch() { - Ok(ok) => { - if !ok { - break 'building; - } - } - Err(err) => { - error!(?err, "Error consuming next order batch"); - continue; - } - } - - let orders_closed_at = OffsetDateTime::now_utc(); - - let new_orders = order_intake_consumer.drain_new_orders(); - // We can update cached_groups if we have ONLY adds - if let Some(new_orders) = new_orders { - cached_groups.add_orders(new_orders); - } else { - cached_groups = CachedGroups::new(); - cached_groups.add_orders(order_intake_consumer.get_orders()); - } - - let groups = cached_groups.get_order_groups(); - - let group_count = groups.len(); - let order_count = groups.iter().map(|g| 
g.orders.len()).sum::(); - merging_combinator.set_groups(groups.clone()); - - let start = Instant::now(); - if let Err(error) = merging_pool.stop_merging_threads() { - error!( - ?error, - block_number, "Error stopping merging threads, cancelling slot, building block", - ); - break 'building; - } - trace!( - time_ms = start.elapsed().as_millis(), - "Stopped merging threads" - ); - if let Err(error) = merging_pool.start_merging_tasks(groups) { - error!( - ?error, - block_number, "Error starting merging tasks, cancelling slot, building block", - ); - break 'building; - } - - let merging_start_time = Instant::now(); - trace!("Starting new merging batch"); - 'merging: loop { - if input.cancel.is_cancelled() { - break 'building; - } - if merging_start_time.elapsed() > config.merge_wait_time() { - break 'merging; - } - - match merging_combinator.combine_best_groups_mergings( - orders_closed_at, - input.sink.can_use_suggested_fee_recipient_as_coinbase(), - ) { - Ok(block) => { - trace!( - group_count, - order_count, - bid_value = format_ether(block.built_block_trace().bid_value), - "Merger combined best group orderings" - ); - input.sink.new_block(block); - } - Err(err) => { - if !handle_building_error(err) { - break 'building; - } - } - } - } - } -} - -pub fn merging_build_backtest( - input: BacktestSimulateBlockInput<'_, P>, - config: MergingBuilderConfig, -) -> eyre::Result<(Block, CachedReads)> -where - DB: Database + Clone + 'static, - P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, -{ - let sorted_orders = { - let mut orders = input.sim_orders.clone(); - orders.sort_by_key(|o| o.order.id()); - orders - }; - - let groups = split_orders_into_groups(sorted_orders); - - let mut merging_context = MergingContext::new( - input.provider.clone(), - input.ctx.clone(), - groups.clone(), - CancellationToken::new(), - input.cached_reads, - ); - - for (group_idx, group) in groups.iter().enumerate() { - if group.orders.len() == 1 { - continue; - } else if 
group.orders.len() <= 3 { - merging_context.run_merging_task(MergeTask { - group_idx, - command: MergeTaskCommand::AllPermutations, - })?; - } else { - merging_context.run_merging_task(MergeTask { - group_idx, - command: MergeTaskCommand::StaticOrdering { - extra_orderings: true, - }, - })?; - merging_context.run_merging_task(MergeTask { - group_idx, - command: MergeTaskCommand::RandomPermutations { seed: 0, count: 10 }, - })?; - } - } - - let cache_reads = merging_context.into_cached_reads(); - - let mut combinator_context = CombinatorContext::new( - input.provider, - BlockingTaskPool::build()?, - input.ctx.clone(), - groups.clone(), - CancellationToken::new(), - Some(cache_reads), - config.discard_txs, - config.coinbase_payment, - RootHashConfig::skip_root_hash(), - input.builder_name, - ); - - let block_builder = combinator_context - .combine_best_groups_mergings(OffsetDateTime::now_utc(), config.coinbase_payment)?; - let payout_tx_value = if config.coinbase_payment { - None - } else { - Some(block_builder.true_block_value()?) - }; - let finalize_block_result = block_builder.finalize_block(payout_tx_value)?; - Ok(( - finalize_block_result.block, - finalize_block_result.cached_reads, - )) -} - -#[derive(Debug)] -pub struct MergingBuildingAlgorithm { - root_hash_config: RootHashConfig, - root_hash_task_pool: BlockingTaskPool, - sbundle_mergeabe_signers: Vec

, - config: MergingBuilderConfig, - name: String, -} - -impl MergingBuildingAlgorithm { - pub fn new( - root_hash_config: RootHashConfig, - root_hash_task_pool: BlockingTaskPool, - sbundle_mergeabe_signers: Vec
, - config: MergingBuilderConfig, - name: String, - ) -> Self { - Self { - root_hash_config, - root_hash_task_pool, - sbundle_mergeabe_signers, - config, - name, - } - } -} - -impl BlockBuildingAlgorithm for MergingBuildingAlgorithm -where - DB: Database + Clone + 'static, - P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, -{ - fn name(&self) -> String { - self.name.clone() - } - - fn build_blocks(&self, input: BlockBuildingAlgorithmInput

) { - let live_input = LiveBuilderInput { - provider: input.provider, - root_hash_config: self.root_hash_config.clone(), - root_hash_task_pool: self.root_hash_task_pool.clone(), - ctx: input.ctx.clone(), - input: input.input, - sink: input.sink, - builder_name: self.name.clone(), - cancel: input.cancel, - sbundle_mergeabe_signers: self.sbundle_mergeabe_signers.clone(), - phantom: Default::default(), - }; - run_merging_builder(live_input, &self.config); - } -} diff --git a/crates/rbuilder/src/building/builders/mod.rs b/crates/rbuilder/src/building/builders/mod.rs index a7fba178..8232efbc 100644 --- a/crates/rbuilder/src/building/builders/mod.rs +++ b/crates/rbuilder/src/building/builders/mod.rs @@ -1,8 +1,8 @@ //! builders is a subprocess that builds a block pub mod block_building_helper; -pub mod merging_builder; pub mod mock_block_building_helper; pub mod ordering_builder; +pub mod parallel_builder; use crate::{ building::{BlockBuildingContext, BlockOrders, BuiltBlockTrace, SimulatedOrderSink, Sorting}, diff --git a/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs b/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs new file mode 100644 index 00000000..857e2755 --- /dev/null +++ b/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs @@ -0,0 +1,358 @@ +use super::{ + results_aggregator::BestResults, ConflictGroup, GroupId, ParallelBuilderConfig, + ResolutionResult, +}; +use ahash::HashMap; +use alloy_primitives::utils::format_ether; +use reth::tasks::pool::BlockingTaskPool; +use reth_db::Database; +use reth_payload_builder::database::CachedReads; +use reth_provider::{DatabaseProviderFactory, StateProviderFactory}; +use std::sync::Arc; +use std::{marker::PhantomData, time::Instant}; +use time::OffsetDateTime; +use tokio_util::sync::CancellationToken; +use tracing::{trace, warn}; + +use crate::{ + building::{ + builders::{ + 
block_building_helper::{BlockBuildingHelper, BlockBuildingHelperFromProvider}, + UnfinishedBlockBuildingSink, + }, + BlockBuildingContext, + }, + roothash::RootHashConfig, +}; + +/// Assembles block building results from the best orderings of order groups. +pub struct BlockBuildingResultAssembler { + provider: P, + root_hash_task_pool: BlockingTaskPool, + ctx: BlockBuildingContext, + cancellation_token: CancellationToken, + cached_reads: Option, + discard_txs: bool, + coinbase_payment: bool, + can_use_suggested_fee_recipient_as_coinbase: bool, + root_hash_config: RootHashConfig, + builder_name: String, + sink: Option>, + best_results: Arc, + run_id: u64, + last_version: Option, + phantom: PhantomData, +} + +impl BlockBuildingResultAssembler +where + DB: Database + Clone + 'static, + P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, +{ + /// Creates a new `BlockBuildingResultAssembler`. + /// + /// # Arguments + /// + /// * `input` - The live builder input containing necessary components. + /// * `config` - The configuration for the Parallel builder. + /// * `build_trigger_receiver` - A receiver for build trigger signals. + /// * `best_results` - A shared map of the best results for each group. 
+ #[allow(clippy::too_many_arguments)] + pub fn new( + config: &ParallelBuilderConfig, + root_hash_config: RootHashConfig, + best_results: Arc, + provider: P, + root_hash_task_pool: BlockingTaskPool, + ctx: BlockBuildingContext, + cancellation_token: CancellationToken, + builder_name: String, + can_use_suggested_fee_recipient_as_coinbase: bool, + sink: Option>, + ) -> Self { + Self { + provider, + root_hash_task_pool, + ctx, + cancellation_token, + cached_reads: None, + discard_txs: config.discard_txs, + coinbase_payment: config.coinbase_payment, + can_use_suggested_fee_recipient_as_coinbase, + root_hash_config, + builder_name, + sink, + best_results, + run_id: 0, + last_version: None, + phantom: PhantomData, + } + } + + /// Runs the block building process continuously. + /// + /// # Arguments + /// + /// * `cancel_token` - A token to signal cancellation of the process. + pub fn run(&mut self, cancel_token: CancellationToken) { + trace!( + "Parallel builder run id {}: Block building result assembler run started", + self.run_id + ); + // To-do: decide if we want to trigger builds here or just build in a continuous loop + loop { + if cancel_token.is_cancelled() { + break; + } + if self.best_results.get_number_of_orders() > 0 { + let orders_closed_at = OffsetDateTime::now_utc(); + self.try_build_block(orders_closed_at); + } + } + trace!( + "Parallel builder run id {}: Block building result assembler run finished", + self.run_id + ); + } + + /// Attempts to build a new block if not already building. + /// + /// # Arguments + /// + /// * `orders_closed_at` - The timestamp when orders were closed. 
+ fn try_build_block(&mut self, orders_closed_at: OffsetDateTime) { + let time_start = Instant::now(); + + let current_best_results = self.best_results.clone(); + let (mut best_orderings_per_group, version) = + current_best_results.get_results_and_version(); + + // Check if version has incremented + if let Some(last_version) = self.last_version { + if version == last_version { + return; + } + } + self.last_version = Some(version); + + trace!( + "Parallel builder run id {}: Attempting to build block with results version {}", + self.run_id, + version + ); + + if best_orderings_per_group.is_empty() { + return; + } + + match self.build_new_block(&mut best_orderings_per_group, orders_closed_at) { + Ok(new_block) => { + if let Ok(value) = new_block.true_block_value() { + trace!( + "Parallel builder run id {}: Built new block with results version {:?} and profit: {:?} in {:?} ms", + self.run_id, + version, + format_ether(value), + time_start.elapsed().as_millis() + ); + + if new_block.built_block_trace().got_no_signer_error { + self.can_use_suggested_fee_recipient_as_coinbase = false; + } + + if let Some(sink) = &self.sink { + sink.new_block(new_block); + } + } + } + Err(e) => { + warn!("Parallel builder run id {}: Failed to build new block with results version {:?}: {:?}", self.run_id, version, e); + } + } + self.run_id += 1; + } + + /// Builds a new block using the best results from each group. + /// + /// # Arguments + /// + /// * `best_results` - The current best results for each group. + /// * `orders_closed_at` - The timestamp when orders were closed. + /// + /// # Returns + /// + /// A Result containing the new block building helper or an error. 
+ pub fn build_new_block( + &mut self, + best_orderings_per_group: &mut [(ResolutionResult, ConflictGroup)], + orders_closed_at: OffsetDateTime, + ) -> eyre::Result> { + let build_start = Instant::now(); + + let use_suggested_fee_recipient_as_coinbase = self.coinbase_payment + && !self.contains_refunds(best_orderings_per_group) + && self.can_use_suggested_fee_recipient_as_coinbase; + + // Create a new ctx to remove builder_signer if necessary + let mut ctx = self.ctx.clone(); + if use_suggested_fee_recipient_as_coinbase { + ctx.modify_use_suggested_fee_recipient_as_coinbase(); + } + + let mut block_building_helper = BlockBuildingHelperFromProvider::new( + self.provider.clone(), + self.root_hash_task_pool.clone(), + self.root_hash_config.clone(), + ctx, + self.cached_reads.clone(), + self.builder_name.clone(), + self.discard_txs, + None, + self.cancellation_token.clone(), + )?; + block_building_helper.set_trace_orders_closed_at(orders_closed_at); + + // Sort groups by total profit in descending order + best_orderings_per_group.sort_by(|(a_ordering, _), (b_ordering, _)| { + b_ordering.total_profit.cmp(&a_ordering.total_profit) + }); + + loop { + if self.cancellation_token.is_cancelled() { + break; + } + + // Find the first non-empty group + let group_with_orders = best_orderings_per_group + .iter_mut() + .find(|(sequence_of_orders, _)| !sequence_of_orders.sequence_of_orders.is_empty()); + + if let Some((sequence_of_orders, order_group)) = group_with_orders { + // Get the next order from this group + let (order_idx, _) = sequence_of_orders.sequence_of_orders.remove(0); + let sim_order = &order_group.orders[order_idx]; + + let start_time = Instant::now(); + let commit_result = block_building_helper.commit_order(sim_order)?; + let order_commit_time = start_time.elapsed(); + + let mut gas_used = 0; + let mut execution_error = None; + let success = commit_result.is_ok(); + match commit_result { + Ok(res) => { + gas_used = res.gas_used; + } + Err(err) => execution_error = 
Some(err), + } + trace!( + order_id = ?sim_order.id(), + success, + order_commit_time_mus = order_commit_time.as_micros(), + gas_used, + ?execution_error, + "Executed order" + ); + } else { + // No more orders in any group + break; + } + } + block_building_helper.set_trace_fill_time(build_start.elapsed()); + self.cached_reads = Some(block_building_helper.clone_cached_reads()); + Ok(Box::new(block_building_helper)) + } + + pub fn build_backtest_block( + &self, + best_results: HashMap, + orders_closed_at: OffsetDateTime, + ) -> eyre::Result> { + let mut block_building_helper = BlockBuildingHelperFromProvider::new( + self.provider.clone(), + self.root_hash_task_pool.clone(), + self.root_hash_config.clone(), // Adjust as needed for backtest + self.ctx.clone(), + None, // No cached reads for backtest start + String::from("backtest_builder"), + self.discard_txs, + None, + CancellationToken::new(), + )?; + + block_building_helper.set_trace_orders_closed_at(orders_closed_at); + + let mut best_orderings_per_group: Vec<(ResolutionResult, ConflictGroup)> = + best_results.into_values().collect(); + + // Sort groups by total profit in descending order + best_orderings_per_group.sort_by(|(a_ordering, _), (b_ordering, _)| { + b_ordering.total_profit.cmp(&a_ordering.total_profit) + }); + + let use_suggested_fee_recipient_as_coinbase = + self.coinbase_payment && !self.contains_refunds(&best_orderings_per_group); + + // Modify ctx if necessary + let mut ctx = self.ctx.clone(); + if use_suggested_fee_recipient_as_coinbase { + ctx.modify_use_suggested_fee_recipient_as_coinbase(); + } + + let build_start = Instant::now(); + + for (sequence_of_orders, order_group) in best_orderings_per_group.iter_mut() { + for (order_idx, _) in sequence_of_orders.sequence_of_orders.iter() { + let sim_order = &order_group.orders[*order_idx]; + + let commit_result = block_building_helper.commit_order(sim_order)?; + + match commit_result { + Ok(res) => { + tracing::trace!( + order_id = ?sim_order.id(), + 
success = true, + gas_used = res.gas_used, + "Executed order in backtest" + ); + } + Err(err) => { + tracing::trace!( + order_id = ?sim_order.id(), + success = false, + error = ?err, + "Failed to execute order in backtest" + ); + } + } + } + } + + block_building_helper.set_trace_fill_time(build_start.elapsed()); + + Ok(Box::new(block_building_helper)) + } + + /// Checks if any of the orders in the given orderings contain refunds. + /// + /// # Arguments + /// + /// * `orderings` - A slice of tuples containing group orderings and order groups. + /// + /// # Returns + /// + /// `true` if any order contains refunds, `false` otherwise. + fn contains_refunds(&self, orderings: &[(ResolutionResult, ConflictGroup)]) -> bool { + orderings.iter().any(|(sequence_of_orders, order_group)| { + sequence_of_orders + .sequence_of_orders + .iter() + .any(|(order_idx, _)| { + !order_group.orders[*order_idx] + .sim_value + .paid_kickbacks + .is_empty() + }) + }) + } +} diff --git a/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolvers.rs b/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolvers.rs new file mode 100644 index 00000000..046dc61d --- /dev/null +++ b/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolvers.rs @@ -0,0 +1,705 @@ +use ahash::HashMap; +use alloy_primitives::{Address, U256}; +use eyre::Result; +use itertools::Itertools; +use rand::{seq::SliceRandom, SeedableRng}; +use reth::providers::StateProvider; +use reth_payload_builder::database::CachedReads; +use reth_provider::StateProviderFactory; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; +use tracing::trace; + +use super::simulation_cache::{CachedSimulationState, SharedSimulationCache}; +use super::{Algorithm, ConflictTask, ResolutionResult}; + +use crate::building::{BlockBuildingContext, BlockState, PartialBlock}; +use crate::building::{ExecutionError, ExecutionResult}; +use crate::primitives::{OrderId, SimulatedOrder}; + +/// Context for 
resolving conflicts in merging tasks. +#[derive(Debug)] +pub struct ResolverContext

{ + pub provider: P, + pub ctx: BlockBuildingContext, + pub cancellation_token: CancellationToken, + pub cache: Option, + pub simulation_cache: Arc, +} + +impl

ResolverContext

+where + P: StateProviderFactory + Clone + 'static, +{ + /// Creates a new `ResolverContext`. + /// + /// # Arguments + /// + /// * `provider_factory` - Factory for creating state providers. + /// * `ctx` - Context for block building. + /// * `cancellation_token` - Token for cancelling operations. + /// * `cache` - Optional cached reads for optimization. + /// * `simulation_cache` - Shared cache for simulation results. + pub fn new( + provider: P, + ctx: BlockBuildingContext, + cancellation_token: CancellationToken, + cache: Option, + simulation_cache: Arc, + ) -> Self { + ResolverContext { + provider, + ctx, + cancellation_token, + cache, + simulation_cache, + } + } + + /// Runs a merging task and returns the best [ResolutionResult] found. + /// + /// # Arguments + /// + /// * `task` - The [ConflictTask] to run. + /// + /// # Returns + /// + /// The best [ResolutionResult] and corresponding sequence of order indices found. + pub fn run_conflict_task(&mut self, task: ConflictTask) -> Result { + trace!( + "run_conflict_task: {:?} with algorithm {:?}", + task.group.id, + task.algorithm + ); + let state_provider = self + .provider + .history_by_block_hash(self.ctx.attributes.parent)?; + let state_provider: Arc = Arc::from(state_provider); + + let sequence_to_try = generate_sequences_of_orders_to_try(&task); + + let mut best_resolution_result = ResolutionResult { + total_profit: U256::ZERO, + sequence_of_orders: vec![], + }; + + for sequence_of_orders in sequence_to_try { + let (resolution_result, state) = + self.process_sequence_of_orders(sequence_of_orders, &task, &state_provider)?; + self.update_best_result(resolution_result, &mut best_resolution_result); + + let (new_cached_reads, _, _) = state.into_parts(); + self.cache = Some(new_cached_reads); + } + + trace!( + "Resolved conflict task {:?} with profit: {:?} and algorithm: {:?}", + task.group.id, + best_resolution_result.total_profit, + task.algorithm + ); + Ok(best_resolution_result) + } + + /// Updates the best 
result if a better one is found. + /// + /// # Arguments + /// + /// * `new_result` - The newly processed result. + /// * `best_result` - The current best result to update. + fn update_best_result( + &mut self, + new_result: ResolutionResult, + best_result: &mut ResolutionResult, + ) { + if best_result.total_profit < new_result.total_profit { + best_result.total_profit = new_result.total_profit; + best_result.sequence_of_orders = new_result.sequence_of_orders; + } + } + + /// Processes a single sequence of orders, utilizing the simulation cache. + /// + /// # Arguments + /// + /// * `sequence_of_orders` - The order of transaction indices to process. + /// * `task` - The current conflict task. + /// * `state_provider` - The state provider for the current block. + /// + /// # Returns + /// + /// A tuple containing the resolution result and the final block state. + fn process_sequence_of_orders( + &mut self, + sequence_of_orders: Vec, + task: &ConflictTask, + state_provider: &Arc, + ) -> Result<(ResolutionResult, BlockState)> { + let order_id_to_index = self.initialize_order_id_to_index_map(task); + let full_sequence_of_orders = self.initialize_full_order_ids_vec(&sequence_of_orders, task); + + // Check for cached simulation state + let (cached_state_option, cached_up_to_index) = self + .simulation_cache + .get_cached_state(&full_sequence_of_orders); + + // Initialize state and partial block + let mut partial_block = PartialBlock::new(true, None); + let mut state = self.initialize_block_state(&cached_state_option, state_provider); + partial_block.pre_block_call(&self.ctx, &mut state)?; + + // Initialize sequenced_order_result + let mut sequenced_order_result = + self.initialize_result_order_sequence(&cached_state_option, &order_id_to_index); + + let mut total_profit = cached_state_option + .as_ref() + .map_or(U256::ZERO, |cached| cached.total_profit); + + let mut per_order_profits = cached_state_option + .as_ref() + .map_or(Vec::new(), |cached| 
cached.per_order_profits.clone()); + + // Prepare the sequence of orders to try, skipping already cached orders + let mut remaining_orders = sequence_of_orders[cached_up_to_index..].to_vec(); + remaining_orders.reverse(); // Use as a stack: pop from the end + + let mut pending_orders: HashMap<(Address, u64), usize> = HashMap::default(); + + // Processing loop + while let Some(order_idx) = remaining_orders.pop() { + if self.cancellation_token.is_cancelled() { + return Err(eyre::eyre!("Cancelled")); + } + + let sim_order = &task.group.orders[order_idx]; + match partial_block.commit_order(sim_order, &self.ctx, &mut state)? { + Ok(res) => self.handle_successful_commit( + res, + sim_order, + order_idx, + &mut pending_orders, + &mut remaining_orders, + &mut sequenced_order_result, + &mut total_profit, + &mut per_order_profits, + ), + Err(err) => self.handle_err(&err, sim_order, &mut pending_orders, order_idx), + } + } + + self.store_simulation_state( + &full_sequence_of_orders, + &state, + total_profit, + &per_order_profits, + ); + + let resolution_result = ResolutionResult { + total_profit, + sequence_of_orders: sequenced_order_result, + }; + Ok((resolution_result, state)) + } + + /// Helper function to handle a successful commit of an order. 
+ #[allow(clippy::too_many_arguments)] + fn handle_successful_commit( + &mut self, + res: ExecutionResult, + sim_order: &SimulatedOrder, + order_idx: usize, + pending_orders: &mut HashMap<(Address, u64), usize>, + remaining_orders: &mut Vec, + sequenced_order_result: &mut Vec<(usize, U256)>, + total_profit: &mut U256, + per_order_profits: &mut Vec<(OrderId, U256)>, + ) { + for (address, nonce) in res.nonces_updated { + if let Some(pending_order) = pending_orders.remove(&(address, nonce)) { + remaining_orders.push(pending_order); + } + } + let order_id = sim_order.order.id(); + *total_profit += res.coinbase_profit; + per_order_profits.push((order_id, res.coinbase_profit)); + sequenced_order_result.push((order_idx, res.coinbase_profit)); + } + + /// Helper function to handle an error in committing an order. + fn handle_err( + &mut self, + err: &ExecutionError, + sim_order: &SimulatedOrder, + pending_orders: &mut HashMap<(Address, u64), usize>, + order_idx: usize, + ) { + if let Some((address, nonce)) = err.try_get_tx_too_high_error(&sim_order.order) { + pending_orders.insert((address, nonce), order_idx); + }; + } + + /// Initializes a HashMap of order id to index. + fn initialize_order_id_to_index_map(&self, task: &ConflictTask) -> HashMap { + task.group + .orders + .iter() + .enumerate() + .map(|(idx, sim_order)| (sim_order.order.id(), idx)) + .collect() + } + + /// Initializes a vector of full order ids corresponding to the sequence of orders. + fn initialize_full_order_ids_vec( + &self, + sequence_of_orders: &[usize], + task: &ConflictTask, + ) -> Vec { + sequence_of_orders + .iter() + .map(|&idx| task.group.orders[idx].order.id()) + .collect() + } + + /// Initializes the tuple of (order_idx, profit) for the resolution result using the cached state if available. 
+ fn initialize_result_order_sequence( + &self, + cached_state_option: &Option>, + order_id_to_index: &HashMap, + ) -> Vec<(usize, U256)> { + if let Some(cached_state) = &cached_state_option { + cached_state + .per_order_profits + .iter() + .filter_map(|(order_id, profit)| { + order_id_to_index.get(order_id).map(|&idx| (idx, *profit)) + }) + .collect::>() + } else { + Vec::new() + } + } + + /// Initializes the block state, using a cached state if available. + fn initialize_block_state( + &mut self, + cached_state_option: &Option>, + state_provider: &Arc, + ) -> BlockState { + if let Some(cached_state) = &cached_state_option { + // Use cached state + BlockState::new_arc(state_provider.clone()) + .with_cached_reads(cached_state.cached_reads.clone()) + .with_bundle_state(cached_state.bundle_state.clone()) + } else { + // If we don't have a cached state from the simulation cache, we use the cached reads from the block state in some cases + if let Some(cache) = &self.cache { + BlockState::new_arc(state_provider.clone()).with_cached_reads(cache.clone()) + } else { + BlockState::new_arc(state_provider.clone()) + } + } + } + + /// Stores the simulation state in the cache. + fn store_simulation_state( + &self, + full_order_ids: &[OrderId], + state: &BlockState, + total_profit: U256, + per_order_profits: &[(OrderId, U256)], + ) { + let (cached_reads, bundle_state, _) = state.clone().into_parts(); + let cached_simulation_state = CachedSimulationState { + cached_reads, + bundle_state, + total_profit, + per_order_profits: per_order_profits.to_owned(), + }; + self.simulation_cache + .store_cached_state(full_order_ids, cached_simulation_state); + } +} + +/// Generates different sequences of orders to try based on the conflict task command. +/// +/// # Arguments +/// +/// * `task` - The conflict task containing the algorithm for generating sequences. +/// +/// # Returns +/// +/// A vector of different sequences of order indices to try. 
+fn generate_sequences_of_orders_to_try(task: &ConflictTask) -> Vec> { + match task.algorithm { + Algorithm::Greedy => generate_greedy_sequence(task, false), + Algorithm::ReverseGreedy => generate_greedy_sequence(task, true), + Algorithm::Length => generate_length_based_sequence(task), + Algorithm::AllPermutations => generate_all_permutations(task), + Algorithm::Random { seed, count } => generate_random_permutations(task, seed, count), + } +} + +/// Generates random permutations of sequences of order indices. +/// +/// # Arguments +/// +/// * `task` - The current conflict task. +/// * `seed` - Seed for the random number generator. +/// * `count` - Number of random permutations to generate. +/// +/// # Returns +/// +/// A vector of randomly generated sequences of order indices. +fn generate_random_permutations(task: &ConflictTask, seed: u64, count: usize) -> Vec> { + let mut sequences_of_orders = vec![]; + + let order_group = &task.group; + let mut indexes = (0..order_group.orders.len()).collect::>(); + let mut rng = rand::rngs::SmallRng::seed_from_u64(seed); + for _ in 0..count { + indexes.shuffle(&mut rng); + sequences_of_orders.push(indexes.clone()); + } + + sequences_of_orders +} + +/// Generates all possible permutations of sequences of order indices. +/// +/// # Arguments +/// +/// * `task` - The current conflict task. +/// +/// # Returns +/// +/// A vector of all possible sequences of order indices. +fn generate_all_permutations(task: &ConflictTask) -> Vec> { + let order_group = &task.group; + let sequences_of_orders = (0..order_group.orders.len()).collect::>(); + sequences_of_orders + .into_iter() + .permutations(order_group.orders.len()) + .collect() +} + +/// Generates static sequences of order indices based on gas price and coinbase profit. +/// +/// # Arguments +/// +/// * `task` - The current conflict task. +/// * `reverse` - Whether to reverse the sorting order (e.g. 
sorting by min coinbase profit and mev_gas_price) +/// +/// # Returns +/// +/// A vector of static sequences of order indices, sorted by coinbase profit and mev_gas_price. +fn generate_greedy_sequence(task: &ConflictTask, reverse: bool) -> Vec> { + let order_group = &task.group; + + let create_sequence = |value_extractor: fn(&SimulatedOrder) -> U256| { + let mut ids_and_value: Vec<_> = order_group + .orders + .iter() + .enumerate() + .map(|(idx, order)| (idx, value_extractor(order))) + .collect(); + + ids_and_value.sort_by(|a, b| { + if reverse { + a.1.cmp(&b.1) + } else { + b.1.cmp(&a.1) + } + }); + ids_and_value.into_iter().map(|(idx, _)| idx).collect() + }; + + vec![ + create_sequence(|sim_order| sim_order.sim_value.coinbase_profit), + create_sequence(|sim_order| sim_order.sim_value.mev_gas_price), + ] +} + +/// Generates length based sequences of order indices based on the length of the orders. +/// e.g. prioritizes longer bundles first +/// +/// # Arguments +/// +/// * `task` - The current conflict task. +/// +/// # Returns +/// +/// A vector of length based sequences of order indices. 
+fn generate_length_based_sequence(task: &ConflictTask) -> Vec> { + let mut sequences_of_orders = vec![]; + let order_group = &task.group; + + let mut order_data: Vec<(usize, usize, U256)> = order_group + .orders + .iter() + .enumerate() + .map(|(idx, order)| { + ( + idx, + order.order.list_txs().len(), + order.sim_value.coinbase_profit, + ) + }) + .collect(); + + // Sort by length (descending) and then by profit (descending) as a tie-breaker + order_data.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| b.2.cmp(&a.2))); + + // Extract the sorted indices + let length_based_sequence: Vec = order_data.into_iter().map(|(idx, _, _)| idx).collect(); + + sequences_of_orders.push(length_based_sequence); + sequences_of_orders +} + +#[cfg(test)] +mod tests { + use std::time::Instant; + + use ahash::HashSet; + use uuid::Uuid; + + use alloy_primitives::{Address, TxHash, B256, U256}; + use reth::primitives::{ + Transaction, TransactionSigned, TransactionSignedEcRecovered, TxLegacy, + }; + + use super::*; + use crate::{ + building::builders::parallel_builder::{ConflictGroup, GroupId, TaskPriority}, + primitives::{ + Bundle, Metadata, Order, SimValue, SimulatedOrder, + TransactionSignedEcRecoveredWithBlobs, + }, + }; + + struct DataGenerator { + last_used_id: u64, + } + impl DataGenerator { + pub fn new() -> DataGenerator { + DataGenerator { last_used_id: 0 } + } + + pub fn create_u64(&mut self) -> u64 { + self.last_used_id += 1; + self.last_used_id + } + + pub fn create_u256(&mut self) -> U256 { + U256::from(self.create_u64()) + } + + pub fn create_hash(&mut self) -> TxHash { + TxHash::from(self.create_u256()) + } + + pub fn create_tx(&mut self) -> TransactionSignedEcRecovered { + let tx_legacy = TxLegacy { + nonce: self.create_u64(), + ..Default::default() + }; + TransactionSignedEcRecovered::from_signed_transaction( + TransactionSigned { + hash: self.create_hash(), + transaction: Transaction::Legacy(tx_legacy), + ..Default::default() + }, + Address::default(), + ) + } + + pub fn 
create_order_with_length( + &mut self, + coinbase_profit: U256, + mev_gas_price: U256, + num_of_orders: usize, + ) -> SimulatedOrder { + let mut txs = Vec::new(); + for _ in 0..num_of_orders { + txs.push( + TransactionSignedEcRecoveredWithBlobs::new_no_blobs(self.create_tx()).unwrap(), + ); + } + + let sim_value = SimValue { + coinbase_profit, + mev_gas_price, + ..Default::default() + }; + + let bundle = Bundle { + block: 0, + min_timestamp: None, + max_timestamp: None, + txs, + reverting_tx_hashes: Vec::new(), + hash: B256::ZERO, + uuid: Uuid::new_v4(), + replacement_data: None, + signer: None, + metadata: Metadata::default(), + }; + + SimulatedOrder { + order: Order::Bundle(bundle), + used_state_trace: None, + sim_value, + prev_order: None, + } + } + } + + // Helper function to create an order group + fn create_mock_order_group( + id: GroupId, + orders: Vec, + conflicting_ids: HashSet, + ) -> ConflictGroup { + ConflictGroup { + id, + orders: Arc::new(orders), + conflicting_group_ids: Arc::new(conflicting_ids.into_iter().collect()), + } + } + + fn create_mock_task( + group_idx: usize, + group: ConflictGroup, + algorithm: Algorithm, + priority: TaskPriority, + created_at: Instant, + ) -> ConflictTask { + ConflictTask { + group_idx, + group, + algorithm, + priority, + created_at, + } + } + + #[test] + fn test_all_permutations() { + let mut data_generator = DataGenerator::new(); + let group = create_mock_order_group( + 1, + vec![ + data_generator.create_order_with_length(U256::from(100), U256::from(100), 1), // index: 0, Length 1, profit 100, mev_gas_price 100 + data_generator.create_order_with_length(U256::from(200), U256::from(200), 3), // index: 1, Length 3, profit 200, mev_gas_price 200 + data_generator.create_order_with_length(U256::from(150), U256::from(150), 2), // index: 2, Length 2, profit 150, mev_gas_price 150 + ], + HashSet::default(), + ); + + let task = create_mock_task( + 0, + group, + Algorithm::AllPermutations, + TaskPriority::Low, + Instant::now(), 
+ ); + + let sequences = generate_sequences_of_orders_to_try(&task); + assert_eq!(sequences.len(), 6); + assert_eq!(sequences[0], vec![0, 1, 2]); + assert_eq!(sequences[1], vec![0, 2, 1]); + assert_eq!(sequences[2], vec![1, 0, 2]); + assert_eq!(sequences[3], vec![1, 2, 0]); + assert_eq!(sequences[4], vec![2, 0, 1]); + assert_eq!(sequences[5], vec![2, 1, 0]); + } + + #[test] + fn test_generate_length_based_sequence() { + let mut data_generator = DataGenerator::new(); + + let orders = vec![ + data_generator.create_order_with_length(U256::from(100), U256::from(100), 1), // Length 1, profit 100 + data_generator.create_order_with_length(U256::from(200), U256::from(200), 3), // Length 3, profit 200 + data_generator.create_order_with_length(U256::from(150), U256::from(150), 2), // Length 2, profit 150 + data_generator.create_order_with_length(U256::from(300), U256::from(300), 1), // Length 1, profit 300 + ]; + + let group = create_mock_order_group(1, orders, HashSet::default()); + + let task = create_mock_task( + 0, + group, + Algorithm::Length, + TaskPriority::Low, + Instant::now(), + ); + + let sequences = generate_sequences_of_orders_to_try(&task); + assert_eq!(sequences.len(), 1); + assert_eq!(sequences[0], vec![1, 2, 3, 0]); + } + + #[test] + fn test_max_profit_and_mev_gas_price_sequences() { + let mut data_generator = DataGenerator::new(); + let group = create_mock_order_group( + 1, + vec![ + data_generator.create_order_with_length(U256::from(100), U256::from(300), 1), // index: 0, Length 1, profit 100, mev_gas_price 300 + data_generator.create_order_with_length(U256::from(200), U256::from(150), 3), // index: 1, Length 3, profit 200, mev_gas_price 150 + data_generator.create_order_with_length(U256::from(150), U256::from(200), 2), // index: 2, Length 2, profit 150, mev_gas_price 200 + data_generator.create_order_with_length(U256::from(300), U256::from(100), 1), // index: 3, Length 1, profit 300, mev_gas_price 100 + ], + HashSet::default(), + ); + + let task = 
create_mock_task( + 0, + group, + Algorithm::Greedy, + TaskPriority::Low, + Instant::now(), + ); + + let sequences = generate_sequences_of_orders_to_try(&task); + assert_eq!(sequences.len(), 2); + + // Coinbase profit is the first + assert_eq!(sequences[0], vec![3, 1, 2, 0]); + // MEV gas price is the second + assert_eq!(sequences[1], vec![0, 2, 1, 3]); + } + + #[test] + fn test_reverse_max_profit_and_mev_gas_price_sequences() { + let mut data_generator = DataGenerator::new(); + let group = create_mock_order_group( + 1, + vec![ + data_generator.create_order_with_length(U256::from(100), U256::from(300), 1), // index: 0, Length 1, profit 100, mev_gas_price 300 + data_generator.create_order_with_length(U256::from(200), U256::from(150), 3), // index: 1, Length 3, profit 200, mev_gas_price 150 + data_generator.create_order_with_length(U256::from(150), U256::from(200), 2), // index: 2, Length 2, profit 150, mev_gas_price 200 + data_generator.create_order_with_length(U256::from(300), U256::from(100), 1), // index: 3, Length 1, profit 300, mev_gas_price 100 + ], + HashSet::default(), + ); + + let task = create_mock_task( + 0, + group, + Algorithm::ReverseGreedy, + TaskPriority::Low, + Instant::now(), + ); + + let sequences = generate_sequences_of_orders_to_try(&task); + assert_eq!(sequences.len(), 2); + + // Coinbase profit is the first + assert_eq!(sequences[0], vec![0, 2, 1, 3]); + // MEV gas price is the second + assert_eq!(sequences[1], vec![3, 1, 2, 0]); + } +} diff --git a/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolving_pool.rs b/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolving_pool.rs new file mode 100644 index 00000000..c4b144eb --- /dev/null +++ b/crates/rbuilder/src/building/builders/parallel_builder/conflict_resolving_pool.rs @@ -0,0 +1,169 @@ +use alloy_primitives::utils::format_ether; +use crossbeam_queue::SegQueue; +use eyre::Result; +use rayon::{ThreadPool, ThreadPoolBuilder}; +use 
reth_provider::StateProviderFactory; +use std::sync::mpsc as std_mpsc; +use std::sync::Arc; +use std::time::Instant; +use tokio_util::sync::CancellationToken; +use tracing::{trace, warn}; + +use super::conflict_task_generator::get_tasks_for_group; +use super::ConflictResolutionResultPerGroup; +use super::TaskPriority; +use super::{ + conflict_resolvers::ResolverContext, simulation_cache::SharedSimulationCache, ConflictGroup, + ConflictTask, GroupId, ResolutionResult, +}; +use crate::building::BlockBuildingContext; + +pub type TaskQueue = Arc>; + +pub struct ConflictResolvingPool

{ + task_queue: TaskQueue, + thread_pool: ThreadPool, + group_result_sender: std_mpsc::Sender, + cancellation_token: CancellationToken, + ctx: BlockBuildingContext, + provider: P, + simulation_cache: Arc, +} + +impl

ConflictResolvingPool

+where + P: StateProviderFactory + Clone + 'static, +{ + pub fn new( + num_threads: usize, + task_queue: TaskQueue, + group_result_sender: std_mpsc::Sender, + cancellation_token: CancellationToken, + ctx: BlockBuildingContext, + provider: P, + simulation_cache: Arc, + ) -> Self { + let thread_pool = ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + .expect("Failed to build thread pool"); + + Self { + task_queue, + thread_pool, + group_result_sender, + cancellation_token, + ctx, + provider, + simulation_cache, + } + } + + pub fn start(&self) { + let task_queue = self.task_queue.clone(); + let cancellation_token = self.cancellation_token.clone(); + let provider = self.provider.clone(); + let group_result_sender = self.group_result_sender.clone(); + let simulation_cache = self.simulation_cache.clone(); + let ctx = self.ctx.clone(); + + self.thread_pool.spawn(move || { + while !cancellation_token.is_cancelled() { + if let Some(task) = task_queue.pop() { + let task_start = Instant::now(); + if let Ok((task_id, result)) = Self::process_task( + task, + &ctx, + &provider, + cancellation_token.clone(), + Arc::clone(&simulation_cache), + ) { + match group_result_sender.send((task_id, result)) { + Ok(_) => { + trace!( + task_id = %task_id, + time_taken_ms = %task_start.elapsed().as_millis(), + "Conflict resolving: successfully sent group result" + ); + } + Err(err) => { + warn!( + task_id = %task_id, + error = ?err, + time_taken_ms = %task_start.elapsed().as_millis(), + "Conflict resolving: failed to send group result" + ); + } + } + } + } + } + }); + } + + pub fn process_task( + task: ConflictTask, + ctx: &BlockBuildingContext, + provider: &P, + cancellation_token: CancellationToken, + simulation_cache: Arc, + ) -> Result<(GroupId, (ResolutionResult, ConflictGroup))> { + let mut merging_context = ResolverContext::new( + provider.clone(), + ctx.clone(), + cancellation_token, + None, + simulation_cache, + ); + let task_id = task.group_idx; + let task_group = 
task.group.clone(); + let task_algo = task.algorithm; + + match merging_context.run_conflict_task(task) { + Ok(sequence_of_orders) => { + trace!( + task_type = ?task_algo, + group_id = task_id, + profit = format_ether(sequence_of_orders.total_profit), + order_count = sequence_of_orders.sequence_of_orders.len(), + "Successfully ran conflict task" + ); + Ok((task_id, (sequence_of_orders, task_group))) + } + Err(err) => { + warn!( + "Error running conflict task for group_idx {}: {:?}", + task_id, err + ); + Err(err) + } + } + } + + pub fn process_groups_backtest( + &mut self, + new_groups: Vec, + ctx: &BlockBuildingContext, + provider: &P, + simulation_cache: Arc, + ) -> Vec<(GroupId, (ResolutionResult, ConflictGroup))> { + let mut results = Vec::new(); + for new_group in new_groups { + let tasks = get_tasks_for_group(&new_group, TaskPriority::High); + for task in tasks { + let simulation_cache = Arc::clone(&simulation_cache); + let result = Self::process_task( + task, + ctx, + provider, + CancellationToken::new(), + simulation_cache, + ); + if let Ok(result) = result { + results.push(result); + } + } + } + results + } +} diff --git a/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs b/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs new file mode 100644 index 00000000..d0df9b00 --- /dev/null +++ b/crates/rbuilder/src/building/builders/parallel_builder/conflict_task_generator.rs @@ -0,0 +1,726 @@ +use crate::primitives::SimulatedOrder; +use ahash::HashMap; +use ahash::HashSet; +use alloy_primitives::{utils::format_ether, U256}; +use crossbeam_queue::SegQueue; +use itertools::Itertools; +use std::time::Instant; +use tracing::{trace, warn}; + +use super::task::ConflictTask; +use super::ConflictGroup; +use super::ConflictResolutionResultPerGroup; +use super::GroupId; +use super::ResolutionResult; +use super::{Algorithm, TaskPriority, TaskQueue}; +use std::sync::mpsc as std_mpsc; + +const 
THRESHOLD_FOR_SIGNIFICANT_CHANGE: u64 = 20; +const NUMBER_OF_TOP_ORDERS_TO_CONSIDER_FOR_SIGNIFICANT_CHANGE: usize = 10; +const MAX_LENGTH_FOR_ALL_PERMUTATIONS: usize = 3; +const NUMBER_OF_RANDOM_TASKS: usize = 50; + +/// Manages conflicts and updates for conflict groups, coordinating with a worker pool to process tasks. +pub struct ConflictTaskGenerator { + existing_groups: HashMap, + task_queue: TaskQueue, + group_result_sender: std_mpsc::Sender, +} + +impl ConflictTaskGenerator { + /// Creates a new [ConflictTaskGenerator] instance responsible for generating and managing tasks for conflict groups. + /// + /// # Arguments + /// + /// * `task_queue` - The queue to store the generated tasks. + /// * `group_result_sender` - The sender to send the results of the conflict resolution. + pub fn new( + task_queue: TaskQueue, + group_result_sender: std_mpsc::Sender, + ) -> Self { + Self { + existing_groups: HashMap::default(), + task_queue, + group_result_sender, + } + } + + /// Processes a vector of new conflict groups, updating existing groups or creating new tasks as necessary. + /// + /// # Arguments + /// + /// * `new_groups` - A vector of new [ConflictGroup]s to process. 
+    pub fn process_groups(&mut self, new_groups: Vec<ConflictGroup>) {
+        // Largest groups first. The sort is stable, so equally sized groups keep their incoming order.
+        let mut sorted_groups = new_groups;
+        sorted_groups.sort_by(|a, b| b.orders.len().cmp(&a.orders.len()));
+
+        let mut processed_groups: HashSet<GroupId> = HashSet::default();
+        for new_group in sorted_groups {
+            // Already handled in this batch, either directly or as a subset of a larger group.
+            if self.has_been_processed(&new_group.id, &processed_groups) {
+                continue;
+            }
+
+            // `process_single_group` takes ownership, but the group is still needed below.
+            self.process_single_group(new_group.clone());
+
+            // Mark the group and its tracked subsets so they are skipped for the rest of the batch.
+            self.add_processed_groups(&new_group, &mut processed_groups);
+
+            // Drop the groups this one supersedes, together with their queued tasks.
+            if !new_group.conflicting_group_ids.is_empty() {
+                self.remove_conflicting_subset_groups(&new_group);
+            }
+        }
+    }
+
+    /// Adds the group and all the conflicting group subsets to the processed groups
+    ///
+    /// Only subsets that are currently tracked in `existing_groups` are added.
+    fn add_processed_groups(
+        &mut self,
+        new_group: &ConflictGroup,
+        processed_groups: &mut HashSet<GroupId>,
+    ) {
+        // Add all subset groups to the processed groups
+        processed_groups.extend(
+            self.existing_groups
+                .keys()
+                .filter(|&id| new_group.conflicting_group_ids.contains(id))
+                .copied(),
+        );
+
+        // Add the new group to the processed groups
+        processed_groups.insert(new_group.id);
+    }
+
+    /// Checks if a group has been processed
+    fn has_been_processed(&self, group_id: &GroupId, processed_groups: &HashSet<GroupId>) -> bool {
+        processed_groups.contains(group_id)
+    }
+
+    /// Removes the subset groups from the existing groups and cancels the tasks for those groups
+    fn remove_conflicting_subset_groups(&mut self, superset_group: &ConflictGroup) {
+        // Collected up front because `existing_groups` is mutated in the loop below.
+        let subset_ids: Vec<GroupId> = self
+            .existing_groups
+            .keys()
+            .filter(|&id| superset_group.conflicting_group_ids.contains(id))
+            .copied()
+            .collect();
+
+        trace!("Removing subset groups: {:?}", subset_ids);
+        for id in subset_ids {
+            self.existing_groups.remove(&id);
+            self.cancel_tasks_for_group(id);
+        }
+    }
+
+    /// Processes a single order group, determining
whether to update an existing group or create new tasks.
+    ///
+    /// This method checks if the group has changed since it was last processed. If there are no changes,
+    /// it skips further processing. For changed groups, it determines the appropriate priority and
+    /// either updates existing tasks or creates new ones.
+    ///
+    /// # Arguments
+    ///
+    /// * `new_group` - The new `ConflictGroup` to process.
+    fn process_single_group(&mut self, new_group: ConflictGroup) {
+        let group_id = new_group.id;
+
+        // A group that has never been seen always counts as changed.
+        let changed = self
+            .existing_groups
+            .get(&group_id)
+            .map_or(true, |known| self.group_has_changed(&new_group, known));
+        if !changed {
+            return;
+        }
+
+        // Exactly one order: the result is published directly. Anything else goes through tasks.
+        match new_group.orders.len() {
+            1 => self.process_single_order_group(group_id, &new_group),
+            _ => self.process_multi_order_group(group_id, &new_group),
+        }
+
+        // Remember this version so the next batch can be compared against it.
+        self.existing_groups.insert(group_id, new_group);
+    }
+
+    /// Processes a multi-order group, determining whether to update existing tasks or create new ones and the corresponding priority
+    /// We give updates higher priority if they are significant changes
+    ///
+    /// # Arguments
+    ///
+    /// * `group_id` - The ID of the group to process.
+    /// * `new_group` - The new `ConflictGroup` to process.
+    fn process_multi_order_group(&mut self, group_id: GroupId, new_group: &ConflictGroup) {
+        // One lookup drives both the priority and the update-vs-create decision.
+        let previous = self.existing_groups.get(&group_id);
+        let is_known_group = previous.is_some();
+        let priority = match previous {
+            // Known group whose top profits did not change significantly.
+            Some(existing_group) if !self.has_significant_changes(new_group, existing_group) => {
+                TaskPriority::Medium
+            }
+            // New groups and significant updates.
+            _ => TaskPriority::High,
+        };
+        trace!(
+            "Processing multi order group {group_id} with {} orders, {} profit with priority {:?}",
+            new_group.orders.len(),
+            format_ether(self.sum_top_n_profits(&new_group.orders, new_group.orders.len())),
+            priority.display()
+        );
+        if is_known_group {
+            self.update_tasks(group_id, new_group, priority);
+        } else {
+            self.create_new_tasks(new_group, priority);
+        }
+    }
+
+    /// Handles a group made of a single order by publishing its trivial resolution directly.
+    ///
+    /// No task is queued: the result is sent on `group_result_sender`. A failed send is logged
+    /// and otherwise ignored. Indexes the first order, so `group.orders` must not be empty.
+    ///
+    /// # Arguments
+    ///
+    /// * `group_id` - The ID of the group to process.
+    /// * `group` - The `ConflictGroup` to process.
+    fn process_single_order_group(&mut self, group_id: GroupId, group: &ConflictGroup) {
+        // The only possible sequence is the order itself, at index 0.
+        let profit = group.orders[0].sim_value.coinbase_profit;
+        let resolution = ResolutionResult {
+            total_profit: profit,
+            sequence_of_orders: vec![(0, profit)],
+        };
+        let send_outcome = self
+            .group_result_sender
+            .send((group_id, (resolution, group.clone())));
+        if let Err(e) = send_outcome {
+            warn!(
+                "Failed to send single order result for group {}: {:?}",
+                group_id, e
+            );
+        }
+    }
+
+    /// Determines if there are any changes between a new group and an existing group.
+    ///
+    /// This method compares the number of orders and each individual order in both groups
+    /// to detect any changes.
+    ///
+    /// # Arguments
+    ///
+    /// * `new_group` - The new `ConflictGroup` to compare.
+    /// * `existing_group` - The existing `ConflictGroup` to compare against.
+    ///
+    /// # Returns
+    ///
+    /// `true` if there are any changes between the groups, `false` otherwise.
+    fn group_has_changed(&self, new_group: &ConflictGroup, existing_group: &ConflictGroup) -> bool {
+        let new_orders = &new_group.orders;
+        let old_orders = &existing_group.orders;
+
+        // A different length is a change; with equal lengths any pairwise mismatch is.
+        new_orders.len() != old_orders.len()
+            || new_orders
+                .iter()
+                .zip(old_orders.iter())
+                .any(|(new_order, old_order)| new_order != old_order)
+    }
+
+    /// Decides whether the new version of a group differs significantly from the existing one.
+    ///
+    /// Both versions are reduced to the summed profit of their top
+    /// `NUMBER_OF_TOP_ORDERS_TO_CONSIDER_FOR_SIGNIFICANT_CHANGE` orders. Only an increase is
+    /// ever significant, and only if it exceeds `THRESHOLD_FOR_SIGNIFICANT_CHANGE` percent of
+    /// the existing sum.
+    ///
+    /// # Arguments
+    ///
+    /// * `new_group` - The new `ConflictGroup` to compare.
+    /// * `existing_group` - The existing `ConflictGroup` to compare against.
+    ///
+    /// # Returns
+    ///
+    /// `true` if there are significant changes, `false` otherwise.
+    fn has_significant_changes(
+        &self,
+        new_group: &ConflictGroup,
+        existing_group: &ConflictGroup,
+    ) -> bool {
+        let top_n = NUMBER_OF_TOP_ORDERS_TO_CONSIDER_FOR_SIGNIFICANT_CHANGE;
+        let new_top_profit = self.sum_top_n_profits(&new_group.orders, top_n);
+        let existing_top_profit = self.sum_top_n_profits(&existing_group.orders, top_n);
+
+        // Equal or lower profit is never significant.
+        if new_top_profit <= existing_top_profit {
+            return false;
+        }
+
+        // The gain has to exceed the threshold percentage of the existing profit.
+        let threshold = existing_top_profit * U256::from(THRESHOLD_FOR_SIGNIFICANT_CHANGE)
+            / U256::from(100);
+        new_top_profit - existing_top_profit > threshold
+    }
+
+    /// Calculates the sum of the top N profits from a slice of simulated orders.
+    ///
+    /// # Arguments
+    ///
+    /// * `orders` - A slice of `SimulatedOrder`s to process.
+    /// * `n` - The number of top profits to sum.
+    ///
+    /// # Returns
+    ///
+    /// The sum of the top N profits as a `U256`.
+    fn sum_top_n_profits(&self, orders: &[SimulatedOrder], n: usize) -> U256 {
+        let mut profits: Vec<U256> = orders
+            .iter()
+            .map(|order| order.sim_value.coinbase_profit)
+            .collect();
+        // Descending, so the first `n` entries are the most profitable ones.
+        profits.sort_unstable_by(|a, b| b.cmp(a));
+        profits.into_iter().take(n).sum()
+    }
+
+    /// Replaces the queued tasks of a group with tasks built from its new version.
+    ///
+    /// # Arguments
+    ///
+    /// * `group_id` - The ID of the group to update.
+    /// * `new_group` - The new `ConflictGroup` to use for task creation.
+    /// * `priority` - The priority to assign to the new tasks.
+    fn update_tasks(
+        &mut self,
+        group_id: GroupId,
+        new_group: &ConflictGroup,
+        priority: TaskPriority,
+    ) {
+        trace!(
+            "Updating tasks for group {} with priority {:?}",
+            group_id,
+            priority.display()
+        );
+        // Cancel existing tasks for this group
+        self.cancel_tasks_for_group(group_id);
+
+        // Create new tasks for the updated group
+        self.create_new_tasks(new_group, priority);
+    }
+
+    /// Removes every queued task that belongs to `group_id`.
+    ///
+    /// The queue is drained completely and the tasks of other groups are pushed back in
+    /// their original order.
+    // NOTE(review): the queue is shared with the resolving pool. Presumably tasks a worker has
+    // already popped are not affected, and the queue can look empty while it is being rebuilt.
+    // Confirm this is acceptable.
+    fn cancel_tasks_for_group(&mut self, group_id: GroupId) {
+        let survivors = SegQueue::new();
+
+        // Pop until the queue reports empty, keeping only tasks of other groups.
+        std::iter::from_fn(|| self.task_queue.pop())
+            .filter(|task| task.group_idx != group_id)
+            .for_each(|task| survivors.push(task));
+
+        // Hand the survivors back in the order they were taken out.
+        std::iter::from_fn(|| survivors.pop()).for_each(|task| self.task_queue.push(task));
+    }
+
+    /// Builds the tasks for a group and pushes them onto the task queue.
+    ///
+    /// # Arguments
+    ///
+    /// * `new_group` - The `ConflictGroup` to create tasks for.
+    /// * `priority` - The priority to assign to the tasks.
+    fn create_new_tasks(&mut self, new_group: &ConflictGroup, priority: TaskPriority) {
+        get_tasks_for_group(new_group, priority)
+            .into_iter()
+            .for_each(|task| self.task_queue.push(task));
+    }
+}
+
+/// Generates a vector of conflict tasks for a given order group.
+///
+/// # Arguments
+///
+/// * `group` - The `ConflictGroup` to create tasks for.
+/// * `priority` - The priority to assign to the tasks.
+///
+/// # Returns
+///
+/// A vector of `ConflictTask`s for the given group.
+pub fn get_tasks_for_group(group: &ConflictGroup, priority: TaskPriority) -> Vec<ConflictTask> {
+    // Every task of one call shares the same creation timestamp and group snapshot.
+    let created_at = Instant::now();
+    let make_task = |algorithm: Algorithm, task_priority: TaskPriority| ConflictTask {
+        group_idx: group.id,
+        algorithm,
+        priority: task_priority,
+        group: group.clone(),
+        created_at,
+    };
+
+    // We want to run Greedy first so we can get quick, decent results
+    let mut tasks = vec![make_task(Algorithm::Greedy, priority)];
+
+    if group.orders.len() <= MAX_LENGTH_FOR_ALL_PERMUTATIONS {
+        // Small group: trying every permutation is affordable, at the caller's priority.
+        tasks.push(make_task(Algorithm::AllPermutations, priority));
+    } else {
+        // Larger group: low priority tasks that have a low chance, but a chance, of finding
+        // a better result. The random task is seeded with the group id.
+        tasks.extend([
+            make_task(
+                Algorithm::Random {
+                    seed: group.id as u64,
+                    count: NUMBER_OF_RANDOM_TASKS,
+                },
+                TaskPriority::Low,
+            ),
+            make_task(Algorithm::Length, TaskPriority::Low),
+            make_task(Algorithm::ReverseGreedy, TaskPriority::Low),
+        ]);
+    }
+
+    tasks
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::primitives::{
+        MempoolTx, Order, SimValue, SimulatedOrder, TransactionSignedEcRecoveredWithBlobs,
+    };
+    use std::sync::Arc;
+
+    use alloy_primitives::{Address, TxHash, B256, U256};
+    use reth::primitives::{
+        Transaction, TransactionSigned, TransactionSignedEcRecovered, TxLegacy,
+    };
+
+    use crate::building::evm_inspector::{SlotKey, UsedStateTrace};
+    use std::sync::mpsc;
+
+    struct DataGenerator {
+        last_used_id: u64,
+    }
+    impl DataGenerator {
+        pub fn new() -> DataGenerator {
+            DataGenerator { last_used_id: 0 }
+        }
+
+        pub fn create_u64(&mut self) -> u64 {
+            self.last_used_id += 1;
+            self.last_used_id
+        }
+
+        pub fn create_u256(&mut self) -> U256
{ + U256::from(self.create_u64()) + } + + pub fn create_b256(&mut self) -> B256 { + B256::from(self.create_u256()) + } + + pub fn create_hash(&mut self) -> TxHash { + TxHash::from(self.create_u256()) + } + + pub fn create_tx(&mut self) -> TransactionSignedEcRecovered { + TransactionSignedEcRecovered::from_signed_transaction( + TransactionSigned { + hash: self.create_hash(), + transaction: Transaction::Legacy(TxLegacy::default()), + ..Default::default() + }, + Address::default(), + ) + } + + pub fn create_order( + &mut self, + read: Option<&SlotKey>, + write: Option<&SlotKey>, + coinbase_profit: U256, + ) -> SimulatedOrder { + let mut trace = UsedStateTrace::default(); + if let Some(read) = read { + trace + .read_slot_values + .insert(read.clone(), self.create_b256()); + } + if let Some(write) = write { + trace + .written_slot_values + .insert(write.clone(), self.create_b256()); + } + + let sim_value = SimValue { + coinbase_profit, + ..Default::default() + }; + + SimulatedOrder { + order: Order::Tx(MempoolTx { + tx_with_blobs: TransactionSignedEcRecoveredWithBlobs::new_no_blobs( + self.create_tx(), + ) + .unwrap(), + }), + used_state_trace: Some(trace), + sim_value, + prev_order: None, + } + } + } + + // Helper function to create an order group + fn create_conflict_group( + id: GroupId, + orders: Vec, + conflicting_ids: HashSet, + ) -> ConflictGroup { + ConflictGroup { + id, + orders: Arc::new(orders), + conflicting_group_ids: Arc::new(conflicting_ids.into_iter().collect()), + } + } + + fn create_task_queue() -> Arc> { + Arc::new(SegQueue::new()) + } + + fn create_task_generator() -> ConflictTaskGenerator { + let (sender, _receiver) = mpsc::channel(); + ConflictTaskGenerator::new(create_task_queue(), sender) + } + + #[test] + fn test_process_single_group_new() { + let mut conflict_manager = create_task_generator(); + + let mut data_generator = DataGenerator::new(); + let group = create_conflict_group( + 1, + vec![ + data_generator.create_order(None, None, 
U256::from(100)), + data_generator.create_order(None, None, U256::from(200)), + ], + HashSet::default(), + ); + conflict_manager.process_single_group(group.clone()); + + assert_eq!(conflict_manager.existing_groups.len(), 1); + assert!(conflict_manager.existing_groups.contains_key(&1)); + assert_eq!(conflict_manager.task_queue.len(), 2); + } + + #[test] + fn test_process_single_group_update() { + let mut conflict_manager = create_task_generator(); + + let mut data_generator = DataGenerator::new(); + + let initial_group = create_conflict_group( + 1, + vec![ + data_generator.create_order(None, None, U256::from(100)), + data_generator.create_order(None, None, U256::from(200)), + ], + HashSet::default(), + ); + conflict_manager.existing_groups.insert(1, initial_group); + + let updated_group = create_conflict_group( + 1, + vec![ + data_generator.create_order(None, None, U256::from(200)), + data_generator.create_order(None, None, U256::from(300)), + ], + HashSet::default(), + ); + conflict_manager.process_single_group(updated_group); + + assert_eq!(conflict_manager.existing_groups.len(), 1); + assert_eq!(conflict_manager.task_queue.len(), 2); + } + + #[test] + fn test_conflicting_groups_remove_tasks() { + let mut conflict_manager = create_task_generator(); + + let mut data_generator = DataGenerator::new(); + + let mut group2_conflicts_with_group1 = HashSet::default(); + group2_conflicts_with_group1.insert(1 as usize); + + let group1 = create_conflict_group( + 1, + vec![ + data_generator.create_order(None, None, U256::from(100)), + data_generator.create_order(None, None, U256::from(200)), + ], + HashSet::default(), + ); + let group2 = create_conflict_group( + 2, + vec![ + data_generator.create_order(None, None, U256::from(200)), + data_generator.create_order(None, None, U256::from(300)), + ], + group2_conflicts_with_group1, + ); + + conflict_manager.process_groups(vec![group1, group2]); + + assert_eq!(conflict_manager.existing_groups.len(), 1); + 
assert!(conflict_manager.existing_groups.contains_key(&2)); + assert_eq!(conflict_manager.task_queue.len(), 2); + } + + #[test] + fn test_process_groups() { + let mut conflict_manager = create_task_generator(); + + let mut data_generator = DataGenerator::new(); + + let mut group2_conflicts_with_group1 = HashSet::default(); + group2_conflicts_with_group1.insert(1 as usize); + + let groups = vec![ + create_conflict_group( + 1, + vec![ + data_generator.create_order(None, None, U256::from(100)), + data_generator.create_order(None, None, U256::from(200)), + ], + HashSet::default(), + ), + create_conflict_group( + 2, + vec![ + data_generator.create_order(None, None, U256::from(200)), + data_generator.create_order(None, None, U256::from(300)), + ], + group2_conflicts_with_group1, + ), + create_conflict_group( + 3, + vec![ + data_generator.create_order(None, None, U256::from(300)), + data_generator.create_order(None, None, U256::from(400)), + ], + HashSet::default(), + ), + ]; + + conflict_manager.process_groups(groups); + + assert_eq!(conflict_manager.existing_groups.len(), 2); + assert!(conflict_manager.existing_groups.contains_key(&2)); + assert!(conflict_manager.existing_groups.contains_key(&3)); + assert_eq!(conflict_manager.task_queue.len(), 4); + } + + #[test] + fn test_has_significant_changes() { + let conflict_manager = create_task_generator(); + + let mut data_generator = DataGenerator::new(); + + let group1 = create_conflict_group( + 1, + vec![ + data_generator.create_order(None, None, U256::from(100)), + data_generator.create_order(None, None, U256::from(90)), + data_generator.create_order(None, None, U256::from(80)), + ], + HashSet::default(), + ); + + let group2 = create_conflict_group( + 1, + vec![ + data_generator.create_order(None, None, U256::from(150)), + data_generator.create_order(None, None, U256::from(140)), + data_generator.create_order(None, None, U256::from(130)), + ], + HashSet::default(), + ); + + 
assert!(conflict_manager.has_significant_changes(&group2, &group1)); + + let group3 = create_conflict_group( + 1, + vec![ + data_generator.create_order(None, None, U256::from(110)), + data_generator.create_order(None, None, U256::from(100)), + data_generator.create_order(None, None, U256::from(90)), + ], + HashSet::default(), + ); + + assert!(!conflict_manager.has_significant_changes(&group3, &group1)); + } + + #[test] + fn has_group_changed() { + let conflict_manager = create_task_generator(); + + let mut data_generator = DataGenerator::new(); + + let order1 = data_generator.create_order(None, None, U256::from(100)); + + let group1 = create_conflict_group(1, vec![order1.clone()], HashSet::default()); + + let group2 = create_conflict_group( + 1, + vec![data_generator.create_order(None, None, U256::from(200))], + HashSet::default(), + ); + + assert!(conflict_manager.group_has_changed(&group2, &group1)); + + let group3 = create_conflict_group(1, vec![order1], HashSet::default()); + + assert!(!conflict_manager.group_has_changed(&group3, &group1)); + } + + #[test] + fn test_single_order_group() { + let mut conflict_manager = create_task_generator(); + + let mut data_generator = DataGenerator::new(); + + let group = create_conflict_group( + 1, + vec![data_generator.create_order(None, None, U256::from(100))], + HashSet::default(), + ); + + conflict_manager.process_single_group(group); + + assert_eq!(conflict_manager.existing_groups.len(), 1); + assert!(conflict_manager.existing_groups.contains_key(&1)); + + // Should not create new tasks + assert_eq!(conflict_manager.task_queue.len(), 0); + } +} diff --git a/crates/rbuilder/src/building/builders/merging_builder/groups.rs b/crates/rbuilder/src/building/builders/parallel_builder/groups.rs similarity index 85% rename from crates/rbuilder/src/building/builders/merging_builder/groups.rs rename to crates/rbuilder/src/building/builders/parallel_builder/groups.rs index 43f7e0e3..72518e82 100644 --- 
a/crates/rbuilder/src/building/builders/merging_builder/groups.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/groups.rs @@ -6,23 +6,24 @@ use ahash::{HashMap, HashSet}; use alloy_primitives::U256; use itertools::Itertools; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; -/// GroupOrdering describes order of certain groups of orders. -#[derive(Debug, Clone)] -pub struct GroupOrdering { +/// ResolutionResult describes order of certain groups of orders. +#[derive(Debug, Default, Clone)] +pub struct ResolutionResult { /// Total coinbase profit of the given ordering. pub total_profit: U256, - /// Orders of the group (index of order in the group, profit ot the given order) - pub orders: Vec<(usize, U256)>, + /// Sequence of orders and their profit in that sequence + pub sequence_of_orders: Vec<(usize, U256)>, } -/// OrderGroups describes set of conflicting orders. +/// ConflictGroups describes set of conflicting orders. /// It's meant to be shared between thread who merges the group and who uses the best ordering to combine the result. #[derive(Debug, Clone)] -pub struct OrderGroup { +pub struct ConflictGroup { + pub id: usize, pub orders: Arc>, - pub best_ordering: Arc>, // idx, profit + pub conflicting_group_ids: Arc>, } #[derive(Debug, Default)] @@ -30,11 +31,12 @@ struct GroupData { orders: Vec, reads: Vec, writes: Vec, + conflicting_group_ids: HashSet, } -/// CachedGroups is used to quickly update set of groups when new orders arrive +/// ConflictFinder is used to quickly find and update groups of orders that conflict with each other. 
#[derive(Debug)] -pub struct CachedGroups { +pub struct ConflictFinder { group_counter: usize, group_reads: HashMap>, group_writes: HashMap>, @@ -42,9 +44,9 @@ pub struct CachedGroups { orders: HashSet, } -impl CachedGroups { +impl ConflictFinder { pub fn new() -> Self { - CachedGroups { + ConflictFinder { group_counter: 0, group_reads: HashMap::default(), group_writes: HashMap::default(), @@ -90,6 +92,7 @@ impl CachedGroups { orders: vec![order], reads: used_state.read_slot_values.keys().cloned().collect(), writes: used_state.written_slot_values.keys().cloned().collect(), + conflicting_group_ids: HashSet::default(), }; for read in &group_data.reads { self.group_reads @@ -139,6 +142,18 @@ impl CachedGroups { .into_iter() .map(|group_id| (group_id, self.groups.remove(&group_id).unwrap())) .collect::>(); + + // Collect all conflicting group IDs + let merged_conflicting_ids = conflicting_groups + .iter() + .flat_map(|(gid, gd)| { + gd.conflicting_group_ids + .iter() + .cloned() + .chain(std::iter::once(*gid)) + }) + .collect::>(); + for (group_id, group_data) in &conflicting_groups { for read in &group_data.reads { let group_reads_slot = @@ -161,15 +176,20 @@ impl CachedGroups { let group_id = self.group_counter; self.group_counter += 1; + let mut group_data = GroupData { orders: vec![order], reads: used_state.read_slot_values.keys().cloned().collect(), writes: used_state.written_slot_values.keys().cloned().collect(), + conflicting_group_ids: merged_conflicting_ids, }; for (_, mut group) in conflicting_groups { group_data.orders.append(&mut group.orders); group_data.reads.append(&mut group.reads); group_data.writes.append(&mut group.writes); + group_data + .conflicting_group_ids + .extend(group.conflicting_group_ids); } group_data.reads.sort(); group_data.reads.dedup(); @@ -193,48 +213,25 @@ impl CachedGroups { } } - pub fn get_order_groups(&self) -> Vec { - let groups = self - .groups + pub fn get_order_groups(&self) -> Vec { + self.groups .iter() 
.sorted_by_key(|(idx, _)| *idx) - .map(|(_, group_data)| { - if group_data.orders.len() == 1 { - let order_profit = group_data.orders[0].sim_value.coinbase_profit; - OrderGroup { - orders: Arc::new(group_data.orders.clone()), - best_ordering: Arc::new(Mutex::new(GroupOrdering { - total_profit: order_profit, - orders: vec![(0, order_profit)], - })), - } - } else { - OrderGroup { - orders: Arc::new(group_data.orders.clone()), - best_ordering: Arc::new(Mutex::new(GroupOrdering { - total_profit: U256::ZERO, - orders: Vec::new(), - })), - } - } + .map(|(group_id, group_data)| ConflictGroup { + id: *group_id, + orders: Arc::new(group_data.orders.clone()), + conflicting_group_ids: Arc::new(group_data.conflicting_group_ids.clone()), }) - .collect::>(); - groups + .collect() } } -impl Default for CachedGroups { +impl Default for ConflictFinder { fn default() -> Self { Self::new() } } -pub fn split_orders_into_groups(orders: Vec) -> Vec { - let mut cached_groups = CachedGroups::new(); - cached_groups.add_orders(orders); - cached_groups.get_order_groups() -} - #[cfg(test)] mod tests { use alloy_primitives::{Address, TxHash, B256, U256}; @@ -249,7 +246,7 @@ mod tests { }, }; - use super::CachedGroups; + use super::ConflictFinder; struct DataGenerator { last_used_id: u64, @@ -332,7 +329,7 @@ mod tests { let oa = data_gen.create_order(None, Some(&slot)); let ob = data_gen.create_order(None, Some(&slot)); let oc = data_gen.create_order(Some(&slot), None); - let mut cached_groups = CachedGroups::new(); + let mut cached_groups = ConflictFinder::new(); cached_groups.add_orders(vec![oa, ob, oc]); let groups = cached_groups.get_order_groups(); assert_eq!(groups.len(), 1); @@ -344,7 +341,7 @@ mod tests { let slot = data_gen.create_slot(); let oa = data_gen.create_order(Some(&slot), None); let ob = data_gen.create_order(Some(&slot), None); - let mut cached_groups = CachedGroups::new(); + let mut cached_groups = ConflictFinder::new(); cached_groups.add_orders(vec![oa, ob]); let groups = 
cached_groups.get_order_groups(); assert_eq!(groups.len(), 2); @@ -356,7 +353,7 @@ mod tests { let slot = data_gen.create_slot(); let oa = data_gen.create_order(None, Some(&slot)); let ob = data_gen.create_order(None, Some(&slot)); - let mut cached_groups = CachedGroups::new(); + let mut cached_groups = ConflictFinder::new(); cached_groups.add_orders(vec![oa, ob]); let groups = cached_groups.get_order_groups(); assert_eq!(groups.len(), 2); diff --git a/crates/rbuilder/src/building/builders/parallel_builder/mod.rs b/crates/rbuilder/src/building/builders/parallel_builder/mod.rs new file mode 100644 index 00000000..49546496 --- /dev/null +++ b/crates/rbuilder/src/building/builders/parallel_builder/mod.rs @@ -0,0 +1,427 @@ +pub mod block_building_result_assembler; +pub mod conflict_resolvers; +pub mod conflict_resolving_pool; +pub mod conflict_task_generator; +pub mod groups; +pub mod order_intake_store; +pub mod results_aggregator; +pub mod simulation_cache; +pub mod task; +pub use groups::*; + +use ahash::HashMap; +use conflict_resolving_pool::{ConflictResolvingPool, TaskQueue}; +use conflict_task_generator::ConflictTaskGenerator; +use crossbeam::queue::SegQueue; +use eyre::Result; +use itertools::Itertools; +use results_aggregator::BestResults; +use serde::Deserialize; +use simulation_cache::SharedSimulationCache; +use std::sync::mpsc as std_mpsc; +use std::thread; +use std::{sync::Arc, time::Instant}; +use task::*; +use time::OffsetDateTime; +use tokio_util::sync::CancellationToken; +use tracing::{error, trace}; + +use crate::{ + building::builders::{ + BacktestSimulateBlockInput, Block, BlockBuildingAlgorithm, BlockBuildingAlgorithmInput, + LiveBuilderInput, + }, + roothash::RootHashConfig, +}; +use alloy_primitives::Address; +use reth::tasks::pool::BlockingTaskPool; +use reth_db::database::Database; +use reth_payload_builder::database::CachedReads; +use reth_provider::{DatabaseProviderFactory, StateProviderFactory}; + +use self::{ + 
block_building_result_assembler::BlockBuildingResultAssembler, + order_intake_store::OrderIntakeStore, results_aggregator::ResultsAggregator, +}; + +pub type GroupId = usize; +pub type ConflictResolutionResultPerGroup = (GroupId, (ResolutionResult, ConflictGroup)); + +/// ParallelBuilderConfig configures parallel builder. +/// * `num_threads` - number of threads to use for merging. +/// * `coinbase_payment` - optional; defaults to `false` when omitted. +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +pub struct ParallelBuilderConfig { + pub discard_txs: bool, + pub num_threads: usize, + #[serde(default)] + pub coinbase_payment: bool, +} + +fn get_communication_channels() -> ( + std_mpsc::Sender, + std_mpsc::Receiver, +) { + std_mpsc::channel() +} + +fn get_shared_data_structures() -> (Arc, TaskQueue) { + let best_results = Arc::new(BestResults::new()); + let task_queue = Arc::new(SegQueue::new()); + (best_results, task_queue) +} + +struct ParallelBuilder { + order_intake_consumer: OrderIntakeStore, + conflict_finder: ConflictFinder, + conflict_task_generator: ConflictTaskGenerator, + conflict_resolving_pool: ConflictResolvingPool

, + results_aggregator: ResultsAggregator, + block_building_result_assembler: BlockBuildingResultAssembler, +} + +impl ParallelBuilder +where + DB: Database + Clone + 'static, + P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, +{ + /// Creates a ParallelBuilder. + /// Sets up the various components and communication channels. + pub fn new(input: LiveBuilderInput, config: &ParallelBuilderConfig) -> Self { + let (group_result_sender, group_result_receiver) = get_communication_channels(); + let group_result_sender_for_task_generator = group_result_sender.clone(); + + let (best_results, task_queue) = get_shared_data_structures(); + + let simulation_cache = Arc::new(SharedSimulationCache::new()); + + let conflict_finder = ConflictFinder::new(); + + let conflict_task_generator = ConflictTaskGenerator::new( + Arc::clone(&task_queue), + group_result_sender_for_task_generator, + ); + + let conflict_resolving_pool = ConflictResolvingPool::new( + config.num_threads, + Arc::clone(&task_queue), + group_result_sender, + input.cancel.clone(), + input.ctx.clone(), + input.provider.clone(), + Arc::clone(&simulation_cache), + ); + + let results_aggregator = + ResultsAggregator::new(group_result_receiver, Arc::clone(&best_results)); + + let block_building_result_assembler = BlockBuildingResultAssembler::new( + config, + input.root_hash_config, + Arc::clone(&best_results), + input.provider.clone(), + input.root_hash_task_pool.clone(), + input.ctx.clone(), + input.cancel.clone(), + input.builder_name.clone(), + input.sink.can_use_suggested_fee_recipient_as_coinbase(), + Some(input.sink.clone()), + ); + + let order_intake_consumer = + OrderIntakeStore::new(input.input, &input.sbundle_mergeabe_signers); + + Self { + order_intake_consumer, + conflict_finder, + conflict_task_generator, + conflict_resolving_pool, + results_aggregator, + block_building_result_assembler, + } + } + + /// Initializes the orders in the cached groups. 
+ fn initialize_orders(&mut self) { + let initial_orders = self.order_intake_consumer.get_orders(); + trace!("Initializing with {} orders", initial_orders.len()); + self.conflict_finder.add_orders(initial_orders); + } +} + +/// Runs the parallel builder algorithm to construct blocks from incoming orders. +/// +/// This function implements a continuous block building process that: +/// 1. Consumes orders from an intake store. +/// 2. Identifies conflict groups among the orders. +/// 3. Manages conflicts and attempts to resolve them. +/// 4. Builds blocks from the best results of conflict resolution. +/// +/// The process involves several key components: +/// - [OrderIntakeStore]: Provides a continuous stream of incoming orders. +/// - [ConflictFinder]: Identifies and manages conflict groups among orders. +/// - [ConflictTaskGenerator]: Decides which conflicts to attempt to resolve and what priority. +/// - [ConflictResolvingPool]: A pool of workers that resolve conflicts between orders, producing "results" which are resolved conflicts. +/// - [ResultsAggregator]: Collects results from workers and initiates block building. +/// - [BlockBuildingResultAssembler]: Builds blocks from the best results collected. +/// +/// The function runs in a loop, continuously processing new orders and building blocks +/// until cancellation is requested. It uses separate processes for +/// 1. Identifying conflicts and processing which conflicts to attempt to resolve in what priority +/// 2. Resolving conflicts +/// 3. Block building given conflict resolution results +/// +/// By separating these processes we can continuously take in new flow, triage that flow intelligently, and build blocks continuously with the best results. +/// +/// # Arguments +/// * `input`: LiveBuilderInput containing necessary context and resources for block building. +/// * `config`: Configuration parameters for the parallel builder. 
+/// +/// # Type Parameters +/// * `DB`: The database type, which must implement Database, Clone, and have a static lifetime. +pub fn run_parallel_builder(input: LiveBuilderInput, config: &ParallelBuilderConfig) +where + DB: Database + Clone + 'static, + P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, +{ + let cancel_for_results_aggregator = input.cancel.clone(); + let cancel_for_block_building_result_assembler = input.cancel.clone(); + let cancel_for_process_orders_loop = input.cancel.clone(); + + let mut builder = ParallelBuilder::new(input, config); + builder.initialize_orders(); + + // Start task processing + thread::spawn(move || { + builder.conflict_resolving_pool.start(); + }); + + // Process that collects conflict resolution results from workers and triggers block building + tokio::spawn(async move { + builder + .results_aggregator + .run(cancel_for_results_aggregator) + .await; + }); + + // Process that builds blocks from the best conflict resolution results + thread::spawn(move || { + builder + .block_building_result_assembler + .run(cancel_for_block_building_result_assembler); + }); + + // Process that consumes orders from the intake store, updates the cached groups, and triggers new conflict resolution tasks for the worker pool + run_order_intake( + &cancel_for_process_orders_loop, + &mut builder.order_intake_consumer, + &mut builder.conflict_finder, + &mut builder.conflict_task_generator, + ); +} + +fn run_order_intake( + cancel_token: &CancellationToken, + order_intake_consumer: &mut OrderIntakeStore, + conflict_finder: &mut ConflictFinder, + conflict_task_generator: &mut ConflictTaskGenerator, +) { + 'building: loop { + if cancel_token.is_cancelled() { + break 'building; + } + + match order_intake_consumer.consume_next_batch() { + Ok(ok) => { + if !ok { + break 'building; + } + } + Err(err) => { + error!(?err, "Error consuming next order batch"); + continue; + } + } + + let new_orders = order_intake_consumer.drain_new_orders(); + 
+ // We can update conflict_finder if we have ONLY adds + if let Some(new_orders) = new_orders { + if !new_orders.is_empty() { + let time_start = Instant::now(); + let len = new_orders.len(); + conflict_finder.add_orders(new_orders); + trace!( + new_orders_count = len, + groups_count = conflict_finder.get_order_groups().len(), + time_taken_ms = %time_start.elapsed().as_millis(), + "Order intake: added new orders and processing groups" + ); + conflict_task_generator.process_groups(conflict_finder.get_order_groups()); + } + } + } +} + +pub fn parallel_build_backtest( + input: BacktestSimulateBlockInput<'_, P>, + config: ParallelBuilderConfig, +) -> Result<(Block, CachedReads)> +where + DB: Database + Clone + 'static, + P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, +{ + let start_time = Instant::now(); + + // Initialization stage + let init_start = Instant::now(); + let (best_results, task_queue) = get_shared_data_structures(); + + let (group_result_sender, _) = get_communication_channels(); + + let mut conflict_finder = ConflictFinder::new(); + + let sorted_orders = { + let mut orders = input.sim_orders.clone(); + orders.sort_by_key(|o| o.order.id()); + orders + }; + + conflict_finder.add_orders(sorted_orders); + let simulation_cache = Arc::new(SharedSimulationCache::new()); + let init_duration = init_start.elapsed(); + + // Worker pool and conflict manager creation + let setup_start = Instant::now(); + + let mut conflict_resolving_pool = ConflictResolvingPool::new( + config.num_threads, + Arc::clone(&task_queue), + group_result_sender, + CancellationToken::new(), + input.ctx.clone(), + input.provider.clone(), + Arc::clone(&simulation_cache), + ); + + let setup_duration = setup_start.elapsed(); + + // Group processing + let processing_start = Instant::now(); + let groups = conflict_finder.get_order_groups(); + let results = conflict_resolving_pool.process_groups_backtest( + groups, + &input.ctx, + &input.provider, + 
Arc::clone(&simulation_cache), + ); + let processing_duration = processing_start.elapsed(); + + // Block building result assembler creation + let assembler_start = Instant::now(); + let block_building_result_assembler = BlockBuildingResultAssembler::new( + &config, + RootHashConfig::skip_root_hash(), + Arc::clone(&best_results), + input.provider.clone(), + BlockingTaskPool::build()?, + input.ctx.clone(), + CancellationToken::new(), + String::from("backtest_builder"), + true, + None, + ); + let assembler_duration = assembler_start.elapsed(); + + // Best results collection + let collection_start = Instant::now(); + let best_results: HashMap = results + .into_iter() + .sorted_by(|a, b| b.1 .0.total_profit.cmp(&a.1 .0.total_profit)) + .into_group_map_by(|(group_id, _)| *group_id) + .into_iter() + .map(|(group_id, mut group_results)| (group_id, group_results.remove(0).1)) + .collect(); + let collection_duration = collection_start.elapsed(); + + // Block building + let building_start = Instant::now(); + let block_building_helper = block_building_result_assembler + .build_backtest_block(best_results, OffsetDateTime::now_utc())?; + + let payout_tx_value = if config.coinbase_payment { + None + } else { + Some(block_building_helper.true_block_value()?) 
+ }; + let finalize_block_result = block_building_helper.finalize_block(payout_tx_value)?; + let building_duration = building_start.elapsed(); + let total_duration = start_time.elapsed(); + + trace!("Initialization time: {:?}", init_duration); + trace!("Setup time: {:?}", setup_duration); + trace!("Group processing time: {:?}", processing_duration); + trace!("Assembler creation time: {:?}", assembler_duration); + trace!("Best results collection time: {:?}", collection_duration); + trace!("Block building time: {:?}", building_duration); + trace!("Total time taken: {:?}", total_duration); + + Ok(( + finalize_block_result.block, + finalize_block_result.cached_reads, + )) +} + +#[derive(Debug)] +pub struct ParallelBuildingAlgorithm { + root_hash_config: RootHashConfig, + root_hash_task_pool: BlockingTaskPool, + sbundle_mergeabe_signers: Vec

, + config: ParallelBuilderConfig, + name: String, +} + +impl ParallelBuildingAlgorithm { + pub fn new( + root_hash_config: RootHashConfig, + root_hash_task_pool: BlockingTaskPool, + sbundle_mergeabe_signers: Vec
, + config: ParallelBuilderConfig, + name: String, + ) -> Self { + Self { + root_hash_config, + root_hash_task_pool, + sbundle_mergeabe_signers, + config, + name, + } + } +} + +impl BlockBuildingAlgorithm for ParallelBuildingAlgorithm +where + DB: Database + Clone + 'static, + P: DatabaseProviderFactory + StateProviderFactory + Clone + 'static, +{ + fn name(&self) -> String { + self.name.clone() + } + + fn build_blocks(&self, input: BlockBuildingAlgorithmInput

) { + let live_input = LiveBuilderInput { + provider: input.provider, + root_hash_config: self.root_hash_config.clone(), + root_hash_task_pool: self.root_hash_task_pool.clone(), + ctx: input.ctx.clone(), + input: input.input, + sink: input.sink, + builder_name: self.name.clone(), + cancel: input.cancel, + sbundle_mergeabe_signers: self.sbundle_mergeabe_signers.clone(), + phantom: Default::default(), + }; + run_parallel_builder(live_input, &self.config); + } +} diff --git a/crates/rbuilder/src/building/builders/merging_builder/order_intake_store.rs b/crates/rbuilder/src/building/builders/parallel_builder/order_intake_store.rs similarity index 97% rename from crates/rbuilder/src/building/builders/merging_builder/order_intake_store.rs rename to crates/rbuilder/src/building/builders/parallel_builder/order_intake_store.rs index f0972eb9..61f459d3 100644 --- a/crates/rbuilder/src/building/builders/merging_builder/order_intake_store.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/order_intake_store.rs @@ -12,7 +12,7 @@ use crate::{ primitives::SimulatedOrder, }; -/// Struct that allow getting the new orders from the order/cancellation stream in the way the merging builder likes it. +/// Struct that allow getting the new orders from the order/cancellation stream in the way the parallel builder likes it. 
/// Contains the current whole set of orders but also can be queried for deltas on the orders ONLY if the deltas are all additions /// Chains MultiShareBundleMerger->SimulatedOrderStore /// Usage: diff --git a/crates/rbuilder/src/building/builders/parallel_builder/readme.md b/crates/rbuilder/src/building/builders/parallel_builder/readme.md new file mode 100644 index 00000000..18351d8c --- /dev/null +++ b/crates/rbuilder/src/building/builders/parallel_builder/readme.md @@ -0,0 +1,26 @@ +# Parallel Builder +The parallel builder is a block building algorithm that runs key components of building in parallel and attempts to do more sophisticated merging of bundles. + +Its primary differentiator is that it identifies groups of conflicting orders and resolves them independently of each other and in parallel. By doing so, we can pipeline the stages of orderflow intake, conflict resolution, and building a final block. + +## Components and Process Flow +1. The **[OrderIntakeStore](order_intake_store.rs)** consumes orders from the intake store. +2. The **[ConflictFinder](groups.rs)** identifies conflict groups among the orders. +3. The **[ConflictTaskGenerator](task.rs)** creates tasks for resolving conflicts. +4. The **[ConflictResolvingPool](conflict_resolving_pool.rs)** is a pool of threads that process these tasks in parallel, executing merging algorithms defined by tasks. +5. The **[ResultsAggregator](results_aggregator.rs)** collects the results of conflict resolution, keeping track of the best results. +6. The **[BlockBuildingResultAssembler](block_building_result_assembler.rs)** constructs blocks from the best results obtained from the `ResultsAggregator`. + +## Usage live +The parallel builder requires extra configuration options to be used live. Here is an example for your config file: + +``` +[[builders]] +name = "parallel" +algo = "parallel-builder" +discard_txs = true +num_threads = 25 +``` + +## Backtesting +The parallel builder can be backtested. 
However, it is quite slow due to how it is currently implemented. This isn't reflective of the latency performance in the live environment and could be improved with more work. \ No newline at end of file diff --git a/crates/rbuilder/src/building/builders/parallel_builder/results_aggregator.rs b/crates/rbuilder/src/building/builders/parallel_builder/results_aggregator.rs new file mode 100644 index 00000000..c0d8a0f6 --- /dev/null +++ b/crates/rbuilder/src/building/builders/parallel_builder/results_aggregator.rs @@ -0,0 +1,536 @@ +use super::{ConflictGroup, GroupId, ResolutionResult}; +use alloy_primitives::utils::format_ether; +use alloy_primitives::U256; +use dashmap::DashMap; +use std::sync::mpsc as std_mpsc; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; +use std::time::{Duration, Instant}; +use tokio_util::sync::CancellationToken; +use tracing::trace; + +pub struct BestResults { + pub data: DashMap, + pub version: AtomicU64, +} + +impl BestResults { + pub fn new() -> Self { + Self { + data: DashMap::new(), + version: AtomicU64::new(0), + } + } + + pub fn increment_version(&self) { + self.version.fetch_add(1, Ordering::SeqCst); + } + + pub fn get_results_and_version(&self) -> (Vec<(ResolutionResult, ConflictGroup)>, u64) { + let results = self + .data + .iter() + .map(|entry| { + let (_, (sequence_of_orders, order_group)) = entry.pair(); + (sequence_of_orders.clone(), order_group.clone()) + }) + .collect(); + (results, self.version.load(Ordering::SeqCst)) + } + + pub fn get_number_of_orders(&self) -> usize { + self.data + .iter() + .map(|entry| entry.value().1.orders.len()) + .sum() + } + + pub fn get_version(&self) -> u64 { + self.version.load(Ordering::SeqCst) + } +} + +impl Default for BestResults { + fn default() -> Self { + Self::new() + } +} + +/// Collects and manages the best results for each group in a concurrent environment. 
+/// +/// This struct is responsible for receiving group results, updating the best known +/// results for each group, and triggering builds when better results are found. +pub struct ResultsAggregator { + group_result_receiver: std_mpsc::Receiver<(GroupId, (ResolutionResult, ConflictGroup))>, + best_results: Arc, +} + +impl ResultsAggregator { + /// Creates a new `ResultsAggregator` instance. + /// + /// # Arguments + /// + /// * `group_result_receiver` - A receiver for incoming group results. + /// * `best_results` - A shared, thread-safe map to store the best results for each group. + /// * `build_trigger` - A sender to trigger builds when better results are found. + pub fn new( + group_result_receiver: std_mpsc::Receiver<(GroupId, (ResolutionResult, ConflictGroup))>, + best_results: Arc, + ) -> Self { + Self { + group_result_receiver, + best_results, + } + } + + /// Updates the best result for a given group. + /// + /// # Arguments + /// + /// * `group_id` - The ID of the group to update. + /// * `sequence_of_orders` - The new ordering information for the group. + /// * `order_group` - The new order group. + /// + /// # Returns + /// + /// Returns a tuple where: + /// - The first element is a boolean indicating if the best result was updated. + /// - The second element is the old profit before the update. 
+ fn update_best_result( + &self, + group_id: GroupId, + sequence_of_orders: ResolutionResult, + order_group: ConflictGroup, + ) -> (bool, U256) { + let mut best_result_updated = false; + let mut old_profit = U256::ZERO; + + // First, check for conflicting groups that need to be removed + let mut conflicting_groups_to_remove = Vec::new(); + for &conflicting_id in &*order_group.conflicting_group_ids { + if let Some(existing_entry) = self.best_results.data.get(&conflicting_id) { + if sequence_of_orders.total_profit > existing_entry.value().0.total_profit { + conflicting_groups_to_remove.push(conflicting_id); + } else { + // If the new group is not more profitable than an existing conflicting group, + // we don't update anything + return (false, old_profit); + } + } + } + + // Insert or update the new group + { + let mut entry = self.best_results.data.entry(group_id).or_insert_with(|| { + best_result_updated = true; + (sequence_of_orders.clone(), order_group.clone()) + }); + + if sequence_of_orders.total_profit > entry.value().0.total_profit { + old_profit = entry.value().0.total_profit; + *entry.value_mut() = (sequence_of_orders, order_group); + best_result_updated = true; + } + } + + // Remove all conflicting groups + for conflicting_id in &conflicting_groups_to_remove { + self.best_results.data.remove(conflicting_id); + } + + // If any updates were made, increment the version + if best_result_updated || !conflicting_groups_to_remove.is_empty() { + self.best_results.increment_version(); + } + + (best_result_updated, old_profit) + } + + /// Runs the results collection process. + /// + /// This method continuously receives group results, updates the best results, + /// and triggers builds when necessary. It runs until the receiver is closed. 
+ pub async fn run(&mut self, cancel_token: CancellationToken) { + loop { + if cancel_token.is_cancelled() { + // Cancellation requested, exit the loop + break; + } + + match self.group_result_receiver.try_recv() { + Ok((group_id, (sequence_of_orders, order_group))) => { + self.process_update(group_id, sequence_of_orders, order_group); + } + Err(std_mpsc::TryRecvError::Empty) => { + // No message available, yield to other tasks + tokio::task::yield_now().await; + } + Err(std_mpsc::TryRecvError::Disconnected) => { + // Channel has been closed, exit the loop + break; + } + } + } + } + + /// Processes an update to the best results for a given group. + /// + /// # Arguments + /// + /// * `group_id` - The ID of the group to update. + /// * `sequence_of_orders` - The new ordering information for the group. + /// * `order_group` - The new order group. + fn process_update( + &self, + group_id: GroupId, + sequence_of_orders: ResolutionResult, + order_group: ConflictGroup, + ) { + let start = Instant::now(); + trace!("Received group result for group: {:?}", group_id); + let (best_result_updated, old_profit) = + self.update_best_result(group_id, sequence_of_orders.clone(), order_group); + let duration = start.elapsed(); + + if best_result_updated { + self.log_updated_result(group_id, &sequence_of_orders, old_profit, duration); + } else { + trace!( + "Best result not updated for group: {:?}, duration: {:?}", + group_id, + duration + ); + } + } + + /// Helper function to log the updated best result for a given group. 
+ fn log_updated_result( + &self, + group_id: GroupId, + sequence_of_orders: &ResolutionResult, + old_profit: U256, + duration: Duration, + ) { + trace!( + group_id = group_id, + old_profit = format_ether(old_profit), + new_profit = format_ether(sequence_of_orders.total_profit), + sum_of_best_results = format_ether(self.get_and_display_sum_of_best_results()), + number_of_orders = self.best_results.get_number_of_orders(), + version = self.best_results.get_version(), + duration_ns = duration.as_nanos(), + "Updated best result for group" + ); + } + + /// Returns the sum of the total profits of all the best results. + /// Helper function that is used for tracing. + fn get_and_display_sum_of_best_results(&self) -> U256 { + self.best_results + .data + .iter() + .map(|entry| entry.value().0.total_profit) + .sum::() + } +} + +#[cfg(test)] +mod tests { + use alloy_primitives::U256; + + use super::*; + use std::sync::Arc; + + // Helper function to create a simple ConflictGroup + fn create_order_group(id: usize, conflicting_ids: Vec) -> ConflictGroup { + ConflictGroup { + id, + orders: Arc::new(vec![]), + conflicting_group_ids: Arc::new(conflicting_ids.into_iter().collect()), + } + } + + // Helper function to create a simple ResolutionResult + fn create_sequence_of_orders(profit: u64) -> ResolutionResult { + ResolutionResult { + total_profit: U256::from(profit), + sequence_of_orders: vec![], + } + } + + fn create_results_aggregator(best_results: Option>) -> Arc { + let best_results = best_results.unwrap_or_else(|| Arc::new(BestResults::new())); + Arc::new(ResultsAggregator::new( + std_mpsc::channel().1, + best_results, + // mpsc::channel(1).0, + )) + } + + #[test] + fn test_update_best_result_new_entry() { + let results_aggregator = create_results_aggregator(None); + + let group_id = 1; + let sequence_of_orders = create_sequence_of_orders(100); + let order_group = create_order_group(group_id, vec![]); + + let (best_result_updated, old_profit) = 
results_aggregator.update_best_result( + group_id, + sequence_of_orders.clone(), + order_group.clone(), + ); + assert!(best_result_updated); + assert_eq!(old_profit, U256::ZERO); + + let best_result = results_aggregator.best_results.data.get(&group_id).unwrap(); + assert_eq!(best_result.value().0.total_profit, U256::from(100)); + assert_eq!(best_result.value().1.id, group_id); + } + + #[test] + fn test_update_best_result_better_profit() { + let results_aggregator = create_results_aggregator(None); + + let group_id = 1; + let initial_sequence_of_orders = create_sequence_of_orders(100); + let initial_order_group = create_order_group(group_id, vec![]); + let (best_result_updated, _) = results_aggregator.update_best_result( + group_id, + initial_sequence_of_orders, + initial_order_group, + ); + assert!(best_result_updated); + + let better_sequence_of_orders = create_sequence_of_orders(200); + let better_order_group = create_order_group(group_id, vec![]); + + let (best_result_updated, old_profit) = results_aggregator.update_best_result( + group_id, + better_sequence_of_orders.clone(), + better_order_group.clone(), + ); + assert!(best_result_updated); + assert_eq!(old_profit, U256::from(100)); + + let best_result = results_aggregator.best_results.data.get(&group_id).unwrap(); + assert_eq!(best_result.value().0.total_profit, U256::from(200)); + } + + #[test] + fn test_update_best_result_worse_profit() { + let results_aggregator = create_results_aggregator(None); + + let group_id = 1; + let initial_sequence_of_orders = create_sequence_of_orders(200); + let initial_order_group = create_order_group(group_id, vec![]); + results_aggregator + .best_results + .data + .insert(group_id, (initial_sequence_of_orders, initial_order_group)); + + let worse_sequence_of_orders = create_sequence_of_orders(100); + let worse_order_group = create_order_group(group_id, vec![]); + + let (best_result_updated, _) = results_aggregator.update_best_result( + group_id, + 
worse_sequence_of_orders.clone(), + worse_order_group.clone(), + ); + assert!(!best_result_updated); + + let best_result = results_aggregator.best_results.data.get(&group_id).unwrap(); + assert_eq!(best_result.value().0.total_profit, U256::from(200)); + } + + #[test] + fn test_update_best_result_remove_conflicting() { + let results_aggregator = create_results_aggregator(None); + + let group_id_1 = 1; + let group_id_2 = 2; + + let sequence_of_orders_1 = create_sequence_of_orders(100); + let order_group_1 = create_order_group(group_id_1, vec![]); + + // Group 2 is BETTER (more profitable) than group 1 and conflicts with it + let sequence_of_orders_2 = create_sequence_of_orders(150); + let order_group_2 = create_order_group(group_id_2, vec![group_id_1]); + + let (best_result_updated, _) = results_aggregator.update_best_result( + group_id_1, + sequence_of_orders_1.clone(), + order_group_1.clone(), + ); + assert!(best_result_updated); + + let (best_result_updated, _) = results_aggregator.update_best_result( + group_id_2, + sequence_of_orders_2.clone(), + order_group_2.clone(), + ); + assert!(best_result_updated); + + // Group 1 is removed because group 2 is better + assert!(!results_aggregator + .best_results + .data + .contains_key(&group_id_1)); + + // Group 2 is kept because it is better + assert!(results_aggregator + .best_results + .data + .contains_key(&group_id_2)); + + // Group 2 is the best result + let best_result = results_aggregator + .best_results + .data + .get(&group_id_2) + .unwrap(); + assert_eq!(best_result.value().0.total_profit, U256::from(150)); + + // Group 1 is not in the best results + let best_result_1 = results_aggregator.best_results.data.get(&group_id_1); + assert!(best_result_1.is_none()); + } + + #[test] + fn test_update_best_result_keep_better_conflicting() { + let results_aggregator = create_results_aggregator(None); + + let group_id_1 = 1; + let group_id_2 = 2; + let sequence_of_orders_1 = create_sequence_of_orders(100); + let order_group_1 = create_order_group(group_id_1, 
vec![]); + + // Group 2 is worse than group 1 + let sequence_of_orders_2 = create_sequence_of_orders(50); + let order_group_2 = create_order_group(group_id_2, vec![group_id_1]); + + assert!( + results_aggregator + .update_best_result( + group_id_1, + sequence_of_orders_1.clone(), + order_group_1.clone() + ) + .0 + ); + assert!( + !results_aggregator + .update_best_result(group_id_2, sequence_of_orders_2, order_group_2) + .0 + ); + + assert!(results_aggregator + .best_results + .data + .contains_key(&group_id_1)); + assert!(!results_aggregator + .best_results + .data + .contains_key(&group_id_2)); + assert_eq!( + results_aggregator + .best_results + .data + .get(&group_id_1) + .unwrap() + .value() + .0 + .total_profit, + U256::from(100) + ); + assert!(results_aggregator + .best_results + .data + .get(&group_id_2) + .is_none()); + } + + #[test] + fn test_update_best_result_merge_conflicting_groups() { + let results_aggregator = create_results_aggregator(None); + + let group_id_1 = 1; + let group_id_2 = 2; + let group_id_3 = 3; + + // Create two initial groups + let sequence_of_orders_1 = create_sequence_of_orders(100); + let order_group_1 = create_order_group(group_id_1, vec![]); + + let sequence_of_orders_2 = create_sequence_of_orders(150); + let order_group_2 = create_order_group(group_id_2, vec![]); + + // Add the initial groups + assert!( + results_aggregator + .update_best_result(group_id_1, sequence_of_orders_1, order_group_1) + .0 + ); + assert!( + results_aggregator + .update_best_result(group_id_2, sequence_of_orders_2, order_group_2) + .0 + ); + + // Create a third group that conflicts with both group 1 and 2, but is more profitable + let sequence_of_orders_3 = create_sequence_of_orders(300); + let order_group_3 = create_order_group(group_id_3, vec![group_id_1, group_id_2]); + + // Update with the new, more profitable group + assert!( + results_aggregator + .update_best_result(group_id_3, sequence_of_orders_3, order_group_3) + .0 + ); + + // Check that 
group 3 is now the only group in best_results + assert_eq!(results_aggregator.best_results.data.len(), 1); + assert!(results_aggregator + .best_results + .data + .contains_key(&group_id_3)); + assert!(!results_aggregator + .best_results + .data + .contains_key(&group_id_1)); + assert!(!results_aggregator + .best_results + .data + .contains_key(&group_id_2)); + + // Verify the profit of the remaining group + let best_result = results_aggregator + .best_results + .data + .get(&group_id_3) + .unwrap(); + assert_eq!(best_result.value().0.total_profit, U256::from(300)); + } + + #[test] + fn test_update_best_result_version_increment() { + let best_results = Arc::new(BestResults::new()); + let results_aggregator = create_results_aggregator(Some(Arc::clone(&best_results))); + + let initial_version = best_results.get_version(); + + // Update with a new group + let group_id = 1; + let sequence_of_orders = create_sequence_of_orders(100); + let order_group = create_order_group(group_id, vec![]); + let (updated, _) = + results_aggregator.update_best_result(group_id, sequence_of_orders, order_group); + assert!(updated); + + // Version should have incremented + let new_version = best_results.get_version(); + assert_eq!(new_version, initial_version + 1); + } +} diff --git a/crates/rbuilder/src/building/builders/parallel_builder/simulation_cache.rs b/crates/rbuilder/src/building/builders/parallel_builder/simulation_cache.rs new file mode 100644 index 00000000..05a1b2fe --- /dev/null +++ b/crates/rbuilder/src/building/builders/parallel_builder/simulation_cache.rs @@ -0,0 +1,347 @@ +use crate::primitives::OrderId; +use ahash::HashMap; +use alloy_primitives::U256; +use parking_lot::RwLock as PLRwLock; +use reth_payload_builder::database::CachedReads; +use revm::db::BundleState; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +/// An instance of a simulation result that has been cached. 
+#[derive(Debug, Clone)] +pub struct CachedSimulationState { + pub cached_reads: CachedReads, + pub bundle_state: BundleState, + pub total_profit: U256, + pub per_order_profits: Vec<(OrderId, U256)>, +} + +/// An inner cache of simulation results, keyed by the ordering of the orders that produced the simulation state. +#[derive(Debug, Default)] +struct SimulationCache { + inner_cache: HashMap, Arc>, +} + +impl SimulationCache { + /// Creates a new `SimulationCache`. + pub fn new() -> Self { + SimulationCache { + inner_cache: HashMap::default(), + } + } +} + +/// A shared instance of the simulation cache that can be used by many threads. +/// It provides thread-safe access and maintains statistics about cache usage. +#[derive(Debug, Clone)] +pub struct SharedSimulationCache { + cache: Arc>, + cache_requests: Arc, + full_hits: Arc, + partial_hits: Arc, + number_of_order_simulations_saved: Arc, + number_of_order_simulations_requested: Arc, +} + +impl SharedSimulationCache { + /// Creates a new `SharedSimulationCache`. + pub fn new() -> Self { + SharedSimulationCache { + cache: Arc::new(PLRwLock::new(SimulationCache::new())), + cache_requests: Arc::new(AtomicUsize::new(0)), + full_hits: Arc::new(AtomicUsize::new(0)), + partial_hits: Arc::new(AtomicUsize::new(0)), + number_of_order_simulations_saved: Arc::new(AtomicUsize::new(0)), + number_of_order_simulations_requested: Arc::new(AtomicUsize::new(0)), + } + } + + /// Retrieves the cached simulation state for the longest matching prefix of the given ordering. + /// + /// # Arguments + /// + /// * `ordering` - A reference to a vector of `OrderId`s representing the sequence of orders. + /// + /// # Returns + /// + /// A tuple containing: + /// - An `Option` with the `Arc` if a matching prefix is found. + /// - The index up to which the ordering was cached. 
+ pub fn get_cached_state( + &self, + ordering: &[OrderId], + ) -> (Option>, usize) { + let total_requested = ordering.len(); + self.number_of_order_simulations_requested + .fetch_add(total_requested, Ordering::Relaxed); + self.cache_requests.fetch_add(1, Ordering::Relaxed); + + let cache_lock = self.cache.read(); + let mut current_state: Option> = None; + let mut last_cached_index = 0; + + let mut partial_key = ordering.to_owned(); + + while !partial_key.is_empty() { + if let Some(cached_result) = cache_lock.inner_cache.get(&partial_key) { + current_state = Some(cached_result.clone()); + last_cached_index = partial_key.len(); + + // Update statistics + if last_cached_index == ordering.len() { + self.full_hits.fetch_add(1, Ordering::Relaxed); + } else { + self.partial_hits.fetch_add(1, Ordering::Relaxed); + } + self.number_of_order_simulations_saved + .fetch_add(last_cached_index, Ordering::Relaxed); + + break; + } + + // Remove the last `OrderId` and try again with a shorter partial key + partial_key.pop(); + } + + (current_state, last_cached_index) + } + + /// Stores the simulation result for a given ordering in the shared cache. + /// + /// # Arguments + /// + /// * `ordering` - A reference to a vector of `OrderId`s representing the sequence of orders. + /// * `cached_state` - The `CachedSimulationState` to store. + pub fn store_cached_state(&self, ordering: &[OrderId], cached_state: CachedSimulationState) { + let partial_key = ordering.to_owned(); + let new_state = Arc::new(cached_state); + + let mut cache_lock = self.cache.write(); + cache_lock.inner_cache.insert(partial_key, new_state); + } + + /// Retrieves statistics about the cache usage. + /// + /// # Returns + /// + /// A tuple containing: + /// - Number of full hits (complete ordering found in cache). + /// - Number of partial hits (partial prefix found in cache). + /// - Total number of order simulations saved. + /// - Total number of order simulations requested. + /// - Total number of cache requests. 
+ /// - Rate of full hits (%). + /// - Rate of partial hits (%). + /// - Cache efficiency (%). + #[allow(dead_code)] + pub fn stats(&self) -> (usize, usize, usize, usize, usize, f64, f64, f64) { + let full_hits = self.full_hits.load(Ordering::Relaxed); + let cache_requests = self.cache_requests.load(Ordering::Relaxed); + let partial_hits = self.partial_hits.load(Ordering::Relaxed); + let total_simulations_saved = self + .number_of_order_simulations_saved + .load(Ordering::Relaxed); + let total_simulations_requested = self + .number_of_order_simulations_requested + .load(Ordering::Relaxed); + + let rate_of_full_hits = if total_simulations_requested > 0 { + (full_hits as f64 / total_simulations_requested as f64) * 100.0 + } else { + 0.0 + }; + let rate_of_partial_hits = if total_simulations_requested > 0 { + (partial_hits as f64 / total_simulations_requested as f64) * 100.0 + } else { + 0.0 + }; + let rate_of_cache_efficiency = if total_simulations_requested > 0 { + (total_simulations_saved as f64 / total_simulations_requested as f64) * 100.0 + } else { + 0.0 + }; + + ( + full_hits, + partial_hits, + total_simulations_saved, + total_simulations_requested, + cache_requests, + rate_of_full_hits, + rate_of_partial_hits, + rate_of_cache_efficiency, + ) + } +} + +impl Default for SharedSimulationCache { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{Address, B256, U256}; + use reth_primitives::revm_primitives::AccountInfo; + use revm::db::{states::StorageSlot, AccountStatus, BundleAccount}; + use std::collections::HashMap; + + struct TestDataGenerator { + last_used_id: u64, + } + + impl TestDataGenerator { + fn new() -> Self { + TestDataGenerator { last_used_id: 0 } + } + + fn create_order_id(&mut self) -> OrderId { + self.last_used_id += 1; + let mut bytes = [0u8; 32]; + bytes[31] = self.last_used_id as u8; + OrderId::Tx(B256::from(bytes)) + } + + fn create_cached_simulation_state(&mut self) -> 
CachedSimulationState { + let mut cached_reads = CachedReads::default(); + let mut storage = HashMap::new(); + storage.insert(U256::from(self.last_used_id), U256::from(self.last_used_id)); + cached_reads.insert_account(Address::random(), AccountInfo::default(), storage); + + let mut storage_bundle_account: HashMap = HashMap::new(); + let storage_slot = StorageSlot::new_changed( + U256::from(self.last_used_id), + U256::from(self.last_used_id + 1), + ); + storage_bundle_account.insert(U256::from(self.last_used_id), storage_slot); + + let mut bundle_state = BundleState::default(); + let account = BundleAccount::new( + Some(AccountInfo::default()), + Some(AccountInfo::default()), + storage_bundle_account, + AccountStatus::Changed, + ); + bundle_state.state.insert(Address::random(), account); + + CachedSimulationState { + cached_reads, + bundle_state, + total_profit: U256::from(self.last_used_id), + per_order_profits: vec![(self.create_order_id(), U256::from(10))], + } + } + } + + /// This test verifies that we can store a cached simulation state for a specific ordering + /// and then retrieve it correctly. It checks if the retrieved state matches the stored state, + /// including BundleState and CachedReads, and if the cached index is correct. 
+ #[test] + fn test_store_and_retrieve_cached_state() { + let cache = SharedSimulationCache::new(); + let mut gen = TestDataGenerator::new(); + + let ordering = vec![ + gen.create_order_id(), + gen.create_order_id(), + gen.create_order_id(), + ]; + let state = gen.create_cached_simulation_state(); + + cache.store_cached_state(&ordering, state.clone()); + + let (retrieved_state, cached_index) = cache.get_cached_state(&ordering); + assert_eq!(cached_index, 3); + assert!(retrieved_state.is_some()); + let retrieved_state = retrieved_state.unwrap(); + assert_eq!(retrieved_state.total_profit, state.total_profit); + assert_eq!(retrieved_state.bundle_state, state.bundle_state); + } + + // test that we get a full hit when we request the same ordering again but after storing more + // states + #[test] + fn test_full_hit_after_storing_more_states() { + let cache = SharedSimulationCache::new(); + let mut gen = TestDataGenerator::new(); + + let ordering = vec![ + gen.create_order_id(), + gen.create_order_id(), + gen.create_order_id(), + ]; + let state = gen.create_cached_simulation_state(); + + cache.store_cached_state(&ordering, state.clone()); + + let ordering2 = vec![ + gen.create_order_id(), + gen.create_order_id(), + gen.create_order_id(), + gen.create_order_id(), + ]; + let state2 = gen.create_cached_simulation_state(); + + cache.store_cached_state(&ordering2, state2.clone()); + + let (retrieved_state, cached_index) = cache.get_cached_state(&ordering); + assert_eq!(cached_index, 3); + assert!(retrieved_state.is_some()); + let retrieved_state = retrieved_state.unwrap(); + assert_eq!(retrieved_state.total_profit, state.total_profit); + } + + /// This test checks the partial hit functionality of the cache. It stores a state for a + /// partial ordering, then attempts to retrieve a state for a longer ordering that includes + /// the cached partial ordering. 
It verifies that we get a partial hit, the correct + /// cached index, and that the BundleState matches the stored partial state. + #[test] + fn test_partial_hit() { + let cache = SharedSimulationCache::new(); + let mut gen = TestDataGenerator::new(); + + let ordering = vec![gen.create_order_id(), gen.create_order_id()]; + let state = gen.create_cached_simulation_state(); + + cache.store_cached_state(&ordering, state.clone()); + + let extended_ordering = vec![ordering[0], ordering[1], gen.create_order_id()]; + let (retrieved_state, cached_index) = cache.get_cached_state(&extended_ordering); + assert_eq!(cached_index, 2); + assert!(retrieved_state.is_some()); + let retrieved_state = retrieved_state.unwrap(); + assert_eq!(retrieved_state.bundle_state, state.bundle_state); + } + + /// This test ensures that the cache correctly handles a miss scenario. It attempts to + /// retrieve a state for an ordering that hasn't been cached and verifies that we get + /// a cache miss (no retrieved state and a cached index of 0). 
+ #[test] + fn test_cache_miss() { + let cache = SharedSimulationCache::new(); + let mut gen = TestDataGenerator::new(); + + let ordering = vec![ + gen.create_order_id(), + gen.create_order_id(), + gen.create_order_id(), + ]; + + let (retrieved_state, cached_index) = cache.get_cached_state(&ordering); + assert_eq!(cached_index, 0); + assert!(retrieved_state.is_none()); + + // store a state for a different ordering + let ordering2 = vec![gen.create_order_id(), gen.create_order_id()]; + cache.store_cached_state(&ordering2, gen.create_cached_simulation_state()); + + // try to retrieve the original ordering + let (retrieved_state, cached_index) = cache.get_cached_state(&ordering); + + // we still get a cache miss + assert_eq!(cached_index, 0); + assert!(retrieved_state.is_none()); + } +} diff --git a/crates/rbuilder/src/building/builders/parallel_builder/task.rs b/crates/rbuilder/src/building/builders/parallel_builder/task.rs new file mode 100644 index 00000000..b1a82867 --- /dev/null +++ b/crates/rbuilder/src/building/builders/parallel_builder/task.rs @@ -0,0 +1,100 @@ +use std::{cmp::Ordering, time::Instant}; + +use super::ConflictGroup; + +/// ConflictTask provides a task for resolving a [ConflictGroup] with a specific [Algorithm]. +#[derive(Debug, Clone)] +pub struct ConflictTask { + pub group_idx: usize, + pub algorithm: Algorithm, + pub priority: TaskPriority, + pub group: ConflictGroup, + pub created_at: Instant, +} + +/// TaskPriority provides a priority for a [ConflictTask]. +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] +pub enum TaskPriority { + Low = 0, + Medium = 1, + High = 2, +} + +impl TaskPriority { + pub fn display(&self) -> &str { + match self { + TaskPriority::Low => "Low", + TaskPriority::Medium => "Medium", + TaskPriority::High => "High", + } + } +} + +/// [PartialEq] [Eq] [PartialOrd] [Ord] are the traits that are required for a [ConflictTask] to be used in a priority queue. 
+impl PartialEq for ConflictTask { + fn eq(&self, other: &Self) -> bool { + self.priority == other.priority + } +} + +impl Eq for ConflictTask {} + +impl PartialOrd for ConflictTask { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for ConflictTask { + fn cmp(&self, other: &Self) -> Ordering { + // Higher priority first, then earlier created_at + other + .priority + .cmp(&self.priority) + .then_with(|| self.created_at.cmp(&other.created_at)) + } +} + +/// Algorithm provides an algorithm for resolving a [ConflictGroup]. +/// Initially these are all algorithms that produce a sequence of orders to execute. +#[derive(Debug, Clone, Copy)] +pub enum Algorithm { + /// `Greedy` checks the following orderings: max profit, mev gas price + Greedy, + /// `ReverseGreedy` checks the reverse greedy orderings: e.g. min profit, min mev gas price first + ReverseGreedy, + /// `Length` checks the length based orderings + Length, + /// `AllPermutations` checks all possible permutations of the group. + AllPermutations, + /// `Random` checks random permutations of the group.
+ Random { seed: u64, count: usize }, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_task_priority_ordering() { + assert!(TaskPriority::Low < TaskPriority::Medium); + assert!(TaskPriority::Medium < TaskPriority::High); + assert!(TaskPriority::Low < TaskPriority::High); + } + + #[test] + fn test_task_priority_display() { + assert_eq!(TaskPriority::Low.display(), "Low"); + assert_eq!(TaskPriority::Medium.display(), "Medium"); + assert_eq!(TaskPriority::High.display(), "High"); + } + + #[test] + fn test_task_priority_equality() { + assert_eq!(TaskPriority::Low, TaskPriority::Low); + assert_ne!(TaskPriority::Low, TaskPriority::Medium); + assert_ne!(TaskPriority::Low, TaskPriority::High); + } + + // to-do: test equal priority ordering by created_at +} diff --git a/crates/rbuilder/src/live_builder/config.rs b/crates/rbuilder/src/live_builder/config.rs index 6713d4e1..64516771 100644 --- a/crates/rbuilder/src/live_builder/config.rs +++ b/crates/rbuilder/src/live_builder/config.rs @@ -18,10 +18,10 @@ use crate::{ beacon_api_client::Client, building::{ builders::{ - merging_builder::{ - merging_build_backtest, MergingBuilderConfig, MergingBuildingAlgorithm, - }, ordering_builder::{OrderingBuilderConfig, OrderingBuildingAlgorithm}, + parallel_builder::{ + parallel_build_backtest, ParallelBuilderConfig, ParallelBuildingAlgorithm, + }, BacktestSimulateBlockInput, Block, BlockBuildingAlgorithm, }, Sorting, @@ -72,8 +72,8 @@ pub const DEFAULT_MAX_CONCURRENT_SEALS: u64 = 1; #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] #[serde(tag = "algo", rename_all = "kebab-case", deny_unknown_fields)] pub enum SpecificBuilderConfig { + ParallelBuilder(ParallelBuilderConfig), OrderingBuilder(OrderingBuilderConfig), - MergingBuilder(MergingBuilderConfig), } #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] @@ -362,8 +362,8 @@ impl LiveBuilderConfig for Config { SpecificBuilderConfig::OrderingBuilder(config) => { 
crate::building::builders::ordering_builder::backtest_simulate_block(config, input) } - SpecificBuilderConfig::MergingBuilder(config) => { - merging_build_backtest::(input, config) + SpecificBuilderConfig::ParallelBuilder(config) => { + parallel_build_backtest(input, config) } } } @@ -512,12 +512,12 @@ where cfg.name, )) } - SpecificBuilderConfig::MergingBuilder(merge_cfg) => { - Arc::new(MergingBuildingAlgorithm::new( + SpecificBuilderConfig::ParallelBuilder(parallel_cfg) => { + Arc::new(ParallelBuildingAlgorithm::new( root_hash_config.clone(), root_hash_task_pool.clone(), sbundle_mergeabe_signers.to_vec(), - merge_cfg, + parallel_cfg, cfg.name, )) } diff --git a/crates/rbuilder/src/live_builder/order_input/rpc_server.rs b/crates/rbuilder/src/live_builder/order_input/rpc_server.rs index 367b7d04..770dfdbf 100644 --- a/crates/rbuilder/src/live_builder/order_input/rpc_server.rs +++ b/crates/rbuilder/src/live_builder/order_input/rpc_server.rs @@ -155,7 +155,7 @@ async fn handle_mev_send_bundle( }; match decode_res { RawShareBundleDecodeResult::NewShareBundle(bundle) => { - let order = Order::ShareBundle(bundle); + let order = Order::ShareBundle(*bundle); let parse_duration = start.elapsed(); let target_block = order.target_block().unwrap_or_default(); trace!(order = ?order.id(), parse_duration_mus = parse_duration.as_micros(), target_block, "Received share bundle"); diff --git a/crates/rbuilder/src/primitives/serialize.rs b/crates/rbuilder/src/primitives/serialize.rs index 0eb86fae..2c6bff75 100644 --- a/crates/rbuilder/src/primitives/serialize.rs +++ b/crates/rbuilder/src/primitives/serialize.rs @@ -264,7 +264,7 @@ pub struct CancelShareBundle { /// Since we use the same API (mev_sendBundle) to get new bundles and also to cancel them we need this struct #[allow(clippy::large_enum_variant)] pub enum RawShareBundleDecodeResult { - NewShareBundle(ShareBundle), + NewShareBundle(Box<ShareBundle>), CancelShareBundle(CancelShareBundle), } @@ -276,7 +276,7 @@ impl RawShareBundle { ) ->
Result<ShareBundle, RawShareBundleConvertError> { let decode_res = self.decode(encoding)?; match decode_res { - RawShareBundleDecodeResult::NewShareBundle(b) => Ok(b), + RawShareBundleDecodeResult::NewShareBundle(b) => Ok(*b), RawShareBundleDecodeResult::CancelShareBundle(_) => { Err(RawShareBundleConvertError::FoundCancelExpectingBundle) } @@ -334,7 +334,7 @@ impl RawShareBundle { bundle.hash_slow(); - Ok(RawShareBundleDecodeResult::NewShareBundle(bundle)) + Ok(RawShareBundleDecodeResult::NewShareBundle(Box::new(bundle))) } /// See [TransactionSignedEcRecoveredWithBlobs::envelope_encoded_no_blobs]