From f69e1747ad5150033f0bb94443df4550fb898b70 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 19:05:01 +0000 Subject: [PATCH] Notify external solvers --- .../competition/participation_guard/db.rs | 47 +++++++++++++++++-- .../competition/participation_guard/mod.rs | 13 +++-- .../participation_guard/onchain.rs | 6 +-- crates/autopilot/src/infra/solvers/dto/mod.rs | 1 + .../autopilot/src/infra/solvers/dto/notify.rs | 9 ++++ crates/autopilot/src/infra/solvers/mod.rs | 6 ++- crates/autopilot/src/run.rs | 18 ++++--- 7 files changed, 80 insertions(+), 20 deletions(-) create mode 100644 crates/autopilot/src/infra/solvers/dto/notify.rs diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 0a8dba9f7d..6c50b96553 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -2,10 +2,12 @@ use { crate::{ database::Postgres, domain::{eth, Metrics}, - infra::Driver, + infra, }, ethrpc::block_stream::CurrentBlockWatcher, + futures::future::join_all, std::{ + collections::HashMap, sync::Arc, time::{Duration, Instant}, }, @@ -21,6 +23,7 @@ struct Inner { banned_solvers: dashmap::DashMap, ttl: Duration, last_auctions_count: u32, + drivers_by_address: HashMap>, } impl Validator { @@ -30,12 +33,14 @@ impl Validator { settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, ttl: Duration, last_auctions_count: u32, + drivers_by_address: HashMap>, ) -> Self { let self_ = Self(Arc::new(Inner { db, banned_solvers: Default::default(), ttl, last_auctions_count, + drivers_by_address, })); self_.start_maintenance(settlement_updates_receiver, current_block); @@ -50,13 +55,14 @@ impl Validator { mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, current_block: CurrentBlockWatcher, ) { - let self_ = self.0.clone(); + let self_ = self.clone(); tokio::spawn(async move { while settlement_updates_receiver.recv().await.is_some() { let current_block = current_block.borrow().number; match self_ + .0 .db - .find_non_settling_solvers(self_.last_auctions_count, current_block) + .find_non_settling_solvers(self_.0.last_auctions_count, current_block) .await { Ok(non_settling_solvers) => { @@ -74,10 +80,11 @@ impl Validator { .collect::>(); tracing::debug!(?non_settling_solvers, "found non-settling solvers"); + self_.notify_solvers(&non_settling_solvers); let now = Instant::now(); for solver in non_settling_solvers { - self_.banned_solvers.insert(solver, now); + self_.0.banned_solvers.insert(solver, now); } } Err(err) => { @@ -87,11 +94,41 @@ impl Validator { } }); } + + /// Try to notify all the non-settling external solvers. + fn notify_solvers(&self, non_settling_solvers: &[eth::Address]) { + let futures = non_settling_solvers + .iter() + .cloned() + .map(|solver| { + let self_ = self.0.clone(); + async move { + match self_.drivers_by_address.get(&solver) { + Some(driver) => { + if let Err(err) = driver + .notify(&infra::solvers::dto::notify::Request::UnsettledConsecutiveAuctions) + .await + { + tracing::debug!(?solver, ?err, "unable to notify external solver"); + } + } + None => { + tracing::error!(?solver, "found unrecognized non-settling driver"); + } + } + } + } + ).collect::>(); + + tokio::spawn(async move { + join_all(futures).await; + }); + } } #[async_trait::async_trait] impl super::Validator for Validator { - async fn is_allowed(&self, driver: &Driver) -> anyhow::Result { + async fn is_allowed(&self, driver: &infra::Driver) -> anyhow::Result { // Check if solver accepted this feature. This should be removed once a CIP is // approved. if !driver.accepts_unsettled_blocking { diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index 7b4bf53157..5564cc07f6 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -5,9 +5,10 @@ use { crate::{ arguments::DbBasedSolverParticipationGuardConfig, database::Postgres, - infra::{Driver, Ethereum}, + domain::eth, + infra, }, - std::sync::Arc, + std::{collections::HashMap, sync::Arc}, }; /// This struct checks whether a solver can participate in the competition by @@ -22,10 +23,11 @@ struct Inner { impl SolverParticipationGuard { pub fn new( - eth: Ethereum, + eth: infra::Ethereum, db: Postgres, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, db_based_validator_config: DbBasedSolverParticipationGuardConfig, + drivers_by_address: HashMap>, ) -> Self { let mut validators: Vec> = Vec::new(); @@ -37,6 +39,7 @@ impl SolverParticipationGuard { settlement_updates_receiver, db_based_validator_config.solver_blacklist_cache_ttl, db_based_validator_config.solver_last_auctions_participation_count, + drivers_by_address, ); validators.push(Box::new(database_solver_participation_validator)); } @@ -52,7 +55,7 @@ impl SolverParticipationGuard { /// the following order: /// 1. DB-based validator: operates fast since it uses in-memory cache. /// 2. Onchain-based validator: only then calls the Authenticator contract. - pub async fn can_participate(&self, driver: &Driver) -> anyhow::Result { + pub async fn can_participate(&self, driver: &infra::Driver) -> anyhow::Result { for validator in &self.0.validators { if !validator.is_allowed(driver).await? { return Ok(false); @@ -65,5 +68,5 @@ impl SolverParticipationGuard { #[async_trait::async_trait] trait Validator: Send + Sync { - async fn is_allowed(&self, driver: &Driver) -> anyhow::Result; + async fn is_allowed(&self, driver: &infra::Driver) -> anyhow::Result; } diff --git a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs index 3e472b298e..d1327f0254 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs @@ -1,14 +1,14 @@ -use crate::infra::{Driver, Ethereum}; +use crate::infra; /// Calls Authenticator contract to check if a solver has a sufficient /// permission. pub(super) struct Validator { - pub eth: Ethereum, + pub eth: infra::Ethereum, } #[async_trait::async_trait] impl super::Validator for Validator { - async fn is_allowed(&self, solver: &Driver) -> anyhow::Result { + async fn is_allowed(&self, solver: &infra::Driver) -> anyhow::Result { Ok(self .eth .contracts() diff --git a/crates/autopilot/src/infra/solvers/dto/mod.rs b/crates/autopilot/src/infra/solvers/dto/mod.rs index d3a156294c..5365700ce8 100644 --- a/crates/autopilot/src/infra/solvers/dto/mod.rs +++ b/crates/autopilot/src/infra/solvers/dto/mod.rs @@ -1,6 +1,7 @@ //! Types for communicating with drivers as defined in //! `crates/driver/openapi.yml`. +pub mod notify; pub mod reveal; pub mod settle; pub mod solve; diff --git a/crates/autopilot/src/infra/solvers/dto/notify.rs b/crates/autopilot/src/infra/solvers/dto/notify.rs new file mode 100644 index 0000000000..08165964fd --- /dev/null +++ b/crates/autopilot/src/infra/solvers/dto/notify.rs @@ -0,0 +1,9 @@ +use {serde::Serialize, serde_with::serde_as}; + +#[serde_as] +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum Request { + /// The driver won multiple consecutive auctions but never settled them. + UnsettledConsecutiveAuctions, +} diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index aae623ebaa..2490a71b15 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -1,6 +1,6 @@ use { self::dto::{reveal, settle, solve}, - crate::{arguments::Account, domain::eth, util}, + crate::{arguments::Account, domain::eth, infra::solvers::dto::notify, util}, anyhow::{anyhow, Context, Result}, reqwest::{Client, StatusCode}, std::time::Duration, @@ -116,6 +116,10 @@ impl Driver { Ok(()) } + pub async fn notify(&self, request: ¬ify::Request) -> Result<()> { + self.request_response("notify", request, None).await + } + async fn request_response( &self, path: &str, diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 2f426a1967..a6e30dd562 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -52,6 +52,7 @@ use { token_list::{AutoUpdatingTokenList, TokenListConfiguration}, }, std::{ + collections::HashMap, sync::{Arc, RwLock}, time::{Duration, Instant}, }, @@ -368,12 +369,6 @@ pub async fn run(args: Arguments) { let (settlement_updates_sender, settlement_updates_receiver) = tokio::sync::mpsc::unbounded_channel(); - let solver_participation_guard = SolverParticipationGuard::new( - eth.clone(), - db.clone(), - settlement_updates_receiver, - args.db_based_solver_participation_guard, - ); let persistence = infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await; let settlement_observer = crate::domain::settlement::Observer::new( @@ -583,6 +578,17 @@ pub async fn run(args: Arguments) { .into_iter() .collect(); + let solver_participation_guard = SolverParticipationGuard::new( + eth.clone(), + db.clone(), + settlement_updates_receiver, + args.db_based_solver_participation_guard, + drivers + .iter() + .map(|driver| (driver.submission_address, driver.clone())) + .collect::>(), + ); + let run = RunLoop::new( run_loop_config, eth,