Skip to content

Commit

Permalink
Ocean tries to catchup on startup if below tip height
Browse files Browse the repository at this point in the history
  • Loading branch information
Jouzo committed Oct 28, 2024
1 parent 54ea232 commit 5671c13
Show file tree
Hide file tree
Showing 13 changed files with 420 additions and 45 deletions.
4 changes: 2 additions & 2 deletions lib/ain-ocean/src/api/pool_pair/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,9 @@ async fn list_pool_swaps_verbose(
_ => true,
})
.map(|item| async {
let (_, swap) = item?;
let (key, swap) = item?;
let from = find_swap_from(&ctx, &swap).await?;
let to = find_swap_to(&ctx, &swap).await?;
let to = find_swap_to(&ctx, &key, &swap).await?;

let swap_type = check_swap_type(&ctx, &swap).await?;

Expand Down
16 changes: 14 additions & 2 deletions lib/ain-ocean/src/api/pool_pair/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
NotFoundKind, OtherSnafu,
},
indexer::PoolSwapAggregatedInterval,
model::{PoolSwap, PoolSwapAggregatedAggregated},
model::{PoolSwap, PoolSwapAggregatedAggregated, PoolSwapKey},
storage::{RepositoryOps, SecondaryIndex, SortOrder},
Result,
};
Expand Down Expand Up @@ -673,6 +673,7 @@ pub async fn find_swap_from(

pub async fn find_swap_to(
ctx: &Arc<AppContext>,
swap_key: &PoolSwapKey,
swap: &PoolSwap,
) -> Result<Option<PoolSwapFromToData>> {
let PoolSwap {
Expand All @@ -689,9 +690,20 @@ pub async fn find_swap_to(

let display_symbol = parse_display_symbol(&to_token);

// TODO Index to_amount if missing
if to_amount.is_none() {
let amount = 0;
let swap = PoolSwap {
to_amount: Some(amount),
..swap.clone()
};
ctx.services.pool.by_id.put(swap_key, &swap)?;
}

Ok(Some(PoolSwapFromToData {
address: to_address,
amount: Decimal::new(to_amount.to_owned(), 8).to_string(),
// amount: Decimal::new(to_amount.to_owned(), 8).to_string(), // Need fallback
amount: Decimal::new(to_amount.to_owned().unwrap_or_default(), 8).to_string(),
symbol: to_token.symbol,
display_symbol,
}))
Expand Down
14 changes: 12 additions & 2 deletions lib/ain-ocean/src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn get_bucket(block: &Block<Transaction>, interval: i64) -> i64 {
}

fn index_block_start(services: &Arc<Services>, block: &Block<Transaction>) -> Result<()> {
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
let mut pool_pairs = services.pool_pair_cache.get();
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));

for interval in AGGREGATED_INTERVALS {
Expand Down Expand Up @@ -116,7 +116,7 @@ fn index_block_start(services: &Arc<Services>, block: &Block<Transaction>) -> Re
}

fn invalidate_block_start(services: &Arc<Services>, block: &Block<Transaction>) -> Result<()> {
let mut pool_pairs = ain_cpp_imports::get_pool_pairs();
let mut pool_pairs = services.pool_pair_cache.get();
pool_pairs.sort_by(|a, b| b.creation_height.cmp(&a.creation_height));

for interval in AGGREGATED_INTERVALS {
Expand Down Expand Up @@ -601,6 +601,14 @@ fn invalidate_block_end(services: &Arc<Services>, block: &BlockContext) -> Resul
Ok(())
}

pub fn get_block_height(services: &Arc<Services>) -> Result<u32> {
Ok(services
.block
.by_height
.get_highest()?
.map_or(0, |block| block.height))
}

pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Result<()> {
trace!("[index_block] Indexing block...");
let start = Instant::now();
Expand Down Expand Up @@ -658,6 +666,7 @@ pub fn index_block(services: &Arc<Services>, block: Block<Transaction>) -> Resul
DfTx::SetLoanToken(data) => data.index(services, &ctx)?,
DfTx::CompositeSwap(data) => data.index(services, &ctx)?,
DfTx::PlaceAuctionBid(data) => data.index(services, &ctx)?,
DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(),
_ => (),
}
log_elapsed(start, "Indexed dftx");
Expand Down Expand Up @@ -755,6 +764,7 @@ pub fn invalidate_block(services: &Arc<Services>, block: Block<Transaction>) ->
DfTx::SetLoanToken(data) => data.invalidate(services, &ctx)?,
DfTx::CompositeSwap(data) => data.invalidate(services, &ctx)?,
DfTx::PlaceAuctionBid(data) => data.invalidate(services, &ctx)?,
DfTx::CreatePoolPair(_) => services.pool_pair_cache.invalidate(),
_ => (),
}
log_elapsed(start, "Invalidate dftx");
Expand Down
100 changes: 81 additions & 19 deletions lib/ain-ocean/src/indexer/poolswap.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use std::{str::FromStr, sync::Arc};

use ain_cpp_imports::PoolPairCreationHeight;
use ain_dftx::{pool::*, COIN};
use bitcoin::Txid;
use log::trace;
use parking_lot::RwLock;
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use snafu::OptionExt;

use super::Context;
use crate::{
error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu},
error::{ArithmeticOverflowSnafu, ArithmeticUnderflowSnafu, Error, NotFoundKind},
indexer::{tx_result, Index, Result},
model::{self, PoolSwapResult, TxResult},
storage::{RepositoryOps, SortOrder},
Expand Down Expand Up @@ -149,7 +151,7 @@ fn invalidate_swap_aggregated(

impl Index for PoolSwap {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()> {
trace!("[Poolswap] Indexing...");
trace!("[Poolswap] Indexing {self:?}...");
let txid = ctx.tx.txid;
let idx = ctx.tx_idx;
let from = self.from_script;
Expand All @@ -158,17 +160,28 @@ impl Index for PoolSwap {
let from_amount = self.from_amount;
let to_token_id = self.to_token_id.0;

let Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) =
services.result.get(&txid)?
else {
// TODO: Commenting out for now, fallback should only be introduced for supporting back CLI indexing
return Err("Missing swap result".into());
// let pair = find_pair(from_token_id, to_token_id);
// if pair.is_none() {
// return Err(format_err!("Pool not found by {from_token_id}-{to_token_id} or {to_token_id}-{from_token_id}").into());
// }
// let pair = pair.unwrap();
// (None, pair.id)
let (to_amount, pool_id) = match services.result.get(&txid)? {
Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) => {
(Some(to_amount), pool_id)
}
_ => {
let poolpairs = services.pool_pair_cache.get();

let pool_id = poolpairs
.into_iter()
.find(|pp| {
(pp.id_token_a == self.from_token_id.0 as u32
&& pp.id_token_b == self.to_token_id.0 as u32)
|| (pp.id_token_a == self.to_token_id.0 as u32
&& pp.id_token_b == self.from_token_id.0 as u32)
})
.map(|pp| pp.id)
.ok_or(Error::NotFound {
kind: NotFoundKind::PoolPair,
})?;

(None, pool_id)
}
};

let swap: model::PoolSwap = model::PoolSwap {
Expand Down Expand Up @@ -221,17 +234,31 @@ impl Index for PoolSwap {

impl Index for CompositeSwap {
fn index(self, services: &Arc<Services>, ctx: &Context) -> Result<()> {
trace!("[CompositeSwap] Indexing...");
trace!("[CompositeSwap] Indexing {self:?}...");
let txid = ctx.tx.txid;
let from_token_id = self.pool_swap.from_token_id.0;
let from_amount = self.pool_swap.from_amount;
let to_token_id = self.pool_swap.to_token_id.0;

let Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) =
services.result.get(&txid)?
else {
trace!("Missing swap result for {}", txid.to_string());
return Err("Missing swap result".into());
let (to_amount, pool_id) = match services.result.get(&txid)? {
Some(TxResult::PoolSwap(PoolSwapResult { to_amount, pool_id })) => {
(Some(to_amount), Some(pool_id))
}
_ => {
let poolpairs = services.pool_pair_cache.get();

let pool_id = poolpairs
.into_iter()
.find(|pp| {
(pp.id_token_a == self.pool_swap.from_token_id.0 as u32
&& pp.id_token_b == self.pool_swap.to_token_id.0 as u32)
|| (pp.id_token_a == self.pool_swap.to_token_id.0 as u32
&& pp.id_token_b == self.pool_swap.from_token_id.0 as u32)
})
.map(|pp| pp.id);

(None, pool_id)
}
};

let from = self.pool_swap.from_script;
Expand All @@ -240,6 +267,9 @@ impl Index for CompositeSwap {

let pool_ids = if pools.is_empty() {
// the pool_id from finals wap is the only swap while pools is empty
let pool_id = pool_id.ok_or(Error::NotFound {
kind: NotFoundKind::PoolPair,
})?;
Vec::from([pool_id])
} else {
pools.iter().map(|pool| pool.id.0 as u32).collect()
Expand Down Expand Up @@ -286,3 +316,35 @@ impl Index for CompositeSwap {
tx_result::invalidate(services, &ctx.tx.txid)
}
}
pub struct PoolPairCache {
cache: RwLock<Option<Vec<PoolPairCreationHeight>>>,
}

impl PoolPairCache {
pub fn new() -> Self {
Self {
cache: RwLock::new(None),
}
}

pub fn get(&self) -> Vec<PoolPairCreationHeight> {
{
let guard = self.cache.read();
if let Some(poolpairs) = guard.as_ref() {
return poolpairs.clone();
}
}

let poolpairs = ain_cpp_imports::get_pool_pairs();

let mut guard = self.cache.write();
*guard = Some(poolpairs.clone());

poolpairs
}

pub fn invalidate(&self) {
let mut guard = self.cache.write();
*guard = None;
}
}
5 changes: 4 additions & 1 deletion lib/ain-ocean/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use std::{path::PathBuf, sync::Arc};

pub use api::ocean_router;
use error::Error;
use indexer::poolswap::PoolPairCache;
pub use indexer::{
index_block, invalidate_block,
get_block_height, index_block, invalidate_block,
oracle::invalidate_oracle_interval,
transaction::{index_transaction, invalidate_transaction},
tx_result,
Expand Down Expand Up @@ -143,6 +144,7 @@ pub struct Services {
pub script_unspent: ScriptUnspentService,
pub token_graph: Arc<Mutex<UnGraphMap<u32, String>>>,
pub store: Arc<OceanStore>,
pub pool_pair_cache: PoolPairCache,
}

impl Services {
Expand Down Expand Up @@ -218,6 +220,7 @@ impl Services {
},
token_graph: Arc::new(Mutex::new(UnGraphMap::new())),
store: Arc::clone(&store),
pool_pair_cache: PoolPairCache::new(),
}
}
}
2 changes: 1 addition & 1 deletion lib/ain-ocean/src/model/poolswap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct PoolSwap {
pub pool_id: u32,
pub from_amount: i64,
pub from_token_id: u64,
pub to_amount: i64,
pub to_amount: Option<i64>,
pub to_token_id: u64,
pub from: ScriptBuf,
pub to: ScriptBuf,
Expand Down
1 change: 1 addition & 0 deletions lib/ain-rs-exports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ pub mod ffi {

fn evm_try_flush_db(result: &mut CrossBoundaryResult);

fn ocean_get_block_height(result: &mut CrossBoundaryResult) -> u32;
fn ocean_index_block(result: &mut CrossBoundaryResult, block_str: String);
fn ocean_invalidate_block(result: &mut CrossBoundaryResult, block: String);

Expand Down
5 changes: 5 additions & 0 deletions lib/ain-rs-exports/src/ocean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ use crate::{
prelude::{cross_boundary_error_return, cross_boundary_success_return},
};

#[ffi_fallible]
pub fn ocean_get_block_height() -> Result<u32> {
ain_ocean::get_block_height(&ain_ocean::SERVICES)
}

#[ffi_fallible]
pub fn ocean_index_block(block_str: String) -> Result<()> {
let block: Block<Transaction> = serde_json::from_str(&block_str)?;
Expand Down
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ DEFI_CORE_H = \
index/blockfilterindex.h \
index/txindex.h \
indirectmap.h \
ocean.h \
init.h \
interfaces/chain.h \
interfaces/handler.h \
Expand Down Expand Up @@ -410,6 +411,7 @@ libdefi_server_a_SOURCES = \
index/blockfilterindex.cpp \
index/txindex.cpp \
interfaces/chain.cpp \
ocean.cpp \
init.cpp \
dbwrapper.cpp \
ffi/ffiexports.cpp \
Expand Down
Loading

0 comments on commit 5671c13

Please sign in to comment.