From af17ad5ac901a6d40367fc487e93dad42e4e512e Mon Sep 17 00:00:00 2001
From: sslivkoff
Date: Mon, 31 Jul 2023 23:47:12 -0700
Subject: [PATCH] add gas_used column option to txs using receipts

---
 crates/freeze/src/datasets/blocks.rs          | 61 +++++++++-----
 .../src/datasets/blocks_and_transactions.rs   | 81 +++++++++++++++++--
 crates/freeze/src/datasets/transactions.rs    |  7 +-
 crates/freeze/src/types/errors.rs             |  4 +
 4 files changed, 126 insertions(+), 27 deletions(-)

diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs
index 6b12b152..b95f4fc5 100644
--- a/crates/freeze/src/datasets/blocks.rs
+++ b/crates/freeze/src/datasets/blocks.rs
@@ -13,6 +13,8 @@ use crate::{
     with_series, with_series_binary,
 };
 
+pub(crate) type BlockTxGasTuple<TX> = Result<(Block<TX>, Option<Vec<u32>>), CollectError>;
+
 #[async_trait::async_trait]
 impl Dataset for Blocks {
     fn datatype(&self) -> Datatype {
@@ -72,7 +74,7 @@ impl Dataset for Blocks {
 async fn fetch_blocks(
     block_chunk: &BlockChunk,
     source: &Source,
-) -> mpsc::Receiver<Result<Option<Block<TxHash>>, CollectError>> {
+) -> mpsc::Receiver<BlockTxGasTuple<TxHash>> {
     let (tx, rx) = mpsc::channel(block_chunk.numbers().len());
 
     for number in block_chunk.numbers() {
@@ -88,8 +90,13 @@ async fn fetch_blocks(
             if let Some(limiter) = rate_limiter {
                 Arc::clone(&limiter).until_ready().await;
             }
-            let block = provider.get_block(number).await.map_err(CollectError::ProviderError);
-            match tx.send(block).await {
+            let block = provider.get_block(number).await;
+            let result = match block {
+                Ok(Some(block)) => Ok((block, None)),
+                Ok(None) => Err(CollectError::CollectError("block not in node".to_string())),
+                Err(e) => Err(CollectError::ProviderError(e)),
+            };
+            match tx.send(result).await {
                 Ok(_) => {}
                 Err(tokio::sync::mpsc::error::SendError(_e)) => {
                     eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
@@ -102,23 +109,23 @@
 }
 
 pub(crate) trait ProcessTransactions {
-    fn process(&self, schema: &Table, columns: &mut TransactionColumns);
+    fn process(&self, schema: &Table, columns: &mut TransactionColumns, gas_used: Option<u32>);
 }
 
 impl ProcessTransactions for TxHash {
-    fn process(&self, _schema: &Table, _columns: &mut TransactionColumns) {
+    fn process(&self, _schema: &Table, _columns: &mut TransactionColumns, _gas_used: Option<u32>) {
         panic!("transaction data not available to process")
     }
 }
 
 impl ProcessTransactions for Transaction {
-    fn process(&self, schema: &Table, columns: &mut TransactionColumns) {
-        process_transaction(self, schema, columns)
+    fn process(&self, schema: &Table, columns: &mut TransactionColumns, gas_used: Option<u32>) {
+        process_transaction(self, schema, columns, gas_used)
     }
 }
 
 pub(crate) async fn blocks_to_dfs<TX: ProcessTransactions>(
-    mut blocks: mpsc::Receiver<Result<Option<Block<TX>>, CollectError>>,
+    mut blocks: mpsc::Receiver<BlockTxGasTuple<TX>>,
     blocks_schema: &Option<&Table>,
     transactions_schema: &Option<&Table>,
     chain_id: u64,
@@ -137,26 +144,31 @@ pub(crate) async fn blocks_to_dfs<TX: ProcessTransactions>(
     let mut n_txs = 0;
     while let Some(message) = blocks.recv().await {
         match message {
-            Ok(Some(block)) => {
+            Ok((block, gas_used)) => {
                 n_blocks += 1;
                 if let Some(schema) = blocks_schema {
                     process_block(&block, schema, &mut block_columns)
                 }
                 if let Some(schema) = transactions_schema {
-                    for tx in block.transactions.iter() {
-                        n_txs += 1;
-                        tx.process(schema, &mut transaction_columns)
+                    match gas_used {
+                        Some(gas_used) => {
+                            for (tx, gas_used) in block.transactions.iter().zip(gas_used) {
+                                n_txs += 1;
+                                tx.process(schema, &mut transaction_columns, Some(gas_used))
+                            }
+                        }
+                        None => {
+                            for tx in block.transactions.iter() {
+                                n_txs += 1;
+                                tx.process(schema, &mut transaction_columns, None)
+                            }
+                        }
                     }
                 }
             }
-            // _ => return Err(CollectError::TooManyRequestsError),
             Err(e) => {
                 println!("{:?}", e);
-                return Err(CollectError::TooManyRequestsError)
-            }
-            Ok(None) => {
-                println!("NONE");
-                return Err(CollectError::TooManyRequestsError)
+                return Err(CollectError::TooManyRequestsError);
             }
         }
     }
@@ -250,6 +262,7 @@ pub(crate) struct TransactionColumns {
     value: Vec<String>,
     input: Vec<Vec<u8>>,
     gas_limit: Vec<u32>,
+    gas_used: Vec<u32>,
     gas_price: Vec<Option<u64>>,
     transaction_type: Vec<Option<u32>>,
     max_priority_fee_per_gas: Vec<Option<u64>>,
@@ -268,6 +281,7 @@ impl TransactionColumns {
             value: Vec::with_capacity(n),
             input: Vec::with_capacity(n),
             gas_limit: Vec::with_capacity(n),
+            gas_used: Vec::with_capacity(n),
             gas_price: Vec::with_capacity(n),
             transaction_type: Vec::with_capacity(n),
             max_priority_fee_per_gas: Vec::with_capacity(n),
@@ -291,6 +305,7 @@ impl TransactionColumns {
         with_series!(cols, "value", self.value, schema);
         with_series_binary!(cols, "input", self.input, schema);
         with_series!(cols, "gas_limit", self.gas_limit, schema);
+        with_series!(cols, "gas_used", self.gas_used, schema);
        with_series!(cols, "gas_price", self.gas_price, schema);
         with_series!(cols, "transaction_type", self.transaction_type, schema);
         with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema);
@@ -358,7 +373,12 @@ fn process_block<TX>(block: &Block<TX>, schema: &Table, columns: &mut BlockColum
     }
 }
 
-fn process_transaction(tx: &Transaction, schema: &Table, columns: &mut TransactionColumns) {
+fn process_transaction(
+    tx: &Transaction,
+    schema: &Table,
+    columns: &mut TransactionColumns,
+    gas_used: Option<u32>,
+) {
     if schema.has_column("block_number") {
         match tx.block_number {
             Some(block_number) => columns.block_number.push(Some(block_number.as_u64())),
@@ -397,6 +417,9 @@ fn process_transaction(tx: &Transaction, schema: &Table, columns: &mut Transacti
     if schema.has_column("gas_limit") {
         columns.gas_limit.push(tx.gas.as_u32());
     }
+    if schema.has_column("gas_used") {
+        columns.gas_used.push(gas_used.unwrap())
+    }
     if schema.has_column("gas_price") {
         columns.gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64()));
     }
diff --git a/crates/freeze/src/datasets/blocks_and_transactions.rs b/crates/freeze/src/datasets/blocks_and_transactions.rs
index c884c675..39ce711a 100644
--- a/crates/freeze/src/datasets/blocks_and_transactions.rs
+++ b/crates/freeze/src/datasets/blocks_and_transactions.rs
@@ -27,7 +27,11 @@ impl MultiDataset for BlocksAndTransactions {
         schemas: HashMap<Datatype, Table>,
         _filter: HashMap<Datatype, RowFilter>,
     ) -> Result<HashMap<Datatype, DataFrame>, CollectError> {
-        let rx = fetch_blocks_and_transactions(chunk, source).await;
+        let include_gas_used = match &schemas.get(&Datatype::Transactions) {
+            Some(table) => table.has_column("gas_used"),
+            _ => false,
+        };
+        let rx = fetch_blocks_and_transactions(chunk, source, include_gas_used).await;
         let output = blocks::blocks_to_dfs(
             rx,
             &schemas.get(&Datatype::Blocks),
@@ -51,25 +55,46 @@ impl MultiDataset for BlocksAndTransactions {
 pub(crate) async fn fetch_blocks_and_transactions(
     block_chunk: &BlockChunk,
     source: &Source,
-) -> mpsc::Receiver<Result<Option<Block<Transaction>>, CollectError>> {
+    include_gas_used: bool,
+) -> mpsc::Receiver<BlockTxGasTuple<Transaction>> {
     let (tx, rx) = mpsc::channel(block_chunk.numbers().len());
+    let source = Arc::new(source.clone());
 
     for number in block_chunk.numbers() {
         let tx = tx.clone();
         let provider = source.provider.clone();
         let semaphore = source.semaphore.clone();
         let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone);
+        let source_arc = source.clone();
         task::spawn(async move {
-            let _permit = match semaphore {
+            let permit = match semaphore {
                 Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await),
                 _ => None,
             };
             if let Some(limiter) = rate_limiter {
                 Arc::clone(&limiter).until_ready().await;
-            }
-            let block =
-                provider.get_block_with_txs(number).await.map_err(CollectError::ProviderError);
-            match tx.send(block).await {
+            };
+            let block_result = provider.get_block_with_txs(number).await;
+            drop(permit);
+
+            // get gas usage
+            let result = match block_result {
+                Ok(Some(block)) => {
+                    if include_gas_used {
+                        match get_txs_gas_used(&block, source_arc.clone()).await {
+                            Ok(gas_used) => Ok((block, Some(gas_used))),
+                            Err(e) => Err(e),
+                        }
+                    } else {
+                        Ok((block, None))
+                    }
+                }
+                Ok(None) => Err(CollectError::CollectError("no block found".into())),
+                Err(e) => Err(CollectError::ProviderError(e)),
+            };
+
+            // send to channel
+            match tx.send(result).await {
                 Ok(_) => {}
                 Err(tokio::sync::mpsc::error::SendError(_e)) => {
                     eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
@@ -80,3 +105,45 @@ pub(crate) async fn fetch_blocks_and_transactions(
     }
     rx
 }
+
+async fn get_txs_gas_used(
+    block: &Block<Transaction>,
+    source: Arc<Source>,
+) -> Result<Vec<u32>, CollectError> {
+
+    let source = Arc::new(source);
+    let mut tasks = Vec::new();
+    for tx in &block.transactions {
+        let provider = source.provider.clone();
+        let semaphore = source.semaphore.clone();
+        let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone);
+        let tx_clone = tx.hash;
+        let task = task::spawn(async move {
+            let _permit = match semaphore {
+                Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await),
+                _ => None,
+            };
+            if let Some(limiter) = rate_limiter {
+                Arc::clone(&limiter).until_ready().await;
+            };
+            match provider.get_transaction_receipt(tx_clone).await {
+                Ok(Some(receipt)) => Ok(receipt.gas_used),
+                Ok(None) => Err(CollectError::CollectError("could not find tx receipt".to_string())),
+                Err(e) => Err(CollectError::ProviderError(e)),
+            }
+        });
+        tasks.push(task);
+    }
+
+    let mut gas_used: Vec<u32> = Vec::new();
+    for task in tasks {
+        match task.await {
+            Ok(Ok(Some(value))) => gas_used.push(value.as_u32()),
+            _ => {
+                return Err(CollectError::CollectError("gas_used not available from node".into()))
+            },
+        }
+    }
+
+    Ok(gas_used)
+}
diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs
index e20fad02..dbde50c5 100644
--- a/crates/freeze/src/datasets/transactions.rs
+++ b/crates/freeze/src/datasets/transactions.rs
@@ -30,6 +30,7 @@ impl Dataset for Transactions {
             ("value_float", ColumnType::Float64),
             ("input", ColumnType::Binary),
             ("gas_limit", ColumnType::UInt32),
+            ("gas_used", ColumnType::UInt32),
             ("gas_price", ColumnType::UInt64),
             ("transaction_type", ColumnType::UInt32),
             ("max_priority_fee_per_gas", ColumnType::UInt64),
@@ -49,6 +50,7 @@ impl Dataset for Transactions {
             "value",
             "input",
             "gas_limit",
+            "gas_used",
             "gas_price",
             "transaction_type",
             "max_priority_fee_per_gas",
@@ -68,7 +70,10 @@ impl Dataset for Transactions {
         schema: &Table,
         _filter: Option<&RowFilter>,
     ) -> Result<DataFrame, CollectError> {
-        let rx = blocks_and_transactions::fetch_blocks_and_transactions(chunk, source).await;
+        let include_gas_used = schema.has_column("gas_used");
+        let rx =
+            blocks_and_transactions::fetch_blocks_and_transactions(chunk, source, include_gas_used)
+                .await;
         let output = blocks::blocks_to_dfs(rx, &None, &Some(schema), source.chain_id).await;
         match output {
             Ok((_, Some(txs_df))) => Ok(txs_df),
diff --git a/crates/freeze/src/types/errors.rs b/crates/freeze/src/types/errors.rs
index bc542328..423687cd 100644
--- a/crates/freeze/src/types/errors.rs
+++ b/crates/freeze/src/types/errors.rs
@@ -30,6 +30,10 @@ pub enum FreezeError {
 /// Error related to data collection
 #[derive(Error, Debug)]
 pub enum CollectError {
+    /// General Collection error
+    #[error("Collect failed: {0}")]
+    CollectError(String),
+
     /// Error related to provider operations
     #[error("Failed to get block: {0}")]
     ProviderError(#[source] ProviderError),