add gas_used column option to txs using receipts
sslivkoff committed Aug 1, 2023
1 parent 1a1fc35 commit af17ad5
Showing 4 changed files with 126 additions and 27 deletions.
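The gas_used values added here are not available from the block or transaction RPC responses themselves; they are read from each transaction's receipt. The snippet below is a minimal standalone sketch of that lookup pattern, not part of the diff: it assumes the ethers and tokio crates, the endpoint URL and block number are placeholders, and the semaphore / rate-limiting machinery used by the crate is omitted.

// Minimal sketch of the receipt-based gas_used lookup (assumes ethers-rs + tokio; not part of this commit).
use ethers::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // placeholder endpoint and block number
    let provider = Provider::<Http>::try_from("http://localhost:8545")?;
    let block = provider
        .get_block_with_txs(BlockNumber::Number(17_000_000u64.into()))
        .await?
        .ok_or("block not in node")?;

    let mut gas_used: Vec<u32> = Vec::with_capacity(block.transactions.len());
    for tx in &block.transactions {
        let receipt = provider
            .get_transaction_receipt(tx.hash)
            .await?
            .ok_or("could not find tx receipt")?;
        // receipt.gas_used is Option<U256>; the dataset stores the column as u32
        let value = receipt.gas_used.ok_or("gas_used not available from node")?;
        gas_used.push(value.as_u32());
    }

    // gas_used[i] lines up with block.transactions[i], mirroring the zip in blocks_to_dfs
    println!("collected gas_used for {} transactions", gas_used.len());
    Ok(())
}

In the crate itself, the same per-transaction receipt lookup is performed concurrently in get_txs_gas_used (added in blocks_and_transactions.rs below), with the results joined in order so they can be zipped against block.transactions.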
61 changes: 42 additions & 19 deletions crates/freeze/src/datasets/blocks.rs
@@ -13,6 +13,8 @@ use crate::{
with_series, with_series_binary,
};

pub(crate) type BlockTxGasTuple<TX> = Result<(Block<TX>, Option<Vec<u32>>), CollectError>;

#[async_trait::async_trait]
impl Dataset for Blocks {
fn datatype(&self) -> Datatype {
@@ -72,7 +74,7 @@ impl Dataset for Blocks {
async fn fetch_blocks(
block_chunk: &BlockChunk,
source: &Source,
) -> mpsc::Receiver<Result<Option<Block<TxHash>>, CollectError>> {
) -> mpsc::Receiver<BlockTxGasTuple<TxHash>> {
let (tx, rx) = mpsc::channel(block_chunk.numbers().len());

for number in block_chunk.numbers() {
@@ -88,8 +90,13 @@ async fn fetch_blocks(
if let Some(limiter) = rate_limiter {
Arc::clone(&limiter).until_ready().await;
}
let block = provider.get_block(number).await.map_err(CollectError::ProviderError);
match tx.send(block).await {
let block = provider.get_block(number).await;
let result = match block {
Ok(Some(block)) => Ok((block, None)),
Ok(None) => Err(CollectError::CollectError("block not in node".to_string())),
Err(e) => Err(CollectError::ProviderError(e)),
};
match tx.send(result).await {
Ok(_) => {}
Err(tokio::sync::mpsc::error::SendError(_e)) => {
eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
@@ -102,23 +109,23 @@
}

pub(crate) trait ProcessTransactions {
fn process(&self, schema: &Table, columns: &mut TransactionColumns);
fn process(&self, schema: &Table, columns: &mut TransactionColumns, gas_used: Option<u32>);
}

impl ProcessTransactions for TxHash {
fn process(&self, _schema: &Table, _columns: &mut TransactionColumns) {
fn process(&self, _schema: &Table, _columns: &mut TransactionColumns, _gas_used: Option<u32>) {
panic!("transaction data not available to process")
}
}

impl ProcessTransactions for Transaction {
fn process(&self, schema: &Table, columns: &mut TransactionColumns) {
process_transaction(self, schema, columns)
fn process(&self, schema: &Table, columns: &mut TransactionColumns, gas_used: Option<u32>) {
process_transaction(self, schema, columns, gas_used)
}
}

pub(crate) async fn blocks_to_dfs<TX: ProcessTransactions>(
mut blocks: mpsc::Receiver<Result<Option<Block<TX>>, CollectError>>,
mut blocks: mpsc::Receiver<BlockTxGasTuple<TX>>,
blocks_schema: &Option<&Table>,
transactions_schema: &Option<&Table>,
chain_id: u64,
@@ -137,26 +144,31 @@ pub(crate) async fn blocks_to_dfs<TX: ProcessTransactions>(
let mut n_txs = 0;
while let Some(message) = blocks.recv().await {
match message {
Ok(Some(block)) => {
Ok((block, gas_used)) => {
n_blocks += 1;
if let Some(schema) = blocks_schema {
process_block(&block, schema, &mut block_columns)
}
if let Some(schema) = transactions_schema {
for tx in block.transactions.iter() {
n_txs += 1;
tx.process(schema, &mut transaction_columns)
match gas_used {
Some(gas_used) => {
for (tx, gas_used) in block.transactions.iter().zip(gas_used) {
n_txs += 1;
tx.process(schema, &mut transaction_columns, Some(gas_used))
}
}
None => {
for tx in block.transactions.iter() {
n_txs += 1;
tx.process(schema, &mut transaction_columns, None)
}
}
}
}
}
// _ => return Err(CollectError::TooManyRequestsError),
Err(e) => {
println!("{:?}", e);
return Err(CollectError::TooManyRequestsError)
}
Ok(None) => {
println!("NONE");
return Err(CollectError::TooManyRequestsError)
return Err(CollectError::TooManyRequestsError);
}
}
}
@@ -250,6 +262,7 @@ pub(crate) struct TransactionColumns {
value: Vec<String>,
input: Vec<Vec<u8>>,
gas_limit: Vec<u32>,
gas_used: Vec<u32>,
gas_price: Vec<Option<u64>>,
transaction_type: Vec<Option<u32>>,
max_priority_fee_per_gas: Vec<Option<u64>>,
@@ -268,6 +281,7 @@ impl TransactionColumns {
value: Vec::with_capacity(n),
input: Vec::with_capacity(n),
gas_limit: Vec::with_capacity(n),
gas_used: Vec::with_capacity(n),
gas_price: Vec::with_capacity(n),
transaction_type: Vec::with_capacity(n),
max_priority_fee_per_gas: Vec::with_capacity(n),
@@ -291,6 +305,7 @@ impl TransactionColumns {
with_series!(cols, "value", self.value, schema);
with_series_binary!(cols, "input", self.input, schema);
with_series!(cols, "gas_limit", self.gas_limit, schema);
with_series!(cols, "gas_used", self.gas_used, schema);
with_series!(cols, "gas_price", self.gas_price, schema);
with_series!(cols, "transaction_type", self.transaction_type, schema);
with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema);
@@ -358,7 +373,12 @@ fn process_block<TX>(block: &Block<TX>, schema: &Table, columns: &mut BlockColum
}
}

fn process_transaction(tx: &Transaction, schema: &Table, columns: &mut TransactionColumns) {
fn process_transaction(
tx: &Transaction,
schema: &Table,
columns: &mut TransactionColumns,
gas_used: Option<u32>,
) {
if schema.has_column("block_number") {
match tx.block_number {
Some(block_number) => columns.block_number.push(Some(block_number.as_u64())),
@@ -397,6 +417,9 @@ fn process_transaction(tx: &Transaction, schema: &Table, columns: &mut Transacti
if schema.has_column("gas_limit") {
columns.gas_limit.push(tx.gas.as_u32());
}
if schema.has_column("gas_used") {
columns.gas_used.push(gas_used.unwrap())
}
if schema.has_column("gas_price") {
columns.gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64()));
}
81 changes: 74 additions & 7 deletions crates/freeze/src/datasets/blocks_and_transactions.rs
@@ -27,7 +27,11 @@ impl MultiDataset for BlocksAndTransactions {
schemas: HashMap<Datatype, Table>,
_filter: HashMap<Datatype, RowFilter>,
) -> Result<HashMap<Datatype, DataFrame>, CollectError> {
let rx = fetch_blocks_and_transactions(chunk, source).await;
let include_gas_used = match &schemas.get(&Datatype::Transactions) {
Some(table) => table.has_column("gas_used"),
_ => false,
};
let rx = fetch_blocks_and_transactions(chunk, source, include_gas_used).await;
let output = blocks::blocks_to_dfs(
rx,
&schemas.get(&Datatype::Blocks),
@@ -51,25 +55,46 @@
pub(crate) async fn fetch_blocks_and_transactions(
block_chunk: &BlockChunk,
source: &Source,
) -> mpsc::Receiver<Result<Option<Block<Transaction>>, CollectError>> {
include_gas_used: bool,
) -> mpsc::Receiver<blocks::BlockTxGasTuple<Transaction>> {
let (tx, rx) = mpsc::channel(block_chunk.numbers().len());
let source = Arc::new(source.clone());

for number in block_chunk.numbers() {
let tx = tx.clone();
let provider = source.provider.clone();
let semaphore = source.semaphore.clone();
let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone);
let source_arc = source.clone();
task::spawn(async move {
let _permit = match semaphore {
let permit = match semaphore {
Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await),
_ => None,
};
if let Some(limiter) = rate_limiter {
Arc::clone(&limiter).until_ready().await;
}
let block =
provider.get_block_with_txs(number).await.map_err(CollectError::ProviderError);
match tx.send(block).await {
};
let block_result = provider.get_block_with_txs(number).await;
drop(permit);

// get gas usage
let result = match block_result {
Ok(Some(block)) => {
if include_gas_used {
match get_txs_gas_used(&block, source_arc.clone()).await {
Ok(gas_used) => Ok((block, Some(gas_used))),
Err(e) => Err(e),
}
} else {
Ok((block, None))
}
}
Ok(None) => Err(CollectError::CollectError("no block found".into())),
Err(e) => Err(CollectError::ProviderError(e)),
};

// send to channel
match tx.send(result).await {
Ok(_) => {}
Err(tokio::sync::mpsc::error::SendError(_e)) => {
eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
@@ -80,3 +105,45 @@ pub(crate) async fn fetch_blocks_and_transactions(
}
rx
}

async fn get_txs_gas_used(
block: &Block<Transaction>,
source: Arc<Source>,
) -> Result<Vec<u32>, CollectError> {

let source = Arc::new(source);
let mut tasks = Vec::new();
for tx in &block.transactions {
let provider = source.provider.clone();
let semaphore = source.semaphore.clone();
let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone);
let tx_clone = tx.hash;
let task = task::spawn(async move {
let _permit = match semaphore {
Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await),
_ => None,
};
if let Some(limiter) = rate_limiter {
Arc::clone(&limiter).until_ready().await;
};
match provider.get_transaction_receipt(tx_clone).await {
Ok(Some(receipt)) => Ok(receipt.gas_used),
Ok(None) => Err(CollectError::CollectError("could not find tx receipt".to_string())),
Err(e) => Err(CollectError::ProviderError(e)),
}
});
tasks.push(task);
}

let mut gas_used: Vec<u32> = Vec::new();
for task in tasks {
match task.await {
Ok(Ok(Some(value))) => gas_used.push(value.as_u32()),
_ => {
return Err(CollectError::CollectError("gas_used not available from node".into()))
},
}
}

Ok(gas_used)
}
7 changes: 6 additions & 1 deletion crates/freeze/src/datasets/transactions.rs
@@ -30,6 +30,7 @@ impl Dataset for Transactions {
("value_float", ColumnType::Float64),
("input", ColumnType::Binary),
("gas_limit", ColumnType::UInt32),
("gas_used", ColumnType::UInt32),
("gas_price", ColumnType::UInt64),
("transaction_type", ColumnType::UInt32),
("max_priority_fee_per_gas", ColumnType::UInt64),
@@ -49,6 +50,7 @@
"value",
"input",
"gas_limit",
"gas_used",
"gas_price",
"transaction_type",
"max_priority_fee_per_gas",
@@ -68,7 +70,10 @@
schema: &Table,
_filter: Option<&RowFilter>,
) -> Result<DataFrame, CollectError> {
let rx = blocks_and_transactions::fetch_blocks_and_transactions(chunk, source).await;
let include_gas_used = schema.has_column("gas_used");
let rx =
blocks_and_transactions::fetch_blocks_and_transactions(chunk, source, include_gas_used)
.await;
let output = blocks::blocks_to_dfs(rx, &None, &Some(schema), source.chain_id).await;
match output {
Ok((_, Some(txs_df))) => Ok(txs_df),
4 changes: 4 additions & 0 deletions crates/freeze/src/types/errors.rs
@@ -30,6 +30,10 @@ pub enum FreezeError {
/// Error related to data collection
#[derive(Error, Debug)]
pub enum CollectError {
/// General Collection error
#[error("Collect failed: {0}")]
CollectError(String),

/// Error related to provider operations
#[error("Failed to get block: {0}")]
ProviderError(#[source] ProviderError),
