Skip to content

Commit

Permalink
Trigger OnChainSettlementUpdates from event handling (#2387)
Browse files Browse the repository at this point in the history
# Description
Alternative to first doing #2335 

Our goal is to trigger the settlement event update handler from within
the main event handling (so we have control over the order of
execution).
At the same time, "on settlement event updates" are not critical and are more
time-consuming. They require a full RPC round trip for every settlement
they process, whereas the "critical" event updating (which is required
for new auctions to run correctly) can run without additional calls and
is therefore efficient for larger ranges (e.g. when catching up after
downtime).

The goal of this PR is to bring the non-critical settlement updates into
the critical control flow, without making that flow itself blocking (i.e.
the critical code path just "schedules" execution of the non-critical
updates).

# Changes
<!-- List of detailed changes (how the change is accomplished) -->

- [x] Change the trigger for on-settlement updates from being block-based
to being `Notify`-based
- [x] Move removal of non-critical events from database event handling
to on-settlement event handling
- [x] Trigger non-critical updates non-blockingly from the critical code
path

## How to test
e2e tests

## Related Issues

Fixes #2275
  • Loading branch information
fleupold authored Feb 9, 2024
1 parent d66ff0b commit c4f6e51
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 105 deletions.
30 changes: 23 additions & 7 deletions crates/autopilot/src/boundary/events/settlement.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::database::Postgres,
crate::{database::Postgres, on_settlement_event_updater::OnSettlementEventUpdater},
anyhow::Result,
ethrpc::current_block::RangeInclusive,
shared::{event_handling::EventStoring, impl_event_retrieving},
Expand All @@ -11,33 +11,49 @@ impl_event_retrieving! {

pub struct Indexer {
db: Postgres,
settlement_updater: OnSettlementEventUpdater,
}

impl Indexer {
pub fn new(db: Postgres) -> Self {
Self { db }
pub fn new(db: Postgres, settlement_updater: OnSettlementEventUpdater) -> Self {
Self {
db,
settlement_updater,
}
}
}

#[async_trait::async_trait]
impl EventStoring<contracts::gpv2_settlement::Event> for Indexer {
async fn last_event_block(&self) -> Result<u64> {
let store: &dyn EventStoring<contracts::gpv2_settlement::Event> = &self.db;
store.last_event_block().await
let mut con = self.db.pool.acquire().await?;
crate::database::events::last_event_block(&mut con).await
}

async fn replace_events(
&mut self,
events: Vec<ethcontract::Event<contracts::gpv2_settlement::Event>>,
range: RangeInclusive<u64>,
) -> Result<()> {
self.db.replace_events(events, range).await
let mut transaction = self.db.pool.begin().await?;
let from_block = *range.start();
crate::database::events::replace_events(&mut transaction, events, from_block).await?;
OnSettlementEventUpdater::delete_observations(&mut transaction, from_block).await?;
transaction.commit().await?;

self.settlement_updater.schedule_update();
Ok(())
}

async fn append_events(
&mut self,
events: Vec<ethcontract::Event<contracts::gpv2_settlement::Event>>,
) -> Result<()> {
self.db.append_events(events).await
let mut transaction = self.db.pool.begin().await?;
crate::database::events::append_events(&mut transaction, events).await?;
transaction.commit().await?;

self.settlement_updater.schedule_update();
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/autopilot/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod auction;
pub mod auction_prices;
pub mod competition;
pub mod ethflow_events;
mod events;
pub mod events;
pub mod fee_policies;
pub mod on_settlement_event_updater;
pub mod onchain_order_events;
Expand Down
92 changes: 43 additions & 49 deletions crates/autopilot/src/database/events.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use {
super::Postgres,
anyhow::{anyhow, Context, Result},
contracts::gpv2_settlement::{
event_data::{
Expand All @@ -14,11 +13,11 @@ use {
byte_array::ByteArray,
events::{Event, EventIndex, Invalidation, PreSignature, Settlement, Trade},
OrderUid,
PgTransaction,
},
ethcontract::{Event as EthContractEvent, EventMetadata},
ethrpc::current_block::RangeInclusive,
number::conversions::u256_to_big_decimal,
shared::event_handling::EventStoring,
sqlx::PgConnection,
std::convert::TryInto,
};

Expand All @@ -44,57 +43,52 @@ pub fn contract_to_db_events(
.collect::<Result<Vec<_>>>()
}

#[async_trait::async_trait]
impl EventStoring<ContractEvent> for Postgres {
async fn last_event_block(&self) -> Result<u64> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["last_event_block"])
.start_timer();
pub async fn last_event_block(connection: &mut PgConnection) -> Result<u64> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["last_event_block"])
.start_timer();

let mut con = self.pool.acquire().await?;
let block_number = database::events::last_block(&mut con)
.await
.context("block_number_of_most_recent_event failed")?;
block_number.try_into().context("block number is negative")
}
let block_number = database::events::last_block(connection)
.await
.context("block_number_of_most_recent_event failed")?;
block_number.try_into().context("block number is negative")
}

async fn append_events(&mut self, events: Vec<EthContractEvent<ContractEvent>>) -> Result<()> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["append_events"])
.start_timer();
pub async fn append_events(
transaction: &mut PgTransaction<'_>,
events: Vec<EthContractEvent<ContractEvent>>,
) -> Result<()> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["append_events"])
.start_timer();

let events = contract_to_db_events(events)?;
let mut transaction = self.pool.begin().await?;
database::events::append(&mut transaction, &events)
.await
.context("append_events")?;
transaction.commit().await.context("commit")?;
Ok(())
}
let events = contract_to_db_events(events)?;
database::events::append(transaction, &events)
.await
.context("append_events")?;
Ok(())
}

async fn replace_events(
&mut self,
events: Vec<EthContractEvent<ContractEvent>>,
range: RangeInclusive<u64>,
) -> Result<()> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["replace_events"])
.start_timer();
pub async fn replace_events(
transaction: &mut PgTransaction<'_>,
events: Vec<EthContractEvent<ContractEvent>>,
from_block: u64,
) -> Result<()> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["replace_events"])
.start_timer();

let events = contract_to_db_events(events)?;
let mut transaction = self.pool.begin().await?;
database::events::delete(&mut transaction, *range.start() as i64)
.await
.context("delete_events failed")?;
database::events::append(&mut transaction, events.as_slice())
.await
.context("insert_events failed")?;
transaction.commit().await.context("commit")?;
Ok(())
}
let events = contract_to_db_events(events)?;
database::events::delete(transaction, from_block)
.await
.context("delete_events failed")?;
database::events::append(transaction, events.as_slice())
.await
.context("insert_events failed")?;
Ok(())
}

pub fn meta_to_event_index(meta: &EventMetadata) -> EventIndex {
Expand Down
63 changes: 47 additions & 16 deletions crates/autopilot/src/on_settlement_event_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,23 @@ use {
infra,
},
anyhow::{Context, Result},
futures::StreamExt,
database::PgTransaction,
primitive_types::H256,
shared::external_prices::ExternalPrices,
sqlx::PgConnection,
std::sync::Arc,
tokio::sync::Notify,
web3::types::Transaction,
};

pub struct OnSettlementEventUpdater {
pub eth: infra::Ethereum,
pub db: Postgres,
inner: Arc<Inner>,
}

struct Inner {
eth: infra::Ethereum,
db: Postgres,
notify: Notify,
}

enum AuctionIdRecoveryStatus {
Expand All @@ -59,30 +66,54 @@ enum AuctionIdRecoveryStatus {
}

impl OnSettlementEventUpdater {
pub async fn run_forever(self) -> ! {
let mut current_block = self.eth.current_block().borrow().to_owned();
let mut block_stream = ethrpc::current_block::into_stream(self.eth.current_block().clone());
/// Creates a new OnSettlementEventUpdater and asynchronously schedules the
/// first update run.
pub fn new(eth: infra::Ethereum, db: Postgres) -> Self {
let inner = Arc::new(Inner {
eth,
db,
notify: Notify::new(),
});
let inner_clone = inner.clone();
tokio::spawn(async move { Inner::listen_for_updates(inner_clone).await });
Self { inner }
}

/// Deletes settlement_observations and order executions for the given range
pub async fn delete_observations(
transaction: &mut PgTransaction<'_>,
from_block: u64,
) -> Result<()> {
database::settlements::delete(transaction, from_block)
.await
.context("delete_settlement_observations")?;

Ok(())
}

/// Schedules an update loop on a background thread
pub fn schedule_update(&self) {
self.inner.notify.notify_one();
}
}

impl Inner {
async fn listen_for_updates(self: Arc<Inner>) -> ! {
loop {
match self.update().await {
Ok(true) => {
tracing::debug!(
block = current_block.number,
"on settlement event updater ran and processed event"
);
// Don't wait until next block in case there are more pending events to process.
tracing::debug!("on settlement event updater ran and processed event");
// There might be more pending updates, continue immediately.
continue;
}
Ok(false) => {
tracing::debug!(
block = current_block.number,
"on settlement event updater ran without update"
);
tracing::debug!("on settlement event updater ran without update");
}
Err(err) => {
tracing::error!(?err, "on settlement event update task failed");
}
}
current_block = block_stream.next().await.expect("blockchains never end");
self.notify.notified().await;
}
}

Expand Down
16 changes: 4 additions & 12 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,14 @@ pub async fn run(args: Arguments) {
} else {
None
};

let on_settlement_event_updater =
crate::on_settlement_event_updater::OnSettlementEventUpdater::new(eth.clone(), db.clone());
let event_updater = Arc::new(EventUpdater::new(
boundary::events::settlement::GPv2SettlementContract::new(
eth.contracts().settlement().clone(),
),
boundary::events::settlement::Indexer::new(db.clone()),
boundary::events::settlement::Indexer::new(db.clone(), on_settlement_event_updater),
block_retriever.clone(),
skip_event_sync_start,
));
Expand Down Expand Up @@ -563,17 +566,6 @@ pub async fn run(args: Arguments) {
let liveness = Arc::new(Liveness::new(args.max_auction_age));
shared::metrics::serve_metrics(liveness.clone(), args.metrics_address);

let on_settlement_event_updater =
crate::on_settlement_event_updater::OnSettlementEventUpdater {
eth: eth.clone(),
db: db.clone(),
};
tokio::task::spawn(
on_settlement_event_updater
.run_forever()
.instrument(tracing::info_span!("on_settlement_event_updater")),
);

let order_events_cleaner_config = crate::periodic_db_cleanup::OrderEventsCleanerConfig::new(
args.order_events_cleanup_interval,
args.order_events_cleanup_threshold,
Expand Down
23 changes: 5 additions & 18 deletions crates/database/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,35 +54,22 @@ pub async fn last_block(ex: &mut PgConnection) -> Result<i64, sqlx::Error> {

pub async fn delete(
ex: &mut PgTransaction<'_>,
delete_from_block_number: i64,
delete_from_block_number: u64,
) -> Result<(), sqlx::Error> {
const QUERY_INVALIDATION: &str = "DELETE FROM invalidations WHERE block_number >= $1;";
ex.execute(sqlx::query(QUERY_INVALIDATION).bind(delete_from_block_number))
ex.execute(sqlx::query(QUERY_INVALIDATION).bind(delete_from_block_number as i64))
.await?;

const QUERY_TRADE: &str = "DELETE FROM trades WHERE block_number >= $1;";
ex.execute(sqlx::query(QUERY_TRADE).bind(delete_from_block_number))
ex.execute(sqlx::query(QUERY_TRADE).bind(delete_from_block_number as i64))
.await?;

const QUERY_SETTLEMENTS: &str = "DELETE FROM settlements WHERE block_number >= $1;";
ex.execute(sqlx::query(QUERY_SETTLEMENTS).bind(delete_from_block_number))
ex.execute(sqlx::query(QUERY_SETTLEMENTS).bind(delete_from_block_number as i64))
.await?;

const QUERY_PRESIGNATURES: &str = "DELETE FROM presignature_events WHERE block_number >= $1;";
ex.execute(sqlx::query(QUERY_PRESIGNATURES).bind(delete_from_block_number))
.await?;

// Observations and order executions are not events but data derived from the
// onchain data. The reason we delete them here is that we want to keep the
// database state consistent.

const QUERY_OBSERVATIONS: &str =
"DELETE FROM settlement_observations WHERE block_number >= $1;";
ex.execute(sqlx::query(QUERY_OBSERVATIONS).bind(delete_from_block_number))
.await?;

const QUERY_ORDER_EXECUTIONS: &str = "DELETE FROM order_execution WHERE block_number >= $1;";
ex.execute(sqlx::query(QUERY_ORDER_EXECUTIONS).bind(delete_from_block_number))
ex.execute(sqlx::query(QUERY_PRESIGNATURES).bind(delete_from_block_number as i64))
.await?;

Ok(())
Expand Down
20 changes: 18 additions & 2 deletions crates/database/src/settlements.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{events::EventIndex, TransactionHash},
sqlx::PgConnection,
crate::{events::EventIndex, PgTransaction, TransactionHash},
sqlx::{Executor, PgConnection},
std::ops::Range,
};

Expand Down Expand Up @@ -92,6 +92,22 @@ WHERE block_number = $2 AND log_index = $3
.map(|_| ())
}

pub async fn delete(
ex: &mut PgTransaction<'_>,
delete_from_block_number: u64,
) -> Result<(), sqlx::Error> {
const QUERY_OBSERVATIONS: &str =
"DELETE FROM settlement_observations WHERE block_number >= $1;";
ex.execute(sqlx::query(QUERY_OBSERVATIONS).bind(delete_from_block_number as i64))
.await?;

const QUERY_ORDER_EXECUTIONS: &str = "DELETE FROM order_execution WHERE block_number >= $1;";
ex.execute(sqlx::query(QUERY_ORDER_EXECUTIONS).bind(delete_from_block_number as i64))
.await?;

Ok(())
}

#[cfg(test)]
mod tests {
use {
Expand Down

0 comments on commit c4f6e51

Please sign in to comment.