diff --git a/crates/rbuilder/src/bin/dummy-builder.rs b/crates/rbuilder/src/bin/dummy-builder.rs index bd81c339..a5d7644c 100644 --- a/crates/rbuilder/src/bin/dummy-builder.rs +++ b/crates/rbuilder/src/bin/dummy-builder.rs @@ -41,7 +41,10 @@ use reth::tasks::pool::BlockingTaskPool; use reth_chainspec::MAINNET; use reth_db::{database::Database, DatabaseEnv}; use reth_provider::{DatabaseProviderFactory, StateProviderFactory}; -use tokio::{signal::ctrl_c, sync::broadcast}; +use tokio::{ + signal::ctrl_c, + sync::{broadcast, mpsc}, +}; use tokio_util::sync::CancellationToken; use tracing::{info, level_filters::LevelFilter}; @@ -71,6 +74,18 @@ async fn main() -> eyre::Result<()> { cancel.clone(), ); + let order_input_config = OrderInputConfig::new( + false, + true, + DEFAULT_EL_NODE_IPC_PATH.parse().unwrap(), + DEFAULT_INCOMING_BUNDLES_PORT, + *DEFAULT_IP, + DEFAULT_SERVE_MAX_CONNECTIONS, + DEFAULT_RESULTS_CHANNEL_TIMEOUT, + DEFAULT_INPUT_CHANNEL_BUFFER_SIZE, + ); + let (orderpool_sender, orderpool_receiver) = + mpsc::channel(order_input_config.input_channel_buffer_size); let builder = LiveBuilder::< ProviderFactoryReopener>, Arc, @@ -80,16 +95,7 @@ async fn main() -> eyre::Result<()> { error_storage_path: None, simulation_threads: 1, blocks_source: payload_event, - order_input_config: OrderInputConfig::new( - false, - true, - DEFAULT_EL_NODE_IPC_PATH.parse().unwrap(), - DEFAULT_INCOMING_BUNDLES_PORT, - *DEFAULT_IP, - DEFAULT_SERVE_MAX_CONNECTIONS, - DEFAULT_RESULTS_CHANNEL_TIMEOUT, - DEFAULT_INPUT_CHANNEL_BUFFER_SIZE, - ), + order_input_config, chain_chain_spec: chain_spec.clone(), provider: create_provider_factory( Some(&RETH_DB_PATH.parse::().unwrap()), @@ -105,6 +111,8 @@ async fn main() -> eyre::Result<()> { sink_factory: Box::new(TraceBlockSinkFactory {}), builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))], run_sparse_trie_prefetcher: false, + orderpool_sender, + orderpool_receiver, }; let ctrlc = tokio::spawn(async move { diff --git a/crates/rbuilder/src/live_builder/base_config.rs b/crates/rbuilder/src/live_builder/base_config.rs index 9bb80f95..f6cb7571 100644 --- a/crates/rbuilder/src/live_builder/base_config.rs +++ b/crates/rbuilder/src/live_builder/base_config.rs @@ -32,6 +32,7 @@ use std::{ sync::Arc, time::Duration, }; +use tokio::sync::mpsc; use tracing::warn; use super::SlotSource; @@ -205,11 +206,14 @@ impl BaseConfig { P: DatabaseProviderFactory + StateProviderFactory + HeaderProvider + Clone, SlotSourceType: SlotSource, { + let order_input_config = OrderInputConfig::from_config(self)?; + let (orderpool_sender, orderpool_receiver) = + mpsc::channel(order_input_config.input_channel_buffer_size); Ok(LiveBuilder:: { watchdog_timeout: self.watchdog_timeout(), error_storage_path: self.error_storage_path.clone(), simulation_threads: self.simulation_threads, - order_input_config: OrderInputConfig::from_config(self)?, + order_input_config, blocks_source: slot_source, chain_chain_spec: self.chain_spec()?, provider, @@ -225,6 +229,9 @@ impl BaseConfig { builders: Vec::new(), run_sparse_trie_prefetcher: self.root_hash_use_sparse_trie, + + orderpool_sender, + orderpool_receiver, }) } diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index cf9bff78..c419f0e7 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -26,6 +26,7 @@ use alloy_primitives::{Address, B256}; use building::BlockBuildingPool; use eyre::Context; use jsonrpsee::RpcModule; +use order_input::ReplaceableOrderPoolCommand; use payload_events::MevBoostSlotData; use reth::{primitives::Header, providers::HeaderProvider}; use reth_chainspec::ChainSpec; @@ -81,6 +82,10 @@ where pub sink_factory: Box, pub builders: Vec>>, pub extra_rpc: RpcModule<()>, + + /// Notify rbuilder of new [`ReplaceableOrderPoolCommand`] flow via this channel. + pub orderpool_sender: mpsc::Sender, + pub orderpool_receiver: mpsc::Receiver, } impl LiveBuilder @@ -119,6 +124,8 @@ where self.provider.clone(), self.extra_rpc, self.global_cancellation.clone(), + self.orderpool_sender, + self.orderpool_receiver, ) .await?; inner_jobs_handles.push(handle); diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs index 3691e577..d3ed5207 100644 --- a/crates/rbuilder/src/live_builder/order_input/mod.rs +++ b/crates/rbuilder/src/live_builder/order_input/mod.rs @@ -95,7 +95,7 @@ pub struct OrderInputConfig { /// Timeout to wait when sending to that channel (after that the ReplaceableOrderPoolCommand is lost). results_channel_timeout: Duration, /// Size of the bounded channel. - input_channel_buffer_size: usize, + pub input_channel_buffer_size: usize, } pub const DEFAULT_SERVE_MAX_CONNECTIONS: u32 = 4096; pub const DEFAULT_RESULTS_CHANNEL_TIMEOUT: Duration = Duration::from_millis(50); @@ -184,6 +184,8 @@ pub async fn start_orderpool_jobs

( provider_factory: P, extra_rpc: RpcModule<()>, global_cancel: CancellationToken, + order_sender: mpsc::Sender, + order_receiver: mpsc::Receiver, ) -> eyre::Result<(JoinHandle<()>, OrderPoolSubscriber)> where P: StateProviderFactory + 'static, @@ -200,8 +202,6 @@ where orderpool: orderpool.clone(), }; - let (order_sender, order_receiver) = mpsc::channel(config.input_channel_buffer_size); - let clean_job = clean_orderpool::spawn_clean_orderpool_job( config.clone(), provider_factory,