Skip to content

Commit

Permalink
Notify external solvers
Browse files Browse the repository at this point in the history
  • Loading branch information
squadgazzz committed Jan 30, 2025
1 parent 321f9bc commit f69e174
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 20 deletions.
47 changes: 42 additions & 5 deletions crates/autopilot/src/domain/competition/participation_guard/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use {
crate::{
database::Postgres,
domain::{eth, Metrics},
infra::Driver,
infra,
},
ethrpc::block_stream::CurrentBlockWatcher,
futures::future::join_all,
std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
},
Expand All @@ -21,6 +23,7 @@ struct Inner {
banned_solvers: dashmap::DashMap<eth::Address, Instant>,
ttl: Duration,
last_auctions_count: u32,
drivers_by_address: HashMap<eth::Address, Arc<infra::Driver>>,
}

impl Validator {
Expand All @@ -30,12 +33,14 @@ impl Validator {
settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
ttl: Duration,
last_auctions_count: u32,
drivers_by_address: HashMap<eth::Address, Arc<infra::Driver>>,
) -> Self {
let self_ = Self(Arc::new(Inner {
db,
banned_solvers: Default::default(),
ttl,
last_auctions_count,
drivers_by_address,
}));

self_.start_maintenance(settlement_updates_receiver, current_block);
Expand All @@ -50,13 +55,14 @@ impl Validator {
mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
current_block: CurrentBlockWatcher,
) {
let self_ = self.0.clone();
let self_ = self.clone();
tokio::spawn(async move {
while settlement_updates_receiver.recv().await.is_some() {
let current_block = current_block.borrow().number;
match self_
.0
.db
.find_non_settling_solvers(self_.last_auctions_count, current_block)
.find_non_settling_solvers(self_.0.last_auctions_count, current_block)
.await
{
Ok(non_settling_solvers) => {
Expand All @@ -74,10 +80,11 @@ impl Validator {
.collect::<Vec<_>>();

tracing::debug!(?non_settling_solvers, "found non-settling solvers");
self_.notify_solvers(&non_settling_solvers);

let now = Instant::now();
for solver in non_settling_solvers {
self_.banned_solvers.insert(solver, now);
self_.0.banned_solvers.insert(solver, now);
}
}
Err(err) => {
Expand All @@ -87,11 +94,41 @@ impl Validator {
}
});
}

/// Try to notify all the non-settling external solvers.
fn notify_solvers(&self, non_settling_solvers: &[eth::Address]) {
let futures = non_settling_solvers
.iter()
.cloned()
.map(|solver| {
let self_ = self.0.clone();
async move {
match self_.drivers_by_address.get(&solver) {
Some(driver) => {
if let Err(err) = driver
.notify(&infra::solvers::dto::notify::Request::UnsettledConsecutiveAuctions)
.await
{
tracing::debug!(?solver, ?err, "unable to notify external solver");
}
}
None => {
tracing::error!(?solver, "found unrecognized non-settling driver");
}
}
}
}
).collect::<Vec<_>>();

tokio::spawn(async move {
join_all(futures).await;
});
}
}

#[async_trait::async_trait]
impl super::Validator for Validator {
async fn is_allowed(&self, driver: &Driver) -> anyhow::Result<bool> {
async fn is_allowed(&self, driver: &infra::Driver) -> anyhow::Result<bool> {
// Check if solver accepted this feature. This should be removed once a CIP is
// approved.
if !driver.accepts_unsettled_blocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use {
crate::{
arguments::DbBasedSolverParticipationGuardConfig,
database::Postgres,
infra::{Driver, Ethereum},
domain::eth,
infra,
},
std::sync::Arc,
std::{collections::HashMap, sync::Arc},
};

/// This struct checks whether a solver can participate in the competition by
Expand All @@ -22,10 +23,11 @@ struct Inner {

impl SolverParticipationGuard {
pub fn new(
eth: Ethereum,
eth: infra::Ethereum,
db: Postgres,
settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>,
db_based_validator_config: DbBasedSolverParticipationGuardConfig,
drivers_by_address: HashMap<eth::Address, Arc<infra::Driver>>,
) -> Self {
let mut validators: Vec<Box<dyn Validator + Send + Sync>> = Vec::new();

Expand All @@ -37,6 +39,7 @@ impl SolverParticipationGuard {
settlement_updates_receiver,
db_based_validator_config.solver_blacklist_cache_ttl,
db_based_validator_config.solver_last_auctions_participation_count,
drivers_by_address,
);
validators.push(Box::new(database_solver_participation_validator));
}
Expand All @@ -52,7 +55,7 @@ impl SolverParticipationGuard {
/// the following order:
/// 1. DB-based validator: operates fast since it uses in-memory cache.
/// 2. Onchain-based validator: only then calls the Authenticator contract.
pub async fn can_participate(&self, driver: &Driver) -> anyhow::Result<bool> {
pub async fn can_participate(&self, driver: &infra::Driver) -> anyhow::Result<bool> {
for validator in &self.0.validators {
if !validator.is_allowed(driver).await? {
return Ok(false);
Expand All @@ -65,5 +68,5 @@ impl SolverParticipationGuard {

#[async_trait::async_trait]
trait Validator: Send + Sync {
async fn is_allowed(&self, driver: &Driver) -> anyhow::Result<bool>;
async fn is_allowed(&self, driver: &infra::Driver) -> anyhow::Result<bool>;
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::infra::{Driver, Ethereum};
use crate::infra;

/// Calls Authenticator contract to check if a solver has a sufficient
/// permission.
pub(super) struct Validator {
pub eth: Ethereum,
pub eth: infra::Ethereum,
}

#[async_trait::async_trait]
impl super::Validator for Validator {
async fn is_allowed(&self, solver: &Driver) -> anyhow::Result<bool> {
async fn is_allowed(&self, solver: &infra::Driver) -> anyhow::Result<bool> {
Ok(self
.eth
.contracts()
Expand Down
1 change: 1 addition & 0 deletions crates/autopilot/src/infra/solvers/dto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Types for communicating with drivers as defined in
//! `crates/driver/openapi.yml`.
pub mod notify;
pub mod reveal;
pub mod settle;
pub mod solve;
9 changes: 9 additions & 0 deletions crates/autopilot/src/infra/solvers/dto/notify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use {serde::Serialize, serde_with::serde_as};

#[serde_as]
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub enum Request {
/// The driver won multiple consecutive auctions but never settled them.
UnsettledConsecutiveAuctions,
}
6 changes: 5 additions & 1 deletion crates/autopilot/src/infra/solvers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
self::dto::{reveal, settle, solve},
crate::{arguments::Account, domain::eth, util},
crate::{arguments::Account, domain::eth, infra::solvers::dto::notify, util},
anyhow::{anyhow, Context, Result},
reqwest::{Client, StatusCode},
std::time::Duration,
Expand Down Expand Up @@ -116,6 +116,10 @@ impl Driver {
Ok(())
}

pub async fn notify(&self, request: &notify::Request) -> Result<()> {
self.request_response("notify", request, None).await
}

async fn request_response<Response>(
&self,
path: &str,
Expand Down
18 changes: 12 additions & 6 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use {
token_list::{AutoUpdatingTokenList, TokenListConfiguration},
},
std::{
collections::HashMap,
sync::{Arc, RwLock},
time::{Duration, Instant},
},
Expand Down Expand Up @@ -368,12 +369,6 @@ pub async fn run(args: Arguments) {
let (settlement_updates_sender, settlement_updates_receiver) =
tokio::sync::mpsc::unbounded_channel();

let solver_participation_guard = SolverParticipationGuard::new(
eth.clone(),
db.clone(),
settlement_updates_receiver,
args.db_based_solver_participation_guard,
);
let persistence =
infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await;
let settlement_observer = crate::domain::settlement::Observer::new(
Expand Down Expand Up @@ -583,6 +578,17 @@ pub async fn run(args: Arguments) {
.into_iter()
.collect();

let solver_participation_guard = SolverParticipationGuard::new(
eth.clone(),
db.clone(),
settlement_updates_receiver,
args.db_based_solver_participation_guard,
drivers
.iter()
.map(|driver| (driver.submission_address, driver.clone()))
.collect::<HashMap<_, _>>(),
);

let run = RunLoop::new(
run_loop_config,
eth,
Expand Down

0 comments on commit f69e174

Please sign in to comment.