From 57e245e6aa38c863e6171bb1795f1f2ded7114ac Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 5 Sep 2023 08:12:03 +0200 Subject: [PATCH 1/3] Move semaphore and rate-limit to Fetcher. --- crates/cli/src/parse/source.rs | 11 ++-- crates/freeze/src/datasets/blocks.rs | 40 +++--------- crates/freeze/src/datasets/state_diffs.rs | 40 +++--------- crates/freeze/src/types/sources.rs | 75 ++++++++++++++++++++--- 4 files changed, 93 insertions(+), 73 deletions(-) diff --git a/crates/cli/src/parse/source.rs b/crates/cli/src/parse/source.rs index e52475f2..fe808c9e 100644 --- a/crates/cli/src/parse/source.rs +++ b/crates/cli/src/parse/source.rs @@ -5,7 +5,7 @@ use governor::{Quota, RateLimiter}; use polars::prelude::*; use std::num::NonZeroU32; -use cryo_freeze::{ParseError, Source}; +use cryo_freeze::{ParseError, Fetcher, Source}; use crate::args::Args; @@ -34,11 +34,14 @@ pub(crate) async fn parse_source(args: &Args) -> Result { let semaphore = tokio::sync::Semaphore::new(max_concurrent_requests as usize); let semaphore = Some(Arc::new(semaphore)); - let output = Source { - provider: Arc::new(provider), - chain_id, + let fetcher = Fetcher { + provider, semaphore, rate_limiter, + }; + let output = Source { + fetcher: Arc::new(fetcher), + chain_id, inner_request_size: args.inner_request_size, max_concurrent_chunks, }; diff --git a/crates/freeze/src/datasets/blocks.rs b/crates/freeze/src/datasets/blocks.rs index 22853482..d8e462be 100644 --- a/crates/freeze/src/datasets/blocks.rs +++ b/crates/freeze/src/datasets/blocks.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use ethers::prelude::*; use polars::prelude::*; @@ -97,29 +97,18 @@ async fn fetch_tx_block_numbers( ) -> Result, CollectError> { let mut tasks = Vec::new(); for tx_hash in tx_hashes { - let provider = Arc::clone(&source.provider); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + let fetcher = source.fetcher.clone(); let tx_hash = tx_hash.clone(); - let task = tokio::task::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - }; - provider.get_transaction(H256::from_slice(&tx_hash)).await - }); + let task = + tokio::task::spawn( + async move { fetcher.get_transaction(H256::from_slice(&tx_hash)).await }, + ); tasks.push(task); } let results = futures::future::join_all(tasks).await; let mut block_numbers = Vec::new(); for res in results { - let task_result = - res.map_err(|_| CollectError::CollectError("Task join error".to_string()))?; - let tx = - task_result.map_err(|_| CollectError::CollectError("Provider error".to_string()))?; + let tx = res.map_err(|_| CollectError::CollectError("Task join error".to_string()))??; match tx { Some(transaction) => match transaction.block_number { Some(block_number) => block_numbers.push(block_number.as_u64()), @@ -143,22 +132,13 @@ async fn fetch_blocks( for number in block_chunk.numbers() { let tx = tx.clone(); - let provider = Arc::clone(&source.provider); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + let fetcher = source.fetcher.clone(); task::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - } - let block = provider.get_block(number).await; + let block = fetcher.get_block(number).await; let result = match block { Ok(Some(block)) => Ok((block, None)), Ok(None) => Err(CollectError::CollectError("block not in node".to_string())), - Err(e) => Err(CollectError::ProviderError(e)), + Err(e) => Err(e), }; match tx.send(result).await { Ok(_) => {} diff --git a/crates/freeze/src/datasets/state_diffs.rs b/crates/freeze/src/datasets/state_diffs.rs index 5dfb1958..aebfab1f 100644 --- a/crates/freeze/src/datasets/state_diffs.rs +++ b/crates/freeze/src/datasets/state_diffs.rs @@ -85,10 +85,11 @@ pub(crate) async fn collect_transaction_state_diffs( _filter: Option<&RowFilter>, ) -> Result { let include_indices = schema.has_column("block_number"); + let chain_id = source.chain_id; let rx = fetch_transaction_state_diffs(chunk, source, include_indices).await; let mut schemas: HashMap = HashMap::new(); schemas.insert(*datatype, schema.clone()); - let dfs = state_diffs_to_df(rx, &schemas, source.chain_id).await; + let dfs = state_diffs_to_df(rx, &schemas, chain_id).await; // get single df out of result let df = match dfs { @@ -113,19 +114,10 @@ pub(crate) async fn fetch_block_traces( continue }; let tx = tx.clone(); - let provider = source.provider.clone(); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + let fetcher = source.fetcher.clone(); let trace_types = trace_types.to_vec(); tokio::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - } - let result = provider + let result = fetcher .trace_replay_block_transactions(BlockNumber::Number(number.into()), trace_types) .await .map(|res| { @@ -136,8 +128,7 @@ pub(crate) async fn fetch_block_traces( .map(|(index, traces)| (index as u32, traces)) .collect(), ) - }) - .map_err(CollectError::ProviderError); + }); match tx.send(result).await { Ok(_) => {} Err(tokio::sync::mpsc::error::SendError(_e)) => { @@ -163,31 +154,16 @@ pub(crate) async fn fetch_transaction_traces( for tx_hash in tx_hashes.iter() { let tx_hash = tx_hash.clone(); let tx = tx.clone(); - let provider = source.provider.clone(); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + let fetcher = source.fetcher.clone(); let trace_types = trace_types.to_vec(); tokio::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = &rate_limiter { - Arc::clone(limiter).until_ready().await; - } let tx_hash = H256::from_slice(&tx_hash); - let result = provider - .trace_replay_transaction(tx_hash, trace_types) - .await - .map_err(CollectError::ProviderError); + let result = fetcher.trace_replay_transaction(tx_hash, trace_types).await; let result = match result { Ok(trace) => { let trace = BlockTrace { transaction_hash: Some(tx_hash), ..trace }; if include_indices { - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - }; - match provider.get_transaction(tx_hash).await { + match fetcher.get_transaction(tx_hash).await { Ok(Some(tx)) => match (tx.block_number, tx.transaction_index) { (Some(block_number), Some(tx_index)) => Ok(( Some(block_number.as_u32()), diff --git a/crates/freeze/src/types/sources.rs b/crates/freeze/src/types/sources.rs index 9ac414b7..a221b6eb 100644 --- a/crates/freeze/src/types/sources.rs +++ b/crates/freeze/src/types/sources.rs @@ -6,7 +6,9 @@ use governor::{ middleware::NoOpMiddleware, state::{direct::NotKeyed, InMemoryState}, }; -use tokio::sync::Semaphore; +use tokio::sync::{AcquireError, Semaphore, SemaphorePermit}; + +use crate::CollectError; /// RateLimiter based on governor crate pub type RateLimiter = governor::RateLimiter; @@ -14,12 +16,7 @@ pub type RateLimiter = governor::RateLimiter>, - /// semaphore for controlling concurrency - pub semaphore: Option>, - /// rate limiter for controlling request rate - pub rate_limiter: Option>, + pub fetcher: Arc, /// chain_id of network pub chain_id: u64, /// number of blocks per log request @@ -28,6 +25,70 @@ pub struct Source { pub max_concurrent_chunks: u64, } +pub struct Fetcher { + /// provider data source + pub provider: Provider, + /// semaphore for controlling concurrency + pub semaphore: Option, + /// rate limiter for controlling request rate + pub rate_limiter: Option, +} + +type Result = ::core::result::Result; + +impl Fetcher { + async fn permit_request( + &self, + ) -> Option<::core::result::Result, AcquireError>> { + let permit = match &self.semaphore { + Some(semaphore) => Some(semaphore.acquire().await), + _ => None, + }; + if let Some(limiter) = &self.rate_limiter { + limiter.until_ready().await; + } + permit + } + + pub async fn get_logs(&self, filter: &Filter) -> Result> { + let _permit = self.permit_request().await; + self.provider.get_logs(filter).await.map_err(CollectError::ProviderError) + } + + pub async fn trace_replay_block_transactions( + &self, + block: BlockNumber, + trace_types: Vec, + ) -> Result> { + let _permit = self.permit_request().await; + self.provider + .trace_replay_block_transactions(block, trace_types) + .await + .map_err(CollectError::ProviderError) + } + + pub async fn trace_replay_transaction( + &self, + block: H256, + trace_types: Vec, + ) -> Result { + let _permit = self.permit_request().await; + self.provider + .trace_replay_transaction(block, trace_types) + .await + .map_err(CollectError::ProviderError) + } + + pub async fn get_transaction(&self, tx_hash: H256) -> Result> { + let _permit = self.permit_request().await; + self.provider.get_transaction(tx_hash).await.map_err(CollectError::ProviderError) + } + + pub async fn get_block(&self, block_num: u64) -> Result>> { + self.provider.get_block(block_num).await.map_err(CollectError::ProviderError) + } +} + // impl Source { // /// create Source for an individual chunk // pub fn build_source(&self) -> Source { From 736fa47eb730fd330203e697094c6011e4d9d0d6 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 5 Sep 2023 12:00:50 +0200 Subject: [PATCH 2/3] Factor out Fetcher to centralize concurrency checks. --- crates/cli/src/parse/args.rs | 2 +- crates/cli/src/parse/blocks.rs | 125 ++++++++---------- crates/cli/src/parse/query.rs | 11 +- crates/cli/src/parse/source.rs | 12 +- .../src/datasets/blocks_and_transactions.rs | 85 ++++-------- crates/freeze/src/datasets/contracts.rs | 4 +- crates/freeze/src/datasets/logs.rs | 38 +----- .../freeze/src/datasets/native_transfers.rs | 4 +- crates/freeze/src/datasets/traces.rs | 44 ++---- crates/freeze/src/datasets/transactions.rs | 24 +--- crates/freeze/src/types/mod.rs | 2 +- crates/freeze/src/types/sources.rs | 102 ++++++++++---- 12 files changed, 197 insertions(+), 256 deletions(-) diff --git a/crates/cli/src/parse/args.rs b/crates/cli/src/parse/args.rs index 49425e67..1276aa9a 100644 --- a/crates/cli/src/parse/args.rs +++ b/crates/cli/src/parse/args.rs @@ -9,7 +9,7 @@ use super::{file_output, query, source}; /// parse options for running freeze pub async fn parse_opts(args: &Args) -> Result<(MultiQuery, Source, FileOutput), ParseError> { let source = source::parse_source(args).await?; - let query = query::parse_query(args, Arc::clone(&source.provider)).await?; + let query = query::parse_query(args, Arc::clone(&source.fetcher)).await?; let sink = file_output::parse_file_output(args, &source)?; Ok((query, source, sink)) } diff --git a/crates/cli/src/parse/blocks.rs b/crates/cli/src/parse/blocks.rs index bbe77b89..65b4c49d 100644 --- a/crates/cli/src/parse/blocks.rs +++ b/crates/cli/src/parse/blocks.rs @@ -1,13 +1,13 @@ use ethers::prelude::*; use polars::prelude::*; -use cryo_freeze::{BlockChunk, Chunk, ChunkData, ParseError, Subchunk}; +use cryo_freeze::{BlockChunk, Chunk, ChunkData, Fetcher, ParseError, Subchunk}; use crate::args::Args; -pub(crate) async fn parse_blocks( +pub(crate) async fn parse_blocks( args: &Args, - provider: Arc>, + fetcher: Arc>, ) -> Result)>, ParseError> { let (files, explicit_numbers): (Vec<&String>, Vec<&String>) = match &args.blocks { Some(blocks) => blocks.iter().partition(|tx| std::path::Path::new(tx).exists()), @@ -42,10 +42,10 @@ pub(crate) async fn parse_blocks( // parse inputs into BlockChunks let mut block_chunks = Vec::new(); for explicit_number in explicit_numbers { - let outputs = parse_block_inputs(explicit_number, &provider).await?; + let outputs = parse_block_inputs(explicit_number, &fetcher).await?; block_chunks.extend(outputs); } - postprocess_block_chunks(block_chunks, args, provider).await? + postprocess_block_chunks(block_chunks, args, fetcher).await? } else { Vec::new() }; @@ -92,10 +92,10 @@ fn read_integer_column(path: &str, column: &str) -> Result, ParseError> } } -async fn postprocess_block_chunks( +async fn postprocess_block_chunks( block_chunks: Vec, args: &Args, - provider: Arc>, + fetcher: Arc>, ) -> Result)>, ParseError> { // align let block_chunks = if args.align { @@ -111,7 +111,7 @@ async fn postprocess_block_chunks( }; // apply reorg buffer - let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &provider).await?; + let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &fetcher).await?; // put into Chunk enums let chunks: Vec<(Chunk, Option)> = @@ -120,34 +120,31 @@ async fn postprocess_block_chunks( Ok(chunks) } -pub(crate) async fn get_default_block_chunks( +pub(crate) async fn get_default_block_chunks( args: &Args, - provider: Arc>, + fetcher: Arc>, ) -> Result)>, ParseError> { - let block_chunks = parse_block_inputs(&String::from(r"0:latest"), &provider).await?; - postprocess_block_chunks(block_chunks, args, provider).await + let block_chunks = parse_block_inputs(&String::from(r"0:latest"), &fetcher).await?; + postprocess_block_chunks(block_chunks, args, fetcher).await } /// parse block numbers to freeze -async fn parse_block_inputs

( +async fn parse_block_inputs( inputs: &str, - provider: &Provider

, -) -> Result, ParseError> -where - P: JsonRpcClient, -{ + fetcher: &Fetcher

, +) -> Result, ParseError> { let parts: Vec<&str> = inputs.split(' ').collect(); match parts.len() { 1 => { let first_input = parts.first().ok_or_else(|| { ParseError::ParseError("Failed to get the first input".to_string()) })?; - parse_block_token(first_input, true, provider).await.map(|x| vec![x]) + parse_block_token(first_input, true, fetcher).await.map(|x| vec![x]) } _ => { let mut chunks = Vec::new(); for part in parts { - chunks.push(parse_block_token(part, false, provider).await?); + chunks.push(parse_block_token(part, false, fetcher).await?); } Ok(chunks) } @@ -160,27 +157,24 @@ enum RangePosition { None, } -async fn parse_block_token

( +async fn parse_block_token( s: &str, as_range: bool, - provider: &Provider

, -) -> Result -where - P: JsonRpcClient, -{ + fetcher: &Fetcher

, +) -> Result { let s = s.replace('_', ""); let parts: Vec<&str> = s.split(':').collect(); match parts.as_slice() { [block_ref] => { - let block = parse_block_number(block_ref, RangePosition::None, provider).await?; + let block = parse_block_number(block_ref, RangePosition::None, fetcher).await?; Ok(BlockChunk::Numbers(vec![block])) } [first_ref, second_ref] => { let (start_block, end_block) = match (first_ref, second_ref) { _ if first_ref.starts_with('-') => { let end_block = - parse_block_number(second_ref, RangePosition::Last, provider).await?; + parse_block_number(second_ref, RangePosition::Last, fetcher).await?; let start_block = end_block .checked_sub(first_ref[1..].parse::().map_err(|_e| { ParseError::ParseError("start_block parse error".to_string()) @@ -192,7 +186,7 @@ where } _ if second_ref.starts_with('+') => { let start_block = - parse_block_number(first_ref, RangePosition::First, provider).await?; + parse_block_number(first_ref, RangePosition::First, fetcher).await?; let end_block = start_block .checked_add(second_ref[1..].parse::().map_err(|_e| { ParseError::ParseError("start_block parse error".to_string()) @@ -202,9 +196,9 @@ where } _ => { let start_block = - parse_block_number(first_ref, RangePosition::First, provider).await?; + parse_block_number(first_ref, RangePosition::First, fetcher).await?; let end_block = - parse_block_number(second_ref, RangePosition::Last, provider).await?; + parse_block_number(second_ref, RangePosition::Last, fetcher).await?; (start_block, end_block) } }; @@ -225,21 +219,18 @@ where } } -async fn parse_block_number

( +async fn parse_block_number( block_ref: &str, range_position: RangePosition, - provider: &Provider

, -) -> Result -where - P: JsonRpcClient, -{ + fetcher: &Fetcher

, +) -> Result { match (block_ref, range_position) { - ("latest", _) => provider.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| { + ("latest", _) => fetcher.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| { ParseError::ParseError("Error retrieving latest block number".to_string()) }), ("", RangePosition::First) => Ok(0), ("", RangePosition::Last) => { - provider.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| { + fetcher.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| { ParseError::ParseError("Error retrieving last block number".to_string()) }) } @@ -269,15 +260,15 @@ where } } -async fn apply_reorg_buffer( +async fn apply_reorg_buffer( block_chunks: Vec, reorg_filter: u64, - provider: &Provider, + fetcher: &Fetcher

, ) -> Result, ParseError> { match reorg_filter { 0 => Ok(block_chunks), reorg_filter => { - let latest_block = match provider.get_block_number().await { + let latest_block = match fetcher.get_block_number().await { Ok(result) => result.as_u64(), Err(_e) => { return Err(ParseError::ParseError("reorg buffer parse error".to_string())) @@ -306,30 +297,28 @@ mod tests { async fn block_token_test_helper(tests: Vec<(BlockTokenTest<'_>, bool)>) { let (provider, mock) = Provider::mocked(); + let fetcher = Fetcher { provider, semaphore: None, rate_limiter: None }; for (test, res) in tests { match test { BlockTokenTest::WithMock((token, expected, latest)) => { mock.push(U64::from(latest)).unwrap(); - assert_eq!(block_token_test_executor(token, expected, &provider).await, res); + assert_eq!(block_token_test_executor(token, expected, &fetcher).await, res); } BlockTokenTest::WithoutMock((token, expected)) => { - assert_eq!(block_token_test_executor(token, expected, &provider).await, res); + assert_eq!(block_token_test_executor(token, expected, &fetcher).await, res); } } } } - async fn block_token_test_executor

( + async fn block_token_test_executor( token: &str, expected: BlockChunk, - provider: &Provider

, - ) -> bool - where - P: JsonRpcClient, - { + fetcher: &Fetcher

, + ) -> bool { match expected { BlockChunk::Numbers(expected_block_numbers) => { - let block_chunks = parse_block_token(token, false, provider).await.unwrap(); + let block_chunks = parse_block_token(token, false, fetcher).await.unwrap(); assert!(matches!(block_chunks, BlockChunk::Numbers { .. })); let BlockChunk::Numbers(block_numbers) = block_chunks else { panic!("Unexpected shape") @@ -337,7 +326,7 @@ mod tests { block_numbers == expected_block_numbers } BlockChunk::Range(expected_range_start, expected_range_end) => { - let block_chunks = parse_block_token(token, true, provider).await.unwrap(); + let block_chunks = parse_block_token(token, true, fetcher).await.unwrap(); assert!(matches!(block_chunks, BlockChunk::Range { .. })); let BlockChunk::Range(range_start, range_end) = block_chunks else { panic!("Unexpected shape") @@ -354,28 +343,26 @@ mod tests { async fn block_input_test_helper(tests: Vec<(BlockInputTest<'_>, bool)>) { let (provider, mock) = Provider::mocked(); + let fetcher = Fetcher { provider, semaphore: None, rate_limiter: None }; for (test, res) in tests { match test { BlockInputTest::WithMock((inputs, expected, latest)) => { mock.push(U64::from(latest)).unwrap(); - assert_eq!(block_input_test_executor(inputs, expected, &provider).await, res); + assert_eq!(block_input_test_executor(inputs, expected, &fetcher).await, res); } BlockInputTest::WithoutMock((inputs, expected)) => { - assert_eq!(block_input_test_executor(inputs, expected, &provider).await, res); + assert_eq!(block_input_test_executor(inputs, expected, &fetcher).await, res); } } } } - async fn block_input_test_executor

( + async fn block_input_test_executor( inputs: &str, expected: Vec, - provider: &Provider

, - ) -> bool - where - P: JsonRpcClient, - { - let block_chunks = parse_block_inputs(inputs, provider).await.unwrap(); + fetcher: &Fetcher

, + ) -> bool { + let block_chunks = parse_block_inputs(inputs, fetcher).await.unwrap(); assert_eq!(block_chunks.len(), expected.len()); for (i, block_chunk) in block_chunks.iter().enumerate() { let expected_chunk = &expected[i]; @@ -410,19 +397,20 @@ mod tests { async fn block_number_test_helper(tests: Vec<(BlockNumberTest<'_>, bool)>) { let (provider, mock) = Provider::mocked(); + let fetcher = Fetcher { provider, semaphore: None, rate_limiter: None }; for (test, res) in tests { match test { BlockNumberTest::WithMock((block_ref, range_position, expected, latest)) => { mock.push(U64::from(latest)).unwrap(); assert_eq!( - block_number_test_executor(block_ref, range_position, expected, &provider) + block_number_test_executor(block_ref, range_position, expected, &fetcher) .await, res ); } BlockNumberTest::WithoutMock((block_ref, range_position, expected)) => { assert_eq!( - block_number_test_executor(block_ref, range_position, expected, &provider) + block_number_test_executor(block_ref, range_position, expected, &fetcher) .await, res ); @@ -431,16 +419,13 @@ mod tests { } } - async fn block_number_test_executor

( + async fn block_number_test_executor( block_ref: &str, range_position: RangePosition, expected: u64, - provider: &Provider

, - ) -> bool - where - P: JsonRpcClient, - { - let block_number = parse_block_number(block_ref, range_position, provider).await.unwrap(); + fetcher: &Fetcher

, + ) -> bool { + let block_number = parse_block_number(block_ref, range_position, fetcher).await.unwrap(); block_number == expected } diff --git a/crates/cli/src/parse/query.rs b/crates/cli/src/parse/query.rs index a72af106..41a30170 100644 --- a/crates/cli/src/parse/query.rs +++ b/crates/cli/src/parse/query.rs @@ -7,21 +7,22 @@ use ethers::prelude::*; use hex::FromHex; use cryo_freeze::{ - ColumnEncoding, Datatype, FileFormat, LogDecoder, MultiQuery, ParseError, RowFilter, Table, + ColumnEncoding, Datatype, Fetcher, FileFormat, LogDecoder, MultiQuery, ParseError, RowFilter, + Table, }; use super::{blocks, file_output, transactions}; use crate::args::Args; use cryo_freeze::U256Type; -pub(crate) async fn parse_query( +pub(crate) async fn parse_query( args: &Args, - provider: Arc>, + fetcher: Arc>, ) -> Result { let chunks = match (&args.blocks, &args.txs) { - (Some(_), None) => blocks::parse_blocks(args, provider).await?, + (Some(_), None) => blocks::parse_blocks(args, fetcher).await?, (None, Some(txs)) => transactions::parse_transactions(txs)?, - (None, None) => blocks::get_default_block_chunks(args, provider).await?, + (None, None) => blocks::get_default_block_chunks(args, fetcher).await?, (Some(_), Some(_)) => { return Err(ParseError::ParseError("specify only one of --blocks or --txs".to_string())) } diff --git a/crates/cli/src/parse/source.rs b/crates/cli/src/parse/source.rs index fe808c9e..63116e7b 100644 --- a/crates/cli/src/parse/source.rs +++ b/crates/cli/src/parse/source.rs @@ -5,7 +5,7 @@ use governor::{Quota, RateLimiter}; use polars::prelude::*; use std::num::NonZeroU32; -use cryo_freeze::{ParseError, Fetcher, Source}; +use cryo_freeze::{Fetcher, ParseError, Source}; use crate::args::Args; @@ -20,7 +20,7 @@ pub(crate) async fn parse_source(args: &Args) -> Result { Some(rate_limit) => match NonZeroU32::new(rate_limit) { Some(value) => { let quota = Quota::per_second(value); - Some(Arc::new(RateLimiter::direct(quota))) + Some(RateLimiter::direct(quota)) } _ => None, }, @@ -32,13 +32,9 @@ pub(crate) async fn parse_source(args: &Args) -> Result { let max_concurrent_chunks = args.max_concurrent_chunks.unwrap_or(3); let semaphore = tokio::sync::Semaphore::new(max_concurrent_requests as usize); - let semaphore = Some(Arc::new(semaphore)); + let semaphore = Some(semaphore); - let fetcher = Fetcher { - provider, - semaphore, - rate_limiter, - }; + let fetcher = Fetcher { provider, semaphore, rate_limiter }; let output = Source { fetcher: Arc::new(fetcher), chain_id, diff --git a/crates/freeze/src/datasets/blocks_and_transactions.rs b/crates/freeze/src/datasets/blocks_and_transactions.rs index 887b44c4..2391acd7 100644 --- a/crates/freeze/src/datasets/blocks_and_transactions.rs +++ b/crates/freeze/src/datasets/blocks_and_transactions.rs @@ -5,9 +5,12 @@ use polars::prelude::*; use tokio::{sync::mpsc, task}; use super::blocks; -use crate::types::{ - BlockChunk, BlocksAndTransactions, CollectError, Datatype, MultiDataset, RowFilter, Source, - Table, +use crate::{ + sources::Fetcher, + types::{ + BlockChunk, BlocksAndTransactions, CollectError, Datatype, MultiDataset, RowFilter, Source, + Table, + }, }; #[async_trait::async_trait] @@ -62,26 +65,15 @@ pub(crate) async fn fetch_blocks_and_transactions( for number in block_chunk.numbers() { let tx = tx.clone(); - let provider = source.provider.clone(); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); - let source_arc = source.clone(); + let fetcher = source.fetcher.clone(); task::spawn(async move { - let permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - }; - let block_result = provider.get_block_with_txs(number).await; - drop(permit); + let block_result = fetcher.get_block_with_txs(number).await; // get gas usage let result = match block_result { Ok(Some(block)) => { if include_gas_used { - match get_txs_gas_used(&block, source_arc.clone()).await { + match get_txs_gas_used(&block, fetcher).await { Ok(gas_used) => Ok((block, Some(gas_used))), Err(e) => Err(e), } @@ -90,7 +82,7 @@ pub(crate) async fn fetch_blocks_and_transactions( } } Ok(None) => Err(CollectError::CollectError("no block found".into())), - Err(e) => Err(CollectError::ProviderError(e)), + Err(e) => Err(e), }; // send to channel @@ -106,66 +98,47 @@ pub(crate) async fn fetch_blocks_and_transactions( rx } -async fn get_txs_gas_used( +async fn get_txs_gas_used( block: &Block, - source: Arc, + fetcher: Arc>, ) -> Result, CollectError> { - match get_txs_gas_used_per_block(block, source.clone()).await { + match get_txs_gas_used_per_block(block, fetcher.clone()).await { Ok(value) => Ok(value), - Err(_) => get_txs_gas_used_per_tx(block, source).await, + Err(_) => get_txs_gas_used_per_tx(block, fetcher).await, } } -async fn get_txs_gas_used_per_block( +async fn get_txs_gas_used_per_block( block: &Block, - source: Arc, + fetcher: Arc>, ) -> Result, CollectError> { let block_number = match block.number { Some(number) => number, None => return Err(CollectError::CollectError("no block number".to_string())), }; - match source.provider.get_block_receipts(block_number).await { - Ok(receipts) => { - let mut gas_used: Vec = Vec::new(); - for receipt in receipts { - match receipt.gas_used { - Some(value) => gas_used.push(value.as_u32()), - None => { - return Err(CollectError::CollectError("no gas_used for tx".to_string())) - } - } - } - Ok(gas_used) + let receipts = fetcher.get_block_receipts(block_number.as_u64()).await?; + let mut gas_used: Vec = Vec::new(); + for receipt in receipts { + match receipt.gas_used { + Some(value) => gas_used.push(value.as_u32()), + None => return Err(CollectError::CollectError("no gas_used for tx".to_string())), } - Err(_) => Err(CollectError::CollectError("error in eth_getBlockReceipts".to_string())), } + Ok(gas_used) } -async fn get_txs_gas_used_per_tx( +async fn get_txs_gas_used_per_tx( block: &Block, - source: Arc, + fetcher: Arc>, ) -> Result, CollectError> { - let source = Arc::new(source); let mut tasks = Vec::new(); for tx in &block.transactions { - let provider = source.provider.clone(); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); let tx_clone = tx.hash; + let fetcher = fetcher.clone(); let task = task::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - }; - match provider.get_transaction_receipt(tx_clone).await { - Ok(Some(receipt)) => Ok(receipt.gas_used), - Ok(None) => { - Err(CollectError::CollectError("could not find tx receipt".to_string())) - } - Err(e) => Err(CollectError::ProviderError(e)), + match fetcher.get_transaction_receipt(tx_clone).await? { + Some(receipt) => Ok(receipt.gas_used), + None => Err(CollectError::CollectError("could not find tx receipt".to_string())), } }); tasks.push(task); diff --git a/crates/freeze/src/datasets/contracts.rs b/crates/freeze/src/datasets/contracts.rs index 650a9e28..24ef274e 100644 --- a/crates/freeze/src/datasets/contracts.rs +++ b/crates/freeze/src/datasets/contracts.rs @@ -65,7 +65,7 @@ impl Dataset for Contracts { schema: &Table, _filter: Option<&RowFilter>, ) -> Result { - let rx = traces::fetch_block_traces(chunk, source).await; + let rx = traces::fetch_block_traces(chunk, source); traces_to_contracts_df(rx, schema, source.chain_id).await } @@ -76,7 +76,7 @@ impl Dataset for Contracts { schema: &Table, _filter: Option<&RowFilter>, ) -> Result { - let rx = traces::fetch_transaction_traces(chunk, source).await; + let rx = traces::fetch_transaction_traces(chunk, source); traces_to_contracts_df(rx, schema, source.chain_id).await } } diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs index a02d8b3d..d7415e89 100644 --- a/crates/freeze/src/datasets/logs.rs +++ b/crates/freeze/src/datasets/logs.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::collections::{HashMap, HashSet}; use ethers::prelude::*; use ethers_core::abi::{AbiEncode, EventParam, HumanReadableParser, ParamType, RawLog, Token}; @@ -69,7 +66,7 @@ impl Dataset for Logs { schema: &Table, filter: Option<&RowFilter>, ) -> Result { - let rx = fetch_block_logs(chunk, source, filter).await; + let rx = fetch_block_logs(chunk, source, filter); logs_to_df(rx, schema, source.chain_id).await } @@ -90,7 +87,7 @@ impl Dataset for Logs { } } -async fn fetch_block_logs( +fn fetch_block_logs( block_chunk: &BlockChunk, source: &Source, filter: Option<&RowFilter>, @@ -100,9 +97,7 @@ async fn fetch_block_logs( let (tx, rx) = mpsc::channel(request_chunks.len()); for request_chunk in request_chunks.iter() { let tx = tx.clone(); - let provider = source.provider.clone(); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + let fetcher = source.fetcher.clone(); let log_filter = match filter { Some(filter) => Filter { block_option: *request_chunk, @@ -116,14 +111,7 @@ async fn fetch_block_logs( }, }; task::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - } - let result = provider.get_logs(&log_filter).await.map_err(CollectError::ProviderError); + let result = fetcher.get_logs(&log_filter).await; match tx.send(result).await { Ok(_) => {} Err(tokio::sync::mpsc::error::SendError(_e)) => { @@ -147,21 +135,9 @@ async fn fetch_transaction_logs( for tx_hash in tx_hashes.iter() { let tx_hash = tx_hash.clone(); let tx = tx.clone(); - let provider = source.provider.clone(); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + let fetcher = source.fetcher.clone(); task::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - } - let receipt = provider - .get_transaction_receipt(H256::from_slice(&tx_hash)) - .await - .map_err(CollectError::ProviderError); + let receipt = fetcher.get_transaction_receipt(H256::from_slice(&tx_hash)).await; let logs = match receipt { Ok(Some(receipt)) => Ok(receipt.logs), _ => Err(CollectError::CollectError("".to_string())), diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs index 94115477..a66b0742 100644 --- a/crates/freeze/src/datasets/native_transfers.rs +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -59,7 +59,7 @@ impl Dataset for NativeTransfers { schema: &Table, _filter: Option<&RowFilter>, ) -> Result { - let rx = traces::fetch_block_traces(chunk, source).await; + let rx = traces::fetch_block_traces(chunk, source); traces_to_native_transfers_df(rx, schema, source.chain_id).await } @@ -70,7 +70,7 @@ impl Dataset for NativeTransfers { schema: &Table, _filter: Option<&RowFilter>, ) -> Result { - let rx = traces::fetch_transaction_traces(chunk, source).await; + let rx = traces::fetch_transaction_traces(chunk, source); traces_to_native_transfers_df(rx, schema, source.chain_id).await } } diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index b7b33786..de37930e 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use ethers::prelude::*; use polars::prelude::*; @@ -85,7 +85,7 @@ impl Dataset for Traces { schema: &Table, _filter: Option<&RowFilter>, ) -> Result { - let rx = fetch_block_traces(chunk, source).await; + let rx = fetch_block_traces(chunk, source); traces_to_df(rx, schema, source.chain_id).await } @@ -96,12 +96,12 @@ impl Dataset for Traces { schema: &Table, _filter: Option<&RowFilter>, ) -> Result { - let rx = fetch_transaction_traces(chunk, source).await; + let rx = fetch_transaction_traces(chunk, source); traces_to_df(rx, schema, source.chain_id).await } } -pub(crate) async fn fetch_block_traces( +pub(crate) fn fetch_block_traces( block_chunk: &BlockChunk, source: &Source, ) -> mpsc::Receiver, CollectError>> { @@ -109,21 +109,9 @@ pub(crate) async fn fetch_block_traces( for number in block_chunk.numbers() { let tx = tx.clone(); - let provider = source.provider.clone(); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + let fetcher = source.fetcher.clone(); task::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - } - let result = provider - .trace_block(BlockNumber::Number(number.into())) - .await - .map_err(CollectError::ProviderError); + let result = fetcher.trace_block(BlockNumber::Number(number.into())).await; match tx.send(result).await { Ok(_) => {} Err(tokio::sync::mpsc::error::SendError(_e)) => { @@ -136,7 +124,7 @@ pub(crate) async fn fetch_block_traces( rx } -pub(crate) async fn fetch_transaction_traces( +pub(crate) fn fetch_transaction_traces( transaction_chunk: &TransactionChunk, source: &Source, ) -> mpsc::Receiver, CollectError>> { @@ -146,21 +134,9 @@ pub(crate) async fn fetch_transaction_traces( for tx_hash in tx_hashes.iter() { let tx_hash = tx_hash.clone(); let tx = tx.clone(); - let provider = source.provider.clone(); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + let fetcher = source.fetcher.clone(); task::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - } - let result = provider - .trace_transaction(H256::from_slice(&tx_hash)) - .await - .map_err(CollectError::ProviderError); + let result = fetcher.trace_transaction(H256::from_slice(&tx_hash)).await; match tx.send(result).await { Ok(_) => {} Err(tokio::sync::mpsc::error::SendError(_e)) => { @@ -177,7 +153,7 @@ pub(crate) async fn fetch_transaction_traces( let result = Err(CollectError::CollectError( "transaction value ranges not supported".to_string(), )); - match tx.send(result).await { + match tx.blocking_send(result) { Ok(_) => {} Err(tokio::sync::mpsc::error::SendError(_e)) => { eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests"); diff --git a/crates/freeze/src/datasets/transactions.rs b/crates/freeze/src/datasets/transactions.rs index 1dacf7f4..e4ddcbf5 100644 --- a/crates/freeze/src/datasets/transactions.rs +++ b/crates/freeze/src/datasets/transactions.rs @@ -111,25 +111,16 @@ async fn fetch_transactions( TransactionChunk::Values(tx_hashes) => { for tx_hash in tx_hashes.iter() { let tx = tx.clone(); - let provider = Arc::clone(&source.provider); - let semaphore = source.semaphore.clone(); - let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone); + let fetcher = source.fetcher.clone(); let tx_hash = tx_hash.clone(); task::spawn(async move { - let _permit = match semaphore { - Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await), - _ => None, - }; - if let Some(limiter) = rate_limiter { - Arc::clone(&limiter).until_ready().await; - } let tx_hash = H256::from_slice(&tx_hash); - let transaction = provider.get_transaction(tx_hash).await; + let transaction = fetcher.get_transaction(tx_hash).await; // get gas_used using receipt let gas_used = if include_gas_used { - match provider.get_transaction_receipt(tx_hash).await { - Ok(Some(receipt)) => match receipt.gas_used { + match fetcher.get_transaction_receipt(tx_hash).await? { + Some(receipt) => match receipt.gas_used { Some(gas_used) => Some(gas_used.as_u32()), None => { return Err(CollectError::CollectError( @@ -148,12 +139,11 @@ async fn fetch_transactions( }; // package result - let result = match transaction { - Ok(Some(transaction)) => Ok((transaction, gas_used)), - Ok(None) => { + let result = match transaction? { + Some(transaction) => Ok((transaction, gas_used)), + None => { Err(CollectError::CollectError("transaction not in node".to_string())) } - Err(e) => Err(CollectError::ProviderError(e)), }; // send to channel diff --git a/crates/freeze/src/types/mod.rs b/crates/freeze/src/types/mod.rs index a1dfcc82..b770d4ba 100644 --- a/crates/freeze/src/types/mod.rs +++ b/crates/freeze/src/types/mod.rs @@ -30,7 +30,7 @@ pub use datatypes::*; pub use files::{ColumnEncoding, FileFormat, FileOutput}; pub use queries::{MultiQuery, RowFilter, SingleQuery}; pub use schemas::{ColumnType, Table, U256Type}; -pub use sources::{RateLimiter, Source}; +pub use sources::{Fetcher, RateLimiter, Source}; pub(crate) use summaries::FreezeSummaryAgg; pub use summaries::{FreezeChunkSummary, FreezeSummary}; diff --git a/crates/freeze/src/types/sources.rs b/crates/freeze/src/types/sources.rs index a221b6eb..71b26c05 100644 --- a/crates/freeze/src/types/sources.rs +++ b/crates/freeze/src/types/sources.rs @@ -16,7 +16,8 @@ pub type RateLimiter = governor::RateLimiter, + /// Shared provider for rpc data + pub fetcher: Arc>, /// chain_id of network pub chain_id: u64, /// number of blocks per log request @@ -25,9 +26,10 @@ pub struct Source { pub max_concurrent_chunks: u64, } -pub struct Fetcher { +/// Wrapper over `Provider

` that adds concurrency and rate limiting controls +pub struct Fetcher

{ /// provider data source - pub provider: Provider, + pub provider: Provider

, /// semaphore for controlling concurrency pub semaphore: Option, /// rate limiter for controlling request rate @@ -36,56 +38,98 @@ pub struct Fetcher { type Result = ::core::result::Result; -impl Fetcher { - async fn permit_request( - &self, - ) -> Option<::core::result::Result, AcquireError>> { - let permit = match &self.semaphore { - Some(semaphore) => Some(semaphore.acquire().await), - _ => None, - }; - if let Some(limiter) = &self.rate_limiter { - limiter.until_ready().await; - } - permit - } - +impl Fetcher

{ + /// Returns an array (possibly empty) of logs that match the filter pub async fn get_logs(&self, filter: &Filter) -> Result> { let _permit = self.permit_request().await; - self.provider.get_logs(filter).await.map_err(CollectError::ProviderError) + Self::map_err(self.provider.get_logs(filter).await) } + /// Replays all transactions in a block returning the requested traces for each transaction pub async fn trace_replay_block_transactions( &self, block: BlockNumber, trace_types: Vec, ) -> Result> { let _permit = self.permit_request().await; - self.provider - .trace_replay_block_transactions(block, trace_types) - .await - .map_err(CollectError::ProviderError) + Self::map_err(self.provider.trace_replay_block_transactions(block, trace_types).await) } + /// Replays a transaction, returning the traces pub async fn trace_replay_transaction( &self, block: H256, trace_types: Vec, ) -> Result { let _permit = self.permit_request().await; - self.provider - .trace_replay_transaction(block, trace_types) - .await - .map_err(CollectError::ProviderError) + Self::map_err(self.provider.trace_replay_transaction(block, trace_types).await) + } + + /// Gets the transaction with transaction_hash + pub async fn get_transaction(&self, tx_hash: TxHash) -> Result> { + let _permit = self.permit_request().await; + Self::map_err(self.provider.get_transaction(tx_hash).await) } - pub async fn get_transaction(&self, tx_hash: H256) -> Result> { + /// Gets the transaction receipt with transaction_hash + pub async fn get_transaction_receipt( + &self, + tx_hash: TxHash, + ) -> Result> { let _permit = self.permit_request().await; - self.provider.get_transaction(tx_hash).await.map_err(CollectError::ProviderError) + Self::map_err(self.provider.get_transaction_receipt(tx_hash).await) } + /// Gets the block at `block_num` (transaction hashes only) pub async fn get_block(&self, block_num: u64) -> Result>> { - self.provider.get_block(block_num).await.map_err(CollectError::ProviderError) + let _permit = self.permit_request().await; + Self::map_err(self.provider.get_block(block_num).await) + } + + /// Gets the block at `block_num` (full transactions included) + pub async fn get_block_with_txs(&self, block_num: u64) -> Result>> { + let _permit = self.permit_request().await; + Self::map_err(self.provider.get_block_with_txs(block_num).await) + } + + /// Returns all receipts for a block. + pub async fn get_block_receipts(&self, block_num: u64) -> Result> { + let _permit = self.permit_request().await; + Self::map_err(self.provider.get_block_receipts(block_num).await) + } + + /// Returns traces created at given block + pub async fn trace_block(&self, block_num: BlockNumber) -> Result> { + let _permit = self.permit_request().await; + Self::map_err(self.provider.trace_block(block_num).await) + } + + /// Returns all traces of a given transaction + pub async fn trace_transaction(&self, tx_hash: TxHash) -> Result> { + let _permit = self.permit_request().await; + self.provider.trace_transaction(tx_hash).await.map_err(CollectError::ProviderError) + } + + /// Get the block number + pub async fn get_block_number(&self) -> Result { + Self::map_err(self.provider.get_block_number().await) + } + + async fn permit_request( + &self, + ) -> Option<::core::result::Result, AcquireError>> { + let permit = match &self.semaphore { + Some(semaphore) => Some(semaphore.acquire().await), + _ => None, + }; + if let Some(limiter) = &self.rate_limiter { + limiter.until_ready().await; + } + permit + } + + fn map_err(res: ::core::result::Result) -> Result { + res.map_err(CollectError::ProviderError) } } From c5ab5fb1fd4c1feea84f259bfc8f6c89f94b4268 Mon Sep 17 00:00:00 2001 From: Kamil Skalski Date: Tue, 5 Sep 2023 12:11:48 +0200 Subject: [PATCH 3/3] Fix parameter name. --- crates/freeze/src/types/sources.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/freeze/src/types/sources.rs b/crates/freeze/src/types/sources.rs index 71b26c05..015e494d 100644 --- a/crates/freeze/src/types/sources.rs +++ b/crates/freeze/src/types/sources.rs @@ -58,11 +58,11 @@ impl Fetcher

{ /// Replays a transaction, returning the traces pub async fn trace_replay_transaction( &self, - block: H256, + tx_hash: TxHash, trace_types: Vec, ) -> Result { let _permit = self.permit_request().await; - Self::map_err(self.provider.trace_replay_transaction(block, trace_types).await) + Self::map_err(self.provider.trace_replay_transaction(tx_hash, trace_types).await) } /// Gets the transaction with transaction_hash