From e734f0622105b4bebd21aaebe78bdb7a772aa3d2 Mon Sep 17 00:00:00 2001
From: sslivkoff
Date: Sun, 6 Aug 2023 01:31:25 -0700
Subject: [PATCH] implement --txs for logs

---
 crates/cli/src/args.rs                      |  24 +--
 crates/cli/src/parse/blocks.rs              |  31 +++-
 crates/cli/src/parse/mod.rs                 |   1 +
 crates/cli/src/parse/query.rs               |  11 +-
 crates/cli/src/parse/transactions.rs        |  66 ++++++++
 crates/cli/src/summaries.rs                 | 158 +++++++++++++-----
 crates/freeze/src/datasets/logs.rs          |  81 ++++++++-
 .../freeze/src/types/chunks/binary_chunk.rs |   6 +-
 crates/python/src/collect_adapter.rs        |   7 +-
 crates/python/src/freeze_adapter.rs         |   7 +-
 10 files changed, 326 insertions(+), 66 deletions(-)
 create mode 100644 crates/cli/src/parse/transactions.rs

diff --git a/crates/cli/src/args.rs b/crates/cli/src/args.rs
index 5ff9726e..de1f7583 100644
--- a/crates/cli/src/args.rs
+++ b/crates/cli/src/args.rs
@@ -10,15 +10,17 @@ pub struct Args {
     pub datatype: Vec<String>,
 
     /// Block numbers, see syntax below
+    #[arg(short, long, allow_hyphen_values(true), help_heading = "Content Options")]
+    pub blocks: Option<Vec<String>>,
+
+    /// Transaction hashes, see syntax below
     #[arg(
         short,
         long,
-        default_value = "0:latest",
-        allow_hyphen_values(true),
         help_heading = "Content Options",
-        num_args = 1..
+        num_args(1..),
     )]
-    pub blocks: Vec<String>,
+    pub txs: Option<Vec<String>>,
 
     /// Align block chunk boundaries to regular intervals,
     /// e.g. (1000, 2000, 3000) instead of (1106, 2106, 3106)
@@ -36,14 +38,6 @@ pub struct Args {
     )]
     pub reorg_buffer: u64,
 
-    // #[arg(
-    //     short,
-    //     long,
-    //     allow_hyphen_values(true),
-    //     help_heading = "Content Options",
-    //     help = "Select by data transaction instead of by block,\ncan be a list or a file, see
-    // syntax below", )]
-    // pub txs: Vec<String>,
     /// Columns to include alongside the default output,
     /// use `all` to include all available columns
     #[arg(short, long, value_name="COLS", num_args(0..), verbatim_doc_comment, help_heading="Content Options")]
@@ -203,6 +197,12 @@ fn get_after_str() -> &'static str {
 - omitting range start means 0          :700 == 0:700
 - minus sign on start means minus end   -1000:7000 == 6000:7000
 - plus sign on end means plus start     15M:+1000 == 15M:15.001M
+
+Transaction hash specification syntax
+- can use transaction hashes      --txs TX_HASH1 TX_HASH2 TX_HASH3
+- can use a parquet file          --txs ./path/to/file.parquet[:COLUMN_NAME]
+                                  (default column name is transaction_hash)
+- can use multiple parquet files  --txs ./path/to/ethereum__logs*.parquet
 "#
     )
 }
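
For orientation, a hypothetical pair of invocations matching the help text
above (not part of the patch; TX_HASH1..TX_HASH3 and COLUMN_NAME are
placeholders):

    cryo logs --txs TX_HASH1 TX_HASH2 TX_HASH3
    cryo logs --txs ./path/to/file.parquet:COLUMN_NAME
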
diff --git a/crates/cli/src/parse/blocks.rs b/crates/cli/src/parse/blocks.rs
index 872cd65a..4df4daf6 100644
--- a/crates/cli/src/parse/blocks.rs
+++ b/crates/cli/src/parse/blocks.rs
@@ -10,21 +10,50 @@ pub(crate) async fn parse_blocks(
     args: &Args,
     provider: Arc<Provider<Http>>,
 ) -> Result<Vec<Chunk>, ParseError> {
-    let block_chunks = parse_block_inputs(&args.blocks, &provider).await?;
+    // parse inputs into BlockChunks
+    let block_chunks = match &args.blocks {
+        Some(inputs) => parse_block_inputs(inputs, &provider).await?,
+        None => return Err(ParseError::ParseError("could not parse block inputs".to_string())),
+    };
+
+    postprocess_block_chunks(block_chunks, args, provider).await
+}
+
+async fn postprocess_block_chunks(
+    block_chunks: Vec<BlockChunk>,
+    args: &Args,
+    provider: Arc<Provider<Http>>,
+) -> Result<Vec<Chunk>, ParseError> {
+    // align
     let block_chunks = if args.align {
         block_chunks.into_iter().filter_map(|x| x.align(args.chunk_size)).collect()
     } else {
         block_chunks
     };
+
+    // split block range into chunks
     let block_chunks = match args.n_chunks {
         Some(n_chunks) => block_chunks.subchunk_by_count(&n_chunks),
         None => block_chunks.subchunk_by_size(&args.chunk_size),
     };
+
+    // apply reorg buffer
     let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &provider).await?;
+
+    // put into Chunk enums
     let chunks: Vec<Chunk> = block_chunks.iter().map(|x| Chunk::Block(x.clone())).collect();
+
     Ok(chunks)
 }
 
+pub(crate) async fn get_default_block_chunks(
+    args: &Args,
+    provider: Arc<Provider<Http>>,
+) -> Result<Vec<Chunk>, ParseError> {
+    let block_chunks = parse_block_inputs(&vec!["0:latest".to_string()], &provider).await?;
+    postprocess_block_chunks(block_chunks, args, provider).await
+}
+
 /// parse block numbers to freeze
 async fn parse_block_inputs(
     inputs: &Vec<String>,
diff --git a/crates/cli/src/parse/mod.rs b/crates/cli/src/parse/mod.rs
index 6b814dd2..18e2fed0 100644
--- a/crates/cli/src/parse/mod.rs
+++ b/crates/cli/src/parse/mod.rs
@@ -3,6 +3,7 @@ mod blocks;
 mod file_output;
 mod query;
 mod source;
+mod transactions;
 
 pub use args::*;
 // use blocks::*;
diff --git a/crates/cli/src/parse/query.rs b/crates/cli/src/parse/query.rs
index 47d671ef..69c09dfa 100644
--- a/crates/cli/src/parse/query.rs
+++ b/crates/cli/src/parse/query.rs
@@ -5,14 +5,21 @@ use hex::FromHex;
 
 use cryo_freeze::{ColumnEncoding, Datatype, FileFormat, MultiQuery, ParseError, RowFilter, Table};
 
-use super::{blocks, file_output};
+use super::{blocks, file_output, transactions};
 use crate::args::Args;
 
 pub(crate) async fn parse_query(
     args: &Args,
     provider: Arc<Provider<Http>>,
 ) -> Result<MultiQuery, ParseError> {
-    let chunks = blocks::parse_blocks(args, provider).await?;
+    let chunks = match (&args.blocks, &args.txs) {
+        (Some(_), None) => blocks::parse_blocks(args, provider).await?,
+        (None, Some(txs)) => transactions::parse_transactions(txs)?,
+        (None, None) => blocks::get_default_block_chunks(args, provider).await?,
+        (Some(_), Some(_)) => {
+            return Err(ParseError::ParseError("specify only one of --blocks or --txs".to_string()))
+        }
+    };
 
     // process schemas
     let schemas = parse_schemas(args)?;
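
The four-way match above is the whole dispatch contract. A standalone sketch
of the same shape (hypothetical function, not patch code), showing why
matching on the pair of Options makes the "exactly one of --blocks / --txs"
rule exhaustive:

    fn choose(blocks: Option<&str>, txs: Option<&str>) -> Result<String, String> {
        match (blocks, txs) {
            (Some(b), None) => Ok(format!("parse block inputs: {b}")),
            (None, Some(t)) => Ok(format!("parse transaction inputs: {t}")),
            (None, None) => Ok("use default block range 0:latest".to_string()),
            (Some(_), Some(_)) => Err("specify only one of --blocks or --txs".to_string()),
        }
    }
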
diff --git a/crates/cli/src/parse/transactions.rs b/crates/cli/src/parse/transactions.rs
new file mode 100644
index 00000000..492df21d
--- /dev/null
+++ b/crates/cli/src/parse/transactions.rs
@@ -0,0 +1,66 @@
+use cryo_freeze::{Chunk, ParseError, TransactionChunk};
+use polars::prelude::*;
+
+pub(crate) fn parse_transactions(txs: &[String]) -> Result<Vec<Chunk>, ParseError> {
+    let (files, hashes): (Vec<&String>, Vec<&String>) =
+        txs.iter().partition(|tx| std::path::Path::new(tx).exists());
+
+    let mut file_chunks = if !files.is_empty() {
+        let mut file_chunks = Vec::new();
+        for path in files {
+            let column = if path.contains(':') {
+                path.split(':')
+                    .last()
+                    .ok_or(ParseError::ParseError("could not parse txs path column".to_string()))?
+            } else {
+                "transaction_hash"
+            };
+            let tx_hashes = read_binary_column(path, column)
+                .map_err(|_e| ParseError::ParseError("could not read input".to_string()))?;
+            let chunk = TransactionChunk::Values(tx_hashes);
+            file_chunks.push(Chunk::Transaction(chunk));
+        }
+        file_chunks
+    } else {
+        Vec::new()
+    };
+
+    let hash_chunks = if !hashes.is_empty() {
+        let values: Result<Vec<Vec<u8>>, _> = hashes.iter().map(hex::decode).collect();
+        let values =
+            values.map_err(|_e| ParseError::ParseError("could not parse txs".to_string()))?;
+        let chunk = Chunk::Transaction(TransactionChunk::Values(values));
+        vec![chunk]
+    } else {
+        Vec::new()
+    };
+
+    file_chunks.extend(hash_chunks);
+    Ok(file_chunks)
+}
+
+fn read_binary_column(path: &str, column: &str) -> Result<Vec<Vec<u8>>, ParseError> {
+    let file = std::fs::File::open(path)
+        .map_err(|_e| ParseError::ParseError("could not open file path".to_string()))?;
+
+    let df = ParquetReader::new(file)
+        .with_columns(Some(vec![column.to_string()]))
+        .finish()
+        .map_err(|_e| ParseError::ParseError("could not read data from column".to_string()))?;
+
+    let series = df
+        .column(column)
+        .map_err(|_e| ParseError::ParseError("could not get column".to_string()))?;
+
+    let ca = series
+        .binary()
+        .map_err(|_e| ParseError::ParseError("could not convert to binary column".to_string()))?;
+
+    ca.into_iter()
+        .map(|value| {
+            value
+                .ok_or_else(|| ParseError::ParseError("transaction hash missing".to_string()))
+                .map(|data| data.into())
+        })
+        .collect()
+}
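
Note that hex::decode rejects a leading "0x", so hashes passed as bare CLI
values must be un-prefixed here. A prefix-tolerant variant (hypothetical
helper, not in this patch) could look like:

    fn decode_tx_hash(s: &str) -> Result<Vec<u8>, hex::FromHexError> {
        // strip an optional 0x prefix before hex-decoding
        hex::decode(s.strip_prefix("0x").unwrap_or(s))
    }
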
diff --git a/crates/cli/src/summaries.rs b/crates/cli/src/summaries.rs
index 902f0392..39a812d5 100644
--- a/crates/cli/src/summaries.rs
+++ b/crates/cli/src/summaries.rs
@@ -7,6 +7,7 @@ use thousands::Separable;
 
 use cryo_freeze::{
     BlockChunk, Chunk, ChunkData, Datatype, FileOutput, FreezeSummary, MultiQuery, Source, Table,
+    TransactionChunk,
 };
 
 const TITLE_R: u8 = 0;
@@ -35,15 +36,8 @@ pub(crate) fn print_cryo_summary(query: &MultiQuery, source: &Source, sink: &FileOutput) {
     print_bullet("network", &sink.prefix);
     // let rpc_url = cli::parse_rpc_url(args);
     // print_bullet("provider", rpc_url);
-    let block_chunks = query
-        .chunks
-        .iter()
-        .filter_map(|x| match x.clone() {
-            Chunk::Block(chunk) => Some(chunk),
-            _ => None,
-        })
-        .collect();
-    print_block_chunks(block_chunks);
+    print_block_chunks(query);
+    print_transaction_chunks(query);
     print_bullet("max concurrent chunks", source.max_concurrent_chunks.separate_with_commas());
     if query.schemas.contains_key(&Datatype::Logs) {
         print_bullet("inner request size", source.inner_request_size.to_string());
@@ -53,20 +47,51 @@ pub(crate) fn print_cryo_summary(query: &MultiQuery, source: &Source, sink: &FileOutput) {
     print_schemas(&query.schemas);
 }
 
-fn print_block_chunks(chunks: Vec<BlockChunk>) {
-    if let Some(min) = chunks.min_value() {
-        print_bullet("min block", min.separate_with_commas());
-    };
-    if let Some(max) = chunks.max_value() {
-        print_bullet("max block", max.separate_with_commas());
-    };
-    print_bullet("total blocks", chunks.size().separate_with_commas());
+fn print_block_chunks(query: &MultiQuery) {
+    let block_chunks: Vec<BlockChunk> = query
+        .chunks
+        .iter()
+        .filter_map(|x| match x.clone() {
+            Chunk::Block(chunk) => Some(chunk),
+            _ => None,
+        })
+        .collect();
+    if !block_chunks.is_empty() {
+        if let Some(min) = block_chunks.min_value() {
+            print_bullet("min block", min.separate_with_commas());
+        };
+        if let Some(max) = block_chunks.max_value() {
+            print_bullet("max block", max.separate_with_commas());
+        };
+        print_bullet("total blocks", block_chunks.size().separate_with_commas());
+        if let Some(first_chunk) = block_chunks.get(0) {
+            let chunk_size = first_chunk.size();
+            print_bullet("block chunk size", chunk_size.separate_with_commas());
+        };
+        print_bullet("total block chunks", block_chunks.len().separate_with_commas());
+    }
+}
 
-    if let Some(first_chunk) = chunks.get(0) {
-        let chunk_size = first_chunk.size();
-        print_bullet("block chunk size", chunk_size.separate_with_commas());
-    };
-    print_bullet("total block chunks", chunks.len().separate_with_commas());
+fn print_transaction_chunks(query: &MultiQuery) {
+    let transaction_chunks: Vec<TransactionChunk> = query
+        .chunks
+        .iter()
+        .filter_map(|x| match x.clone() {
+            Chunk::Transaction(chunk) => Some(chunk),
+            _ => None,
+        })
+        .collect();
+    if !transaction_chunks.is_empty() {
+        let total_transactions = transaction_chunks
+            .iter()
+            .map(|chunk| match chunk {
+                TransactionChunk::Values(values) => values.len() as u64,
+                TransactionChunk::Range(_, _) => todo!(),
+            })
+            .sum::<u64>();
+        print_bullet("total transactions", total_transactions.to_string());
+        print_bullet("total transaction chunks", transaction_chunks.len().separate_with_commas());
+    }
 }
 
 fn print_schemas(schemas: &HashMap<Datatype, Table>) {
@@ -120,6 +145,7 @@ pub(crate) fn print_cryo_conclusion(
     };
     let seconds = duration.as_secs();
     let millis = duration.subsec_millis();
+    let total_time = (seconds as f64) + (duration.subsec_nanos() as f64) / 1e9;
     let duration_string = format!("{}.{:03} seconds", seconds, millis);
 
     print_header("collection summary");
@@ -129,6 +155,18 @@
         "t_end",
         " ".to_string() + dt_data_done.format("%Y-%m-%d %H:%M:%S%.3f").to_string().as_str(),
     );
+    print_bullet("chunks errored", freeze_summary.n_errored.separate_with_commas());
+    print_bullet("chunks skipped", freeze_summary.n_skipped.separate_with_commas());
+    print_bullet(
+        "chunks collected",
+        format!("{} / {}", freeze_summary.n_completed.separate_with_commas(), query.chunks.len()),
+    );
+
+    print_block_chunk_summary(query, freeze_summary, total_time);
+    print_transaction_chunk_summary(query, freeze_summary, total_time);
+}
+
+fn print_block_chunk_summary(query: &MultiQuery, freeze_summary: &FreezeSummary, total_time: f64) {
     let block_chunks: Vec<BlockChunk> = query
         .chunks
         .iter()
@@ -137,26 +175,62 @@
             _ => None,
         })
         .collect();
-    let n_chunks = block_chunks.len();
-    print_bullet("chunks errored", freeze_summary.n_errored.separate_with_commas());
-    print_bullet("chunks skipped", freeze_summary.n_skipped.separate_with_commas());
-    print_bullet(
-        "chunks collected",
-        format!("{} / {}", freeze_summary.n_completed.separate_with_commas(), n_chunks),
-    );
-    let total_blocks = block_chunks.size() as f64;
-    let blocks_completed =
-        total_blocks * (freeze_summary.n_completed as f64 / block_chunks.len() as f64);
-    print_bullet("blocks collected", blocks_completed.separate_with_commas());
-    let total_time = (seconds as f64) + (duration.subsec_nanos() as f64) / 1e9;
-    let blocks_per_second = blocks_completed / total_time;
-    let blocks_per_minute = blocks_per_second * 60.0;
-    let blocks_per_hour = blocks_per_minute * 60.0;
-    let blocks_per_day = blocks_per_hour * 24.0;
-    print_bullet("blocks per second", format_float(blocks_per_second));
-    print_bullet("blocks per minute", format_float(blocks_per_minute));
-    print_bullet("blocks per hour", " ".to_string() + format_float(blocks_per_hour).as_str());
-    print_bullet("blocks per day", " ".to_string() + format_float(blocks_per_day).as_str());
+    if !block_chunks.is_empty() {
+        let total_blocks = block_chunks.size() as f64;
+        let blocks_completed =
+            total_blocks * (freeze_summary.n_completed as f64 / block_chunks.len() as f64);
+        print_bullet("blocks collected", blocks_completed.separate_with_commas());
+        let blocks_per_second = blocks_completed / total_time;
+        let blocks_per_minute = blocks_per_second * 60.0;
+        let blocks_per_hour = blocks_per_minute * 60.0;
+        let blocks_per_day = blocks_per_hour * 24.0;
+        print_bullet("blocks per second", format_float(blocks_per_second));
+        print_bullet("blocks per minute", format_float(blocks_per_minute));
+        print_bullet("blocks per hour", " ".to_string() + format_float(blocks_per_hour).as_str());
+        print_bullet("blocks per day", " ".to_string() + format_float(blocks_per_day).as_str());
+    };
+}
+
+fn print_transaction_chunk_summary(
+    query: &MultiQuery,
+    freeze_summary: &FreezeSummary,
+    total_time: f64,
+) {
+    let transaction_chunks: Vec<TransactionChunk> = query
+        .chunks
+        .iter()
+        .filter_map(|x| match x {
+            Chunk::Transaction(chunk) => Some(chunk.clone()),
+            _ => None,
+        })
+        .collect();
+    if !transaction_chunks.is_empty() {
+        // let total_transactions = transaction_chunks.size() as f64;
+        let total_transactions = transaction_chunks
+            .iter()
+            .map(|chunk| match chunk {
+                TransactionChunk::Values(values) => values.len() as u64,
+                TransactionChunk::Range(_, _) => todo!(),
+            })
+            .sum::<u64>();
+        let transactions_completed: u64 =
+            total_transactions * freeze_summary.n_completed / transaction_chunks.len() as u64;
+        print_bullet("transactions collected", transactions_completed.separate_with_commas());
+        let transactions_per_second = transactions_completed as f64 / total_time;
+        let transactions_per_minute = transactions_per_second * 60.0;
+        let transactions_per_hour = transactions_per_minute * 60.0;
+        let transactions_per_day = transactions_per_hour * 24.0;
+        print_bullet("transactions per second", format_float(transactions_per_second));
+        print_bullet("transactions per minute", format_float(transactions_per_minute));
+        print_bullet(
+            "transactions per hour",
+            " ".to_string() + format_float(transactions_per_hour).as_str(),
+        );
+        print_bullet(
+            "transactions per day",
+            " ".to_string() + format_float(transactions_per_day).as_str(),
+        );
+    };
 }
 
 fn format_float(number: f64) -> String {
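
One arithmetic note on transactions_completed above: with u64 operands the
multiplication must come before the division, otherwise the completion
fraction truncates to zero whenever n_completed is smaller than the chunk
count. A minimal demonstration:

    fn main() {
        let (total, completed, n_chunks): (u64, u64, u64) = (1000, 3, 4);
        assert_eq!(total * (completed / n_chunks), 0); // fraction truncates first
        assert_eq!(total * completed / n_chunks, 750); // multiply first, then divide
    }
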
diff --git a/crates/freeze/src/datasets/logs.rs b/crates/freeze/src/datasets/logs.rs
index 8d263bc0..7c135e28 100644
--- a/crates/freeze/src/datasets/logs.rs
+++ b/crates/freeze/src/datasets/logs.rs
@@ -8,7 +8,7 @@ use crate::{
     dataframes::SortableDataFrame,
     types::{
         conversions::ToVecHex, BlockChunk, CollectError, ColumnType, Dataset, Datatype, Logs,
-        RowFilter, Source, Table,
+        RowFilter, Source, Table, TransactionChunk,
     },
     with_series, with_series_binary,
 };
@@ -65,16 +65,33 @@ impl Dataset for Logs {
         schema: &Table,
         filter: Option<&RowFilter>,
     ) -> Result<DataFrame, CollectError> {
-        let rx = fetch_logs(chunk, source, filter).await;
+        let rx = fetch_block_logs(chunk, source, filter).await;
+        logs_to_df(rx, schema, source.chain_id).await
+    }
+
+    async fn collect_transaction_chunk(
+        &self,
+        chunk: &TransactionChunk,
+        source: &Source,
+        schema: &Table,
+        filter: Option<&RowFilter>,
+    ) -> Result<DataFrame, CollectError> {
+        // if let Some(_filter) = filter {
+        //     return Err(CollectError::CollectError(
+        //         "filters not supported when using --txs".to_string(),
+        //     ));
+        // };
+        let rx = fetch_transaction_logs(chunk, source, filter).await;
         logs_to_df(rx, schema, source.chain_id).await
     }
 }
 
-async fn fetch_logs(
+async fn fetch_block_logs(
     block_chunk: &BlockChunk,
     source: &Source,
     filter: Option<&RowFilter>,
 ) -> mpsc::Receiver<Result<Vec<Log>, CollectError>> {
+    // todo: need to modify these functions so they return a result
     let request_chunks = block_chunk.to_log_filter_options(&source.inner_request_size);
     let (tx, rx) = mpsc::channel(request_chunks.len());
     for request_chunk in request_chunks.iter() {
@@ -115,6 +132,64 @@
     rx
 }
 
+async fn fetch_transaction_logs(
+    transaction_chunk: &TransactionChunk,
+    source: &Source,
+    _filter: Option<&RowFilter>,
+) -> mpsc::Receiver<Result<Vec<Log>, CollectError>> {
+    match transaction_chunk {
+        TransactionChunk::Values(tx_hashes) => {
+            let (tx, rx) = mpsc::channel(tx_hashes.len() * 200);
+            for tx_hash in tx_hashes.iter() {
+                let tx_hash = tx_hash.clone();
+                let tx = tx.clone();
+                let provider = source.provider.clone();
+                let semaphore = source.semaphore.clone();
+                let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone);
+                task::spawn(async move {
+                    let _permit = match semaphore {
+                        Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await),
+                        _ => None,
+                    };
+                    if let Some(limiter) = rate_limiter {
+                        Arc::clone(&limiter).until_ready().await;
+                    }
+                    let receipt = provider
+                        .get_transaction_receipt(H256::from_slice(&tx_hash))
+                        .await
+                        .map_err(CollectError::ProviderError);
+                    let logs = match receipt {
+                        Ok(Some(receipt)) => Ok(receipt.logs),
+                        _ => Err(CollectError::CollectError("could not get logs for transaction".to_string())),
+                    };
+                    match tx.send(logs).await {
+                        Ok(_) => {}
+                        Err(tokio::sync::mpsc::error::SendError(_e)) => {
+                            eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
+                            std::process::exit(1)
+                        }
+                    }
+                });
+            }
+            rx
+        }
+        _ => {
+            let (tx, rx) = mpsc::channel(1);
+            let result = Err(CollectError::CollectError(
+                "transaction value ranges not supported".to_string(),
+            ));
+            match tx.send(result).await {
+                Ok(_) => {}
+                Err(tokio::sync::mpsc::error::SendError(_e)) => {
+                    eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
+                    std::process::exit(1)
+                }
+            }
+            rx
+        }
+    }
+}
+
 async fn logs_to_df(
     mut logs: mpsc::Receiver<Result<Vec<Log>, CollectError>>,
     schema: &Table,
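
fetch_transaction_logs follows a common tokio fan-out shape: spawn one task
per item, push each result into a bounded mpsc channel, and hand the receiver
to the consumer. A self-contained miniature (hypothetical doubling worker in
place of the receipt fetch):

    use tokio::{sync::mpsc, task};

    #[tokio::main]
    async fn main() {
        let items: Vec<u64> = (0..5).collect();
        let (tx, mut rx) = mpsc::channel(items.len());
        for item in items {
            let tx = tx.clone();
            task::spawn(async move {
                // stand-in for the real get_transaction_receipt call
                let result: Result<u64, String> = Ok(item * 2);
                let _ = tx.send(result).await; // errors only if the receiver was dropped
            });
        }
        drop(tx); // close the channel so recv() returns None once workers finish
        while let Some(result) = rx.recv().await {
            println!("{result:?}");
        }
    }
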
diff --git a/crates/freeze/src/types/chunks/binary_chunk.rs b/crates/freeze/src/types/chunks/binary_chunk.rs
index 708646d3..71ed3775 100644
--- a/crates/freeze/src/types/chunks/binary_chunk.rs
+++ b/crates/freeze/src/types/chunks/binary_chunk.rs
@@ -13,8 +13,10 @@ pub enum BinaryChunk {
 impl ChunkData for BinaryChunk {
     type Inner = Vec<u8>;
 
-    fn format_item(_value: Self::Inner) -> String {
-        todo!()
+    fn format_item(value: Self::Inner) -> String {
+        let hash = prefix_hex::encode(value);
+        let start = &hash[..hash.char_indices().nth(8).unwrap().0];
+        start.to_string()
     }
 
     fn size(&self) -> u64 {
diff --git a/crates/python/src/collect_adapter.rs b/crates/python/src/collect_adapter.rs
index 8a8ab9f8..35ae5729 100644
--- a/crates/python/src/collect_adapter.rs
+++ b/crates/python/src/collect_adapter.rs
@@ -8,8 +8,9 @@ use cryo_freeze::collect;
 #[pyfunction(
     signature = (
         datatype,
-        blocks,
+        blocks = None,
         *,
+        txs = None,
         align = false,
         reorg_buffer = 0,
         include_columns = None,
@@ -47,7 +48,8 @@ use cryo_freeze::collect;
 pub fn _collect(
     py: Python<'_>,
     datatype: String,
-    blocks: Vec<String>,
+    blocks: Option<Vec<String>>,
+    txs: Option<Vec<String>>,
     align: bool,
     reorg_buffer: u64,
     include_columns: Option<Vec<String>>,
@@ -83,6 +85,7 @@ pub fn _collect(
     let args = Args {
         datatype: vec![datatype],
         blocks,
+        txs,
         align,
         reorg_buffer,
         include_columns,
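
Since prefix_hex::encode returns "0x" plus ASCII hex, the char_indices walk in
format_item is equivalent to a plain byte slice; an alternative sketch (same
behavior, assuming the prefix-hex crate) that also tolerates short inputs:

    fn format_item(value: Vec<u8>) -> String {
        let hash = prefix_hex::encode(value); // e.g. "0xdeadbeef..."
        hash[..hash.len().min(8)].to_string() // first 8 chars: "0xdeadbe"
    }
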
diff --git a/crates/python/src/freeze_adapter.rs b/crates/python/src/freeze_adapter.rs
index e65d77b9..146e4418 100644
--- a/crates/python/src/freeze_adapter.rs
+++ b/crates/python/src/freeze_adapter.rs
@@ -9,8 +9,9 @@ use cryo_cli::{run, Args};
 #[pyfunction(
     signature = (
         datatype,
-        blocks,
+        blocks = None,
         *,
+        txs = None,
         align = false,
         reorg_buffer = 0,
         include_columns = None,
@@ -48,7 +49,8 @@ use cryo_cli::{run, Args};
 pub fn _freeze(
     py: Python<'_>,
     datatype: Vec<String>,
-    blocks: Vec<String>,
+    blocks: Option<Vec<String>>,
+    txs: Option<Vec<String>>,
     align: bool,
     reorg_buffer: u64,
     include_columns: Option<Vec<String>>,
@@ -84,6 +86,7 @@ pub fn _freeze(
     let args = Args {
         datatype,
         blocks,
+        txs,
         align,
         reorg_buffer,
         include_columns,