Skip to content

Commit

Permalink
expose orderpool_sender on public api
Browse files Browse the repository at this point in the history
  • Loading branch information
liamaharon committed Nov 7, 2024
1 parent 5d152e8 commit be96f5d
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 15 deletions.
30 changes: 19 additions & 11 deletions crates/rbuilder/src/bin/dummy-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ use reth::tasks::pool::BlockingTaskPool;
use reth_chainspec::MAINNET;
use reth_db::{database::Database, DatabaseEnv};
use reth_provider::{DatabaseProviderFactory, StateProviderFactory};
use tokio::{signal::ctrl_c, sync::broadcast};
use tokio::{
signal::ctrl_c,
sync::{broadcast, mpsc},
};
use tokio_util::sync::CancellationToken;
use tracing::{info, level_filters::LevelFilter};

Expand Down Expand Up @@ -71,6 +74,18 @@ async fn main() -> eyre::Result<()> {
cancel.clone(),
);

let order_input_config = OrderInputConfig::new(
false,
true,
DEFAULT_EL_NODE_IPC_PATH.parse().unwrap(),
DEFAULT_INCOMING_BUNDLES_PORT,
*DEFAULT_IP,
DEFAULT_SERVE_MAX_CONNECTIONS,
DEFAULT_RESULTS_CHANNEL_TIMEOUT,
DEFAULT_INPUT_CHANNEL_BUFFER_SIZE,
);
let (orderpool_sender, orderpool_receiver) =
mpsc::channel(order_input_config.input_channel_buffer_size);
let builder = LiveBuilder::<
ProviderFactoryReopener<Arc<DatabaseEnv>>,
Arc<DatabaseEnv>,
Expand All @@ -80,16 +95,7 @@ async fn main() -> eyre::Result<()> {
error_storage_path: None,
simulation_threads: 1,
blocks_source: payload_event,
order_input_config: OrderInputConfig::new(
false,
true,
DEFAULT_EL_NODE_IPC_PATH.parse().unwrap(),
DEFAULT_INCOMING_BUNDLES_PORT,
*DEFAULT_IP,
DEFAULT_SERVE_MAX_CONNECTIONS,
DEFAULT_RESULTS_CHANNEL_TIMEOUT,
DEFAULT_INPUT_CHANNEL_BUFFER_SIZE,
),
order_input_config,
chain_chain_spec: chain_spec.clone(),
provider: create_provider_factory(
Some(&RETH_DB_PATH.parse::<PathBuf>().unwrap()),
Expand All @@ -105,6 +111,8 @@ async fn main() -> eyre::Result<()> {
sink_factory: Box::new(TraceBlockSinkFactory {}),
builders: vec![Arc::new(DummyBuildingAlgorithm::new(10))],
run_sparse_trie_prefetcher: false,
orderpool_sender,
orderpool_receiver,
};

let ctrlc = tokio::spawn(async move {
Expand Down
9 changes: 8 additions & 1 deletion crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::sync::mpsc;
use tracing::warn;

use super::SlotSource;
Expand Down Expand Up @@ -205,11 +206,14 @@ impl BaseConfig {
P: DatabaseProviderFactory<DB> + StateProviderFactory + HeaderProvider + Clone,
SlotSourceType: SlotSource,
{
let order_input_config = OrderInputConfig::from_config(self)?;
let (orderpool_sender, orderpool_receiver) =
mpsc::channel(order_input_config.input_channel_buffer_size);
Ok(LiveBuilder::<P, DB, SlotSourceType> {
watchdog_timeout: self.watchdog_timeout(),
error_storage_path: self.error_storage_path.clone(),
simulation_threads: self.simulation_threads,
order_input_config: OrderInputConfig::from_config(self)?,
order_input_config,
blocks_source: slot_source,
chain_chain_spec: self.chain_spec()?,
provider,
Expand All @@ -225,6 +229,9 @@ impl BaseConfig {
builders: Vec::new(),

run_sparse_trie_prefetcher: self.root_hash_use_sparse_trie,

orderpool_sender,
orderpool_receiver,
})
}

Expand Down
7 changes: 7 additions & 0 deletions crates/rbuilder/src/live_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use alloy_primitives::{Address, B256};
use building::BlockBuildingPool;
use eyre::Context;
use jsonrpsee::RpcModule;
use order_input::ReplaceableOrderPoolCommand;
use payload_events::MevBoostSlotData;
use reth::{primitives::Header, providers::HeaderProvider};
use reth_chainspec::ChainSpec;
Expand Down Expand Up @@ -81,6 +82,10 @@ where
pub sink_factory: Box<dyn UnfinishedBlockBuildingSinkFactory>,
pub builders: Vec<Arc<dyn BlockBuildingAlgorithm<P, DB>>>,
pub extra_rpc: RpcModule<()>,

/// Notify rbuilder of new [`ReplaceableOrderPoolCommand`] flow via this channel.
pub orderpool_sender: mpsc::Sender<ReplaceableOrderPoolCommand>,
pub orderpool_receiver: mpsc::Receiver<ReplaceableOrderPoolCommand>,
}

impl<P, DB, BlocksSourceType: SlotSource> LiveBuilder<P, DB, BlocksSourceType>
Expand Down Expand Up @@ -119,6 +124,8 @@ where
self.provider.clone(),
self.extra_rpc,
self.global_cancellation.clone(),
self.orderpool_sender,
self.orderpool_receiver,
)
.await?;
inner_jobs_handles.push(handle);
Expand Down
6 changes: 3 additions & 3 deletions crates/rbuilder/src/live_builder/order_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub struct OrderInputConfig {
/// Timeout to wait when sending to that channel (after that the ReplaceableOrderPoolCommand is lost).
results_channel_timeout: Duration,
/// Size of the bounded channel.
input_channel_buffer_size: usize,
pub input_channel_buffer_size: usize,
}
pub const DEFAULT_SERVE_MAX_CONNECTIONS: u32 = 4096;
pub const DEFAULT_RESULTS_CHANNEL_TIMEOUT: Duration = Duration::from_millis(50);
Expand Down Expand Up @@ -184,6 +184,8 @@ pub async fn start_orderpool_jobs<P>(
provider_factory: P,
extra_rpc: RpcModule<()>,
global_cancel: CancellationToken,
order_sender: mpsc::Sender<ReplaceableOrderPoolCommand>,
order_receiver: mpsc::Receiver<ReplaceableOrderPoolCommand>,
) -> eyre::Result<(JoinHandle<()>, OrderPoolSubscriber)>
where
P: StateProviderFactory + 'static,
Expand All @@ -200,8 +202,6 @@ where
orderpool: orderpool.clone(),
};

let (order_sender, order_receiver) = mpsc::channel(config.input_channel_buffer_size);

let clean_job = clean_orderpool::spawn_clean_orderpool_job(
config.clone(),
provider_factory,
Expand Down

0 comments on commit be96f5d

Please sign in to comment.