Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Driver fetches order's app-data #3242

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
246 changes: 215 additions & 31 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions crates/driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ hyper = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
itertools = { workspace = true }
mimalloc = { workspace = true }
moka = { version = "0.12.10", features = ["future"] }
num = { workspace = true }
number = { path = "../number" }
prometheus = { workspace = true }
Expand Down
84 changes: 68 additions & 16 deletions crates/driver/src/domain/competition/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
util::{self, Bytes},
},
chrono::{Duration, Utc},
futures::future::{join_all, BoxFuture, FutureExt, Shared},
futures::future::{join, join_all, BoxFuture, FutureExt, Shared},
itertools::Itertools,
model::{order::OrderKind, signature::Signature},
shared::signature_validator::{Contracts, SignatureValidating},
Expand Down Expand Up @@ -127,7 +127,10 @@ impl Auction {
}

#[derive(Clone)]
pub struct AuctionProcessor(Arc<Mutex<Inner>>);
pub struct AuctionProcessor {
inner: Arc<Mutex<Inner>>,
app_data_retriever: Arc<order::app_data::AppDataRetriever>,
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
}

struct Inner {
auction: auction::Id,
Expand All @@ -143,15 +146,60 @@ type BalanceGroup = (order::Trader, eth::TokenAddress, order::SellTokenBalance);
type Balances = HashMap<BalanceGroup, order::SellAmount>;

impl AuctionProcessor {
/// Prioritize well priced and filter out unfillable orders from the given
/// auction.
pub async fn prioritize(&self, auction: Auction, solver: &eth::H160) -> Auction {
/// Process the auction by prioritizing the orders and filtering out
/// unfillable orders as well as those that failed to fetch app data.
/// Fetches full app data for each order and returns an auction with
/// updated orders.
pub async fn process(&self, auction: Auction, solver: &eth::H160) -> Auction {
let (mut app_data_by_order, mut prioritized_orders) = join(
self.collect_orders_app_data(&auction),
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
self.prioritize_orders(&auction, solver),
)
.await;

// Filter out orders that failed to fetch app data.
prioritized_orders.retain_mut(|order| {
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
app_data_by_order.remove(&order.uid).map_or(true, |result| {
match result {
Err(err) => {
tracing::warn!(order_uid=?order.uid, ?err, "failed to fetch app data for order, excluding from auction");
false
}
Ok(Some(app_data)) => {
order.app_data = app_data.into();
true
}
Ok(None) => true
}
})
});

Auction {
orders: self.prioritize_orders(&auction, solver).await,
orders: prioritized_orders,
..auction
}
}

/// Fetches the app data for all orders in the auction.
/// Returns a map from order UIDs to the result of fetching the app data.
async fn collect_orders_app_data(
&self,
auction: &Auction,
) -> HashMap<
order::Uid,
Result<Option<app_data::ValidatedAppData>, order::app_data::FetchingError>,
> {
join_all(auction.orders.iter().map(|order| async {
let result = self.app_data_retriever.get(order.app_data.hash()).await;
(order.uid, result)
}))
.await
.into_iter()
.collect::<HashMap<_, _>>()
}

/// Prioritize well priced and filter out unfillable orders from the given
/// auction.
fn prioritize_orders(
&self,
auction: &Auction,
Expand All @@ -161,7 +209,7 @@ impl AuctionProcessor {
.id()
.expect("auctions used for quoting do not have to be prioritized");

let mut lock = self.0.lock().unwrap();
let mut lock = self.inner.lock().unwrap();
let current_id = lock.auction;
if new_id.0 < current_id.0 {
tracing::error!(?current_id, ?new_id, "received an outdated auction");
Expand Down Expand Up @@ -367,7 +415,7 @@ impl AuctionProcessor {
(*cow_amm.address(), cow_amm.validated_template_order(prices, signature_validator, &domain_separator).await)
}),
)
.await;
.await;

// Convert results to domain format.
let domain_separator =
Expand All @@ -392,7 +440,7 @@ impl AuctionProcessor {
},
kind: order::Kind::Limit,
side: template.order.kind.into(),
app_data: order::AppData(Bytes(template.order.app_data.0)),
app_data: order::AppDataHash(Bytes(template.order.app_data.0)).into(),
buy_token_balance: template.order.buy_token_balance.into(),
sell_token_balance: template.order.sell_token_balance.into(),
partial: match template.order.partially_fillable {
Expand Down Expand Up @@ -449,6 +497,7 @@ impl AuctionProcessor {
pub fn new(
eth: &infra::Ethereum,
order_priority_strategies: Vec<OrderPriorityStrategy>,
app_data_retriever: Arc<order::app_data::AppDataRetriever>,
) -> Self {
let eth = eth.with_metric_label("auctionPreProcessing".into());
let mut order_sorting_strategies = vec![];
Expand Down Expand Up @@ -478,13 +527,16 @@ impl AuctionProcessor {
},
);

Self(Arc::new(Mutex::new(Inner {
auction: Id(0),
fut: futures::future::pending().boxed().shared(),
eth,
order_sorting_strategies,
signature_validator,
})))
Self {
inner: Arc::new(Mutex::new(Inner {
auction: Id(0),
fut: futures::future::pending().boxed().shared(),
eth,
order_sorting_strategies,
signature_validator,
})),
app_data_retriever,
}
}
}

Expand Down
127 changes: 127 additions & 0 deletions crates/driver/src/domain/competition/order/app_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use {
futures::FutureExt,
moka::future::Cache,
reqwest::StatusCode,
shared::request_sharing::BoxRequestSharing,
std::sync::Arc,
thiserror::Error,
url::Url,
};

/// A struct for retrieving order's full app-data by its hash from a remote
/// service, with support for caching and deduplicating concurrent requests.
///
/// Ensures efficient access to application data by:
/// - Caching results to avoid redundant network requests.
/// - Sharing ongoing requests to prevent duplicate fetches for the same
/// `app_data`.
/// - Validating fetched app data.
///
/// LRU cache is used since only ~2% of app-data is unique across all orders
/// meaning that the cache hit rate is expected to be high, so there is no need
/// for TTL cache.
#[derive(Clone)]
pub struct AppDataRetriever(Arc<Inner>);

struct Inner {
client: reqwest::Client,
base_url: Url,
request_sharing: BoxRequestSharing<
super::AppDataHash,
Result<Option<app_data::ValidatedAppData>, FetchingError>,
>,
app_data_validator: app_data::Validator,
cache: Cache<super::AppDataHash, Option<app_data::ValidatedAppData>>,
}

impl AppDataRetriever {
// According to statistics, the average size of the app-data is ~800 bytes. With
// this constant, the approximate size of the cache will be ~1.6 MB.
const CACHE_SIZE: u64 = 2_000;
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved

pub fn new(orderbook_url: Url) -> Self {
Self(Arc::new(Inner {
client: reqwest::Client::new(),
base_url: orderbook_url,
request_sharing: BoxRequestSharing::labelled("app_data".to_string()),
app_data_validator: app_data::Validator::new(usize::MAX),
cache: Cache::new(Self::CACHE_SIZE),
}))
}

/// Retrieves the full app-data for the given `app_data` hash, if exists.
pub async fn get(
&self,
app_data: super::AppDataHash,
) -> Result<Option<app_data::ValidatedAppData>, FetchingError> {
if let Some(app_data) = self.0.cache.get(&app_data).await {
return Ok(app_data.clone());
}

let app_data_fut = move |app_data: &super::AppDataHash| {
let app_data = *app_data;
let self_ = self.clone();

async move {
let url = self_
.0
.base_url
.join(&format!("v1/app_data/{:?}", app_data.0))?;
mstrug marked this conversation as resolved.
Show resolved Hide resolved
let response = self_.0.client.get(url).send().await?;
let validated_app_data = match response.status() {
StatusCode::NOT_FOUND => None,
_ => {
let bytes = response.error_for_status()?.bytes().await?;
Some(self_.0.app_data_validator.validate(&bytes)?)
}
};

self_
.0
.cache
.insert(app_data, validated_app_data.clone())
.await;

Ok(validated_app_data)
}
.boxed()
};

self.0
.request_sharing
.shared_or_else(app_data, app_data_fut)
.await
}
}

#[derive(Error, Debug)]
pub enum FetchingError {
#[error("error while sending a request: {0}")]
Http(String),
#[error("received invalid app data: {0}")]
InvalidAppData(#[from] anyhow::Error),
#[error("internal error: {0}")]
Internal(String),
}

impl From<reqwest::Error> for FetchingError {
fn from(err: reqwest::Error) -> Self {
FetchingError::Http(err.to_string())
}
}

impl From<url::ParseError> for FetchingError {
fn from(err: url::ParseError) -> Self {
FetchingError::Internal(err.to_string())
}
}

impl Clone for FetchingError {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required to satisfy BoxRequestSharing constraints.

fn clone(&self) -> Self {
match self {
Self::Http(message) => Self::Http(message.clone()),
Self::InvalidAppData(err) => Self::InvalidAppData(shared::clone_anyhow_error(err)),
Self::Internal(message) => Self::Internal(message.clone()),
}
}
}
49 changes: 43 additions & 6 deletions crates/driver/src/domain/competition/order/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use {
};
pub use {fees::FeePolicy, signature::Signature};

pub mod app_data;
pub mod fees;
pub mod signature;

Expand Down Expand Up @@ -50,6 +51,42 @@ pub struct Order {
pub quote: Option<Quote>,
}

/// The app data associated with an order.
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug, Clone, From)]
pub enum AppData {
/// App data hash.
Hash(AppDataHash),
/// Validated full app data.
Full(Box<::app_data::ValidatedAppData>),
}

impl Default for AppData {
fn default() -> Self {
Self::Hash(Default::default())
}
}

impl AppData {
pub fn hash(&self) -> AppDataHash {
match self {
Self::Hash(hash) => *hash,
Self::Full(data) => AppDataHash(data.hash.0.into()),
}
}
}

impl From<[u8; APP_DATA_LEN]> for AppData {
fn from(app_data_hash: [u8; APP_DATA_LEN]) -> Self {
Self::Hash(app_data_hash.into())
}
}

impl From<::app_data::ValidatedAppData> for AppData {
fn from(value: ::app_data::ValidatedAppData) -> Self {
Self::Full(Box::new(value))
}
}

/// An amount denominated in the sell token of an [`Order`].
#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd, From, Into)]
pub struct SellAmount(pub eth::U256);
Expand Down Expand Up @@ -273,17 +310,17 @@ pub const APP_DATA_LEN: usize = 32;
/// This is a hash allowing arbitrary user data to be associated with an order.
/// While this type holds the hash, the data itself is uploaded to IPFS. This
/// hash is signed along with the order.
#[derive(Debug, Default, Clone, Copy)]
pub struct AppData(pub Bytes<[u8; APP_DATA_LEN]>);
#[derive(Debug, Default, Clone, Copy, Hash, PartialEq, Eq)]
pub struct AppDataHash(pub Bytes<[u8; APP_DATA_LEN]>);

impl From<[u8; APP_DATA_LEN]> for AppData {
impl From<[u8; APP_DATA_LEN]> for AppDataHash {
fn from(inner: [u8; APP_DATA_LEN]) -> Self {
Self(inner.into())
}
}

impl From<AppData> for [u8; APP_DATA_LEN] {
fn from(app_data: AppData) -> Self {
impl From<AppDataHash> for [u8; APP_DATA_LEN] {
fn from(app_data: AppDataHash) -> Self {
app_data.0.into()
}
}
Expand Down Expand Up @@ -388,7 +425,7 @@ pub struct Jit {
pub receiver: eth::Address,
pub valid_to: util::Timestamp,
pub partially_fillable: bool,
pub app_data: AppData,
pub app_data: AppDataHash,
pub side: Side,
pub sell_token_balance: SellTokenBalance,
pub buy_token_balance: BuyTokenBalance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub fn tx(
sell_amount: trade.order().sell.amount.into(),
buy_amount: trade.order().buy.amount.into(),
valid_to: trade.order().valid_to.into(),
app_data: trade.order().app_data.0 .0.into(),
app_data: trade.order().app_data.hash().0 .0.into(),
fee_amount: eth::U256::zero(),
flags: Flags {
side: trade.order().side,
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/domain/competition/solution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Solution {
.unwrap_or(u32::MIN)
.into(),
valid_to: jit.order().valid_to,
app_data: jit.order().app_data,
app_data: jit.order().app_data.into(),
partial: jit.order().partially_fillable(),
pre_interactions: vec![],
post_interactions: vec![],
Expand Down
Loading
Loading