From 95c1fc87f34608a0906d24f5a65e32beef6638a6 Mon Sep 17 00:00:00 2001 From: Tarrence van As Date: Fri, 2 Jun 2023 22:01:21 -0400 Subject: [PATCH] Extract indexing engine commit-id:bf2373d1 --- crates/torii/src/cli.rs | 4 +- crates/torii/src/engine.rs | 161 ++++++++++++++++++++++++++++++++++++ crates/torii/src/indexer.rs | 128 +--------------------------- 3 files changed, 167 insertions(+), 126 deletions(-) create mode 100644 crates/torii/src/engine.rs diff --git a/crates/torii/src/cli.rs b/crates/torii/src/cli.rs index f751ac97fc..158586f729 100644 --- a/crates/torii/src/cli.rs +++ b/crates/torii/src/cli.rs @@ -12,10 +12,10 @@ use url::Url; use crate::indexer::start_indexer; -mod processors; - +mod engine; mod graphql; mod indexer; +mod processors; mod storage; mod tests; diff --git a/crates/torii/src/engine.rs b/crates/torii/src/engine.rs new file mode 100644 index 0000000000..3bc75ddb15 --- /dev/null +++ b/crates/torii/src/engine.rs @@ -0,0 +1,161 @@ +use std::error::Error; +use std::sync::Arc; +use std::time::Duration; + +use starknet::core::types::{ + BlockId, BlockWithTxs, Event, InvokeTransaction, MaybePendingBlockWithTxs, + MaybePendingTransactionReceipt, StarknetError, Transaction, TransactionReceipt, +}; +use starknet::providers::jsonrpc::{JsonRpcClient, JsonRpcTransport}; +use starknet::providers::{Provider, ProviderError}; +use tokio::time::sleep; +use tracing::error; + +use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; +use crate::storage::Storage; + +pub struct Processors { + block: Vec>>, + transaction: Vec>>, + event: Vec>>, +} + +impl Default for Processors { + fn default() -> Self { + Self { block: vec![], transaction: vec![], event: vec![] } + } +} + +pub struct Engine<'a, S: Storage, T: JsonRpcTransport + Sync + Send> { + storage: &'a S, + provider: &'a JsonRpcClient, + processors: Processors, +} + +impl<'a, S: Storage, T: JsonRpcTransport + Sync + Send> Engine<'a, S, T> { + pub fn new( + storage: &'a S, + provider: &'a JsonRpcClient, + processors: Processors, + ) -> Self { + Self { storage, provider, processors } + } + + pub async fn start(&self) -> Result<(), Box> { + let mut current_block_number = self.storage.head().await?; + + loop { + sleep(Duration::from_secs(1)).await; + + let block_with_txs = + match self.provider.get_block_with_txs(BlockId::Number(current_block_number)).await + { + Ok(block_with_txs) => block_with_txs, + Err(e) => { + if let ProviderError::StarknetError(StarknetError::BlockNotFound) = e { + continue; + } + + error!("getting block: {}", e); + continue; + } + }; + + let block_with_txs = match block_with_txs { + MaybePendingBlockWithTxs::Block(block_with_txs) => block_with_txs, + _ => continue, + }; + + process_block(self.storage, self.provider, &self.processors.block, &block_with_txs) + .await?; + + for transaction in block_with_txs.transactions { + let invoke_transaction = match &transaction { + Transaction::Invoke(invoke_transaction) => invoke_transaction, + _ => continue, + }; + + let invoke_transaction = match invoke_transaction { + InvokeTransaction::V1(invoke_transaction) => invoke_transaction, + _ => continue, + }; + + let receipt = match self + .provider + .get_transaction_receipt(invoke_transaction.transaction_hash) + .await + { + Ok(receipt) => receipt, + _ => continue, + }; + + let receipt = match receipt { + MaybePendingTransactionReceipt::Receipt(receipt) => receipt, + _ => continue, + }; + + process_transaction( + self.storage, + self.provider, + &self.processors.transaction, + &receipt.clone(), + ) + .await?; + + if let TransactionReceipt::Invoke(invoke_receipt) = receipt.clone() { + for event in &invoke_receipt.events { + process_event( + self.storage, + self.provider, + &self.processors.event, + &receipt, + event, + ) + .await?; + } + } + } + + current_block_number += 1; + } + } +} + +async fn process_block( + storage: &S, + provider: &JsonRpcClient, + processors: &[Arc>], + block: &BlockWithTxs, +) -> Result<(), Box> { + for processor in processors { + processor.process(storage, provider, block).await?; + } + Ok(()) +} + +async fn process_transaction( + storage: &S, + provider: &JsonRpcClient, + processors: &[Arc>], + receipt: &TransactionReceipt, +) -> Result<(), Box> { + for processor in processors { + processor.process(storage, provider, receipt).await?; + } + + Ok(()) +} + +async fn process_event( + storage: &S, + provider: &JsonRpcClient, + processors: &[Arc>], + _receipt: &TransactionReceipt, + event: &Event, +) -> Result<(), Box> { + for processor in processors { + processor.process(storage, provider, event).await?; + } + + Ok(()) +} diff --git a/crates/torii/src/indexer.rs b/crates/torii/src/indexer.rs index a32b5355d1..6afc5bf2d4 100644 --- a/crates/torii/src/indexer.rs +++ b/crates/torii/src/indexer.rs @@ -1,22 +1,14 @@ use std::error::Error; -use std::sync::Arc; -use std::time::Duration; use num::BigUint; -use starknet::core::types::{ - BlockId, BlockWithTxs, Event, InvokeTransaction, MaybePendingBlockWithTxs, - MaybePendingTransactionReceipt, StarknetError, Transaction, TransactionReceipt, -}; use starknet::providers::jsonrpc::{JsonRpcClient, JsonRpcTransport}; -use starknet::providers::{Provider, ProviderError}; -use tokio::time::sleep; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::info; +use crate::engine::{Engine, Processors}; // use crate::processors::component_register::ComponentRegistrationProcessor; // use crate::processors::component_state_update::ComponentStateUpdateProcessor; // use crate::processors::system_register::SystemRegistrationProcessor; -use crate::processors::{BlockProcessor, EventProcessor, TransactionProcessor}; use crate::storage::Storage; pub async fn start_indexer( @@ -27,120 +19,8 @@ pub async fn start_indexer( ) -> Result<(), Box> { info!("starting indexer"); - let block_processors: Vec>> = vec![]; - let transaction_processors: Vec>> = vec![]; - let event_processors: Vec>> = vec![]; + let engine = Engine::new(storage, provider, Processors::default()); + engine.start().await?; - let mut current_block_number = storage.head().await?; - - loop { - sleep(Duration::from_secs(1)).await; - - let block_with_txs = - match provider.get_block_with_txs(BlockId::Number(current_block_number)).await { - Ok(block_with_txs) => block_with_txs, - Err(e) => { - if let ProviderError::StarknetError(StarknetError::BlockNotFound) = e { - continue; - } - - error!("getting block: {}", e); - continue; - } - }; - - let block_with_txs = match block_with_txs { - MaybePendingBlockWithTxs::Block(block_with_txs) => block_with_txs, - _ => continue, - }; - - process_block(storage, provider, &block_processors, &block_with_txs).await?; - - for transaction in block_with_txs.transactions { - let invoke_transaction = match &transaction { - Transaction::Invoke(invoke_transaction) => invoke_transaction, - _ => continue, - }; - - let invoke_transaction = match invoke_transaction { - InvokeTransaction::V1(invoke_transaction) => invoke_transaction, - _ => continue, - }; - - let receipt = - match provider.get_transaction_receipt(invoke_transaction.transaction_hash).await { - Ok(receipt) => receipt, - _ => continue, - }; - - let receipt = match receipt { - MaybePendingTransactionReceipt::Receipt(receipt) => receipt, - _ => continue, - }; - - process_transaction(storage, provider, &transaction_processors, &receipt.clone()) - .await?; - - if let TransactionReceipt::Invoke(invoke_receipt) = receipt.clone() { - for event in &invoke_receipt.events { - process_event(storage, provider, &event_processors, &receipt, event).await?; - } - } - } - - current_block_number += 1; - } -} - -async fn process_block( - storage: &S, - provider: &JsonRpcClient, - processors: &[Arc>], - block: &BlockWithTxs, -) -> Result<(), Box> { - for processor in processors { - processor.process(storage, provider, block).await?; - } Ok(()) } - -async fn process_transaction( - storage: &S, - provider: &JsonRpcClient, - processors: &[Arc>], - receipt: &TransactionReceipt, -) -> Result<(), Box> { - for processor in processors { - processor.process(storage, provider, receipt).await?; - } - - Ok(()) -} - -async fn process_event( - storage: &S, - provider: &JsonRpcClient, - processors: &[Arc>], - _receipt: &TransactionReceipt, - event: &Event, -) -> Result<(), Box> { - for processor in processors { - processor.process(storage, provider, event).await?; - } - - Ok(()) -} - -// #[test] -// fn test_indexer() { -// use crate::start_apibara; - -// let rpc_url = "http://localhost:5050"; -// let (sequencer, rpc) = build_mock_rpc(5050); -// let ct = CancellationToken::new(); -// let pool = sqlx::sqlite::SqlitePool::connect("sqlite::memory:").unwrap(); -// let world = BigUint::from(0x1234567890); -// let provider = JsonRpcClient::new(HttpTransport::new(Uri::parse(rpc_url))); - -// start_indexer(ct, world, Uri::from_str("http://localhost:7171").unwrap(), pool, &provider) -// }