implement --txs for logs
sslivkoff authored Aug 6, 2023
1 parent 17554dc commit e734f06
Showing 10 changed files with 326 additions and 66 deletions.
24 changes: 12 additions & 12 deletions crates/cli/src/args.rs
@@ -10,15 +10,17 @@ pub struct Args {
     pub datatype: Vec<String>,
 
     /// Block numbers, see syntax below
+    #[arg(short, long, allow_hyphen_values(true), help_heading = "Content Options")]
+    pub blocks: Option<Vec<String>>,
+
+    /// Transaction hashes, see syntax below
     #[arg(
         short,
         long,
-        default_value = "0:latest",
         allow_hyphen_values(true),
         help_heading = "Content Options",
-        num_args = 1..
+        num_args(1..),
     )]
-    pub blocks: Vec<String>,
+    pub txs: Option<Vec<String>>,
 
     /// Align block chunk boundaries to regular intervals,
     /// e.g. (1000, 2000, 3000) instead of (1106, 2106, 3106)
@@ -36,14 +38,6 @@ pub struct Args {
     )]
     pub reorg_buffer: u64,
 
-    // #[arg(
-    //     short,
-    //     long,
-    //     allow_hyphen_values(true),
-    //     help_heading = "Content Options",
-    //     help = "Select by data transaction instead of by block,\ncan be a list or a file, see
-    // syntax below", )]
-    // pub txs: Vec<String>,
     /// Columns to include alongside the default output,
     /// use `all` to include all available columns
     #[arg(short, long, value_name="COLS", num_args(0..), verbatim_doc_comment, help_heading="Content Options")]
@@ -203,6 +197,12 @@ fn get_after_str() -> &'static str {
 - omitting range start means 0 <white><bold>:700</bold></white> == <white><bold>0:700</bold></white>
 - minus on start means minus end <white><bold>-1000:7000</bold></white> == <white><bold>6000:7000</bold></white>
 - plus sign on end means plus start <white><bold>15M:+1000</bold></white> == <white><bold>15M:15.001K</bold></white>
+
+<white><bold>Transaction hash specification syntax</bold></white>
+- can use transaction hashes <white><bold>--txs TX_HASH1 TX_HASH2 TX_HASH3</bold></white>
+- can use a parquet file <white><bold>--txs ./path/to/file.parquet[:COLUMN_NAME]</bold></white>
+  (default column name is <white><bold>transaction_hash</bold></white>)
+- can use multiple parquet files <white><bold>--txs ./path/to/ethereum__logs*.parquet</bold></white>
 "#
     )
 }
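A minimal standalone sketch of how the reworked flags parse (assumes clap 4 with the `derive` feature; `Cli` and its doc strings here are illustrative, not cryo's actual struct):

```rust
// Minimal sketch, assuming clap 4 with the `derive` feature.
use clap::Parser;

#[derive(Parser, Debug)]
struct Cli {
    /// Block numbers (no longer defaulted; None when omitted)
    #[arg(short, long, allow_hyphen_values(true))]
    blocks: Option<Vec<String>>,

    /// Transaction hashes or parquet paths; one or more values
    #[arg(short, long, allow_hyphen_values(true), num_args(1..))]
    txs: Option<Vec<String>>,
}

fn main() {
    // e.g. `prog --txs 0xaaaa 0xbbbb` -> txs == Some(["0xaaaa", "0xbbbb"])
    let cli = Cli::parse();
    println!("{cli:?}");
}
```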
31 changes: 30 additions & 1 deletion crates/cli/src/parse/blocks.rs
@@ -10,21 +10,50 @@ pub(crate) async fn parse_blocks(
     args: &Args,
     provider: Arc<Provider<Http>>,
 ) -> Result<Vec<Chunk>, ParseError> {
-    let block_chunks = parse_block_inputs(&args.blocks, &provider).await?;
+    // parse inputs into BlockChunks
+    let block_chunks = match &args.blocks {
+        Some(inputs) => parse_block_inputs(inputs, &provider).await?,
+        None => return Err(ParseError::ParseError("could not parse block inputs".to_string())),
+    };
 
+    postprocess_block_chunks(block_chunks, args, provider).await
+}
+
+async fn postprocess_block_chunks(
+    block_chunks: Vec<BlockChunk>,
+    args: &Args,
+    provider: Arc<Provider<Http>>,
+) -> Result<Vec<Chunk>, ParseError> {
     // align
     let block_chunks = if args.align {
         block_chunks.into_iter().filter_map(|x| x.align(args.chunk_size)).collect()
     } else {
         block_chunks
     };
 
     // split block range into chunks
     let block_chunks = match args.n_chunks {
         Some(n_chunks) => block_chunks.subchunk_by_count(&n_chunks),
         None => block_chunks.subchunk_by_size(&args.chunk_size),
     };
 
     // apply reorg buffer
     let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &provider).await?;
 
     // put into Chunk enums
     let chunks: Vec<Chunk> = block_chunks.iter().map(|x| Chunk::Block(x.clone())).collect();
 
     Ok(chunks)
 }
 
+pub(crate) async fn get_default_block_chunks(
+    args: &Args,
+    provider: Arc<Provider<Http>>,
+) -> Result<Vec<Chunk>, ParseError> {
+    let block_chunks = parse_block_inputs(&vec!["0:latest".to_string()], &provider).await?;
+    postprocess_block_chunks(block_chunks, args, provider).await
+}
+
 /// parse block numbers to freeze
 async fn parse_block_inputs(
     inputs: &Vec<String>,
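The boundary snapping requested by `--align` (per the help text above: (1000, 2000, 3000) instead of (1106, 2106, 3106)) lives in cryo_freeze's `BlockChunk::align`; here is a standalone sketch of one plausible rounding rule, with `align` as a hypothetical free function rather than the real method:

```rust
// Illustrative sketch of --align: snap both boundaries down to the nearest
// multiple of chunk_size; the exact rounding rule is an assumption based on
// the flag's help text, not cryo_freeze's implementation.
fn align(start: u64, end: u64, chunk_size: u64) -> Option<(u64, u64)> {
    let start = (start / chunk_size) * chunk_size;
    let end = (end / chunk_size) * chunk_size;
    // returning None lets a caller drop degenerate chunks, matching the
    // filter_map usage in postprocess_block_chunks above
    if start < end {
        Some((start, end))
    } else {
        None
    }
}

fn main() {
    assert_eq!(align(1106, 3106, 1000), Some((1000, 3000)));
    assert_eq!(align(1106, 1900, 1000), None);
}
```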
1 change: 1 addition & 0 deletions crates/cli/src/parse/mod.rs
@@ -3,6 +3,7 @@ mod blocks;
 mod file_output;
 mod query;
 mod source;
+mod transactions;
 
 pub use args::*;
 // use blocks::*;
11 changes: 9 additions & 2 deletions crates/cli/src/parse/query.rs
@@ -5,14 +5,21 @@ use hex::FromHex;
 
 use cryo_freeze::{ColumnEncoding, Datatype, FileFormat, MultiQuery, ParseError, RowFilter, Table};
 
-use super::{blocks, file_output};
+use super::{blocks, file_output, transactions};
 use crate::args::Args;
 
 pub(crate) async fn parse_query(
     args: &Args,
     provider: Arc<Provider<Http>>,
 ) -> Result<MultiQuery, ParseError> {
-    let chunks = blocks::parse_blocks(args, provider).await?;
+    let chunks = match (&args.blocks, &args.txs) {
+        (Some(_), None) => blocks::parse_blocks(args, provider).await?,
+        (None, Some(txs)) => transactions::parse_transactions(txs)?,
+        (None, None) => blocks::get_default_block_chunks(args, provider).await?,
+        (Some(_), Some(_)) => {
+            return Err(ParseError::ParseError("specify only one of --blocks or --txs".to_string()))
+        }
+    };
 
     // process schemas
     let schemas = parse_schemas(args)?;
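The new dispatch in `parse_query` reduces to a four-way match on the two optional flags; a self-contained sketch with `String` payloads standing in for the real chunk types (`choose_source` is illustrative, not cryo's API):

```rust
// Standalone sketch of parse_query's dispatch rule: exactly one chunk
// source is chosen, defaulting to the full block range when neither
// --blocks nor --txs is given.
fn choose_source(blocks: Option<&str>, txs: Option<&str>) -> Result<String, String> {
    match (blocks, txs) {
        (Some(b), None) => Ok(format!("parse blocks: {b}")),
        (None, Some(t)) => Ok(format!("parse txs: {t}")),
        (None, None) => Ok("default block range 0:latest".to_string()),
        (Some(_), Some(_)) => Err("specify only one of --blocks or --txs".to_string()),
    }
}

fn main() {
    assert!(choose_source(Some("0:100"), Some("0xaa")).is_err());
    assert_eq!(choose_source(None, None).unwrap(), "default block range 0:latest");
}
```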
66 changes: 66 additions & 0 deletions crates/cli/src/parse/transactions.rs
@@ -0,0 +1,66 @@
+use cryo_freeze::{Chunk, ParseError, TransactionChunk};
+use polars::prelude::*;
+
+pub(crate) fn parse_transactions(txs: &[String]) -> Result<Vec<Chunk>, ParseError> {
+    let (files, hashes): (Vec<&String>, Vec<&String>) =
+        txs.iter().partition(|tx| std::path::Path::new(tx).exists());
+
+    let mut file_chunks = if !files.is_empty() {
+        let mut file_chunks = Vec::new();
+        for path in files {
+            let column = if path.contains(':') {
+                path.split(':')
+                    .last()
+                    .ok_or(ParseError::ParseError("could not parse txs path column".to_string()))?
+            } else {
+                "transaction_hash"
+            };
+            let tx_hashes = read_binary_column(path, column)
+                .map_err(|_e| ParseError::ParseError("could not read input".to_string()))?;
+            let chunk = TransactionChunk::Values(tx_hashes);
+            file_chunks.push(Chunk::Transaction(chunk));
+        }
+        file_chunks
+    } else {
+        Vec::new()
+    };
+
+    let hash_chunks = if !hashes.is_empty() {
+        let values: Result<Vec<Vec<u8>>, _> = hashes.iter().map(hex::decode).collect();
+        let values =
+            values.map_err(|_e| ParseError::ParseError("could not parse txs".to_string()))?;
+        let chunk = Chunk::Transaction(TransactionChunk::Values(values));
+        vec![chunk]
+    } else {
+        Vec::new()
+    };
+
+    file_chunks.extend(hash_chunks);
+    Ok(file_chunks)
+}
+
+fn read_binary_column(path: &str, column: &str) -> Result<Vec<Vec<u8>>, ParseError> {
+    let file = std::fs::File::open(path)
+        .map_err(|_e| ParseError::ParseError("could not open file path".to_string()))?;
+
+    let df = ParquetReader::new(file)
+        .with_columns(Some(vec![column.to_string()]))
+        .finish()
+        .map_err(|_e| ParseError::ParseError("could not read data from column".to_string()))?;
+
+    let series = df
+        .column(column)
+        .map_err(|_e| ParseError::ParseError("could not get column".to_string()))?;
+
+    let ca = series
+        .binary()
+        .map_err(|_e| ParseError::ParseError("could not convert to binary column".to_string()))?;
+
+    ca.into_iter()
+        .map(|value| {
+            value
+                .ok_or_else(|| ParseError::ParseError("transaction hash missing".to_string()))
+                .map(|data| data.into())
+        })
+        .collect()
+}
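`read_binary_column` leans on polars' `ParquetReader` with column projection; a hedged round-trip sketch under the assumption of a polars build with the `parquet` feature and binary dtype support (the `/tmp/txs.parquet` path and 32-byte values are illustrative):

```rust
// Hedged round-trip sketch of the ParquetReader usage above; assumes the
// polars version used by the crate, with the `parquet` feature enabled.
use polars::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // write a one-column frame of 32-byte "hashes" (illustrative path)
    let hashes: Vec<&[u8]> = vec![&[0xaa; 32], &[0xbb; 32]];
    let mut df = DataFrame::new(vec![Series::new("transaction_hash", hashes)])?;
    ParquetWriter::new(std::fs::File::create("/tmp/txs.parquet")?).finish(&mut df)?;

    // read it back the way read_binary_column does: project one column,
    // downcast to a binary chunked array, collect owned byte vectors
    let df = ParquetReader::new(std::fs::File::open("/tmp/txs.parquet")?)
        .with_columns(Some(vec!["transaction_hash".to_string()]))
        .finish()?;
    let values: Vec<Vec<u8>> = df
        .column("transaction_hash")?
        .binary()?
        .into_iter()
        .flatten()
        .map(|b| b.to_vec())
        .collect();
    assert_eq!(values.len(), 2);
    Ok(())
}
```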
