Skip to content

Commit

Permalink
refactor: cleanup collecting and managing column data (#54)
Browse files Browse the repository at this point in the history
* Create LogsColumns and refactor transaction columns to transaction's file.

* Move counting n_rows to column objects.

* Formatting.
  • Loading branch information
kskalski authored Aug 31, 2023
1 parent 519e3d7 commit eedcbeb
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 323 deletions.
296 changes: 67 additions & 229 deletions crates/freeze/src/datasets/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ use crate::{
types::{
conversions::{ToVecHex, ToVecU8},
BlockChunk, Blocks, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table,
TransactionChunk, U256Type,
TransactionChunk,
},
with_series, with_series_binary, with_series_u256,
with_series, with_series_binary,
};

use super::transactions::TransactionColumns;

pub(crate) type BlockTxGasTuple<TX> = Result<(Block<TX>, Option<Vec<u32>>), CollectError>;

#[async_trait::async_trait]
Expand Down Expand Up @@ -182,7 +184,7 @@ impl ProcessTransactions for TxHash {

impl ProcessTransactions for Transaction {
fn process(&self, schema: &Table, columns: &mut TransactionColumns, gas_used: Option<u32>) {
process_transaction(self, schema, columns, gas_used)
columns.process_transaction(self, schema, gas_used)
}
}

Expand All @@ -193,35 +195,25 @@ pub(crate) async fn blocks_to_dfs<TX: ProcessTransactions>(
chain_id: u64,
) -> Result<(Option<DataFrame>, Option<DataFrame>), CollectError> {
// initialize
let mut block_columns =
if blocks_schema.is_none() { BlockColumns::new(0) } else { BlockColumns::new(100) };
let mut transaction_columns = if transactions_schema.is_none() {
TransactionColumns::new(0)
} else {
TransactionColumns::new(100)
};
let mut block_columns = BlockColumns::default();
let mut transaction_columns = TransactionColumns::default();

// parse stream of blocks
let mut n_blocks = 0;
let mut n_txs = 0;
while let Some(message) = blocks.recv().await {
match message {
Ok((block, gas_used)) => {
n_blocks += 1;
if let Some(schema) = blocks_schema {
process_block(&block, schema, &mut block_columns)
block_columns.process_block(&block, schema)
}
if let Some(schema) = transactions_schema {
match gas_used {
Some(gas_used) => {
for (tx, gas_used) in block.transactions.iter().zip(gas_used) {
n_txs += 1;
tx.process(schema, &mut transaction_columns, Some(gas_used))
}
}
None => {
for tx in block.transactions.iter() {
n_txs += 1;
tx.process(schema, &mut transaction_columns, None)
}
}
Expand All @@ -237,17 +229,19 @@ pub(crate) async fn blocks_to_dfs<TX: ProcessTransactions>(

// convert to dataframes
let blocks_df = match blocks_schema {
Some(schema) => Some(block_columns.create_df(schema, chain_id, n_blocks)?),
Some(schema) => Some(block_columns.create_df(schema, chain_id)?),
None => None,
};
let transactions_df = match transactions_schema {
Some(schema) => Some(transaction_columns.create_df(schema, chain_id, n_txs)?),
Some(schema) => Some(transaction_columns.create_df(schema, chain_id)?),
None => None,
};
Ok((blocks_df, transactions_df))
}

#[derive(Default)]
struct BlockColumns {
n_rows: usize,
hash: Vec<Vec<u8>>,
parent_hash: Vec<Vec<u8>>,
author: Vec<Vec<u8>>,
Expand All @@ -265,32 +259,63 @@ struct BlockColumns {
}

impl BlockColumns {
fn new(n: usize) -> Self {
Self {
hash: Vec::with_capacity(n),
parent_hash: Vec::with_capacity(n),
author: Vec::with_capacity(n),
state_root: Vec::with_capacity(n),
transactions_root: Vec::with_capacity(n),
receipts_root: Vec::with_capacity(n),
number: Vec::with_capacity(n),
gas_used: Vec::with_capacity(n),
extra_data: Vec::with_capacity(n),
logs_bloom: Vec::with_capacity(n),
timestamp: Vec::with_capacity(n),
total_difficulty: Vec::with_capacity(n),
size: Vec::with_capacity(n),
base_fee_per_gas: Vec::with_capacity(n),
fn process_block<TX>(&mut self, block: &Block<TX>, schema: &Table) {
self.n_rows += 1;
if schema.has_column("hash") {
match block.hash {
Some(h) => self.hash.push(h.as_bytes().to_vec()),
_ => panic!("invalid block"),
}
}
if schema.has_column("parent_hash") {
self.parent_hash.push(block.parent_hash.as_bytes().to_vec());
}
if schema.has_column("author") {
match block.author {
Some(a) => self.author.push(a.as_bytes().to_vec()),
_ => panic!("invalid block"),
}
}
if schema.has_column("state_root") {
self.state_root.push(block.state_root.as_bytes().to_vec());
}
if schema.has_column("transactions_root") {
self.transactions_root.push(block.transactions_root.as_bytes().to_vec());
}
if schema.has_column("receipts_root") {
self.receipts_root.push(block.receipts_root.as_bytes().to_vec());
}
if schema.has_column("number") {
match block.number {
Some(n) => self.number.push(n.as_u32()),
_ => panic!("invalid block"),
}
}
if schema.has_column("gas_used") {
self.gas_used.push(block.gas_used.as_u32());
}
if schema.has_column("extra_data") {
self.extra_data.push(block.extra_data.to_vec());
}
if schema.has_column("logs_bloom") {
self.logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec()));
}
if schema.has_column("timestamp") {
self.timestamp.push(block.timestamp.as_u32());
}
if schema.has_column("total_difficulty") {
self.total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8()));
}
if schema.has_column("size") {
self.size.push(block.size.map(|x| x.as_u32()));
}
if schema.has_column("base_fee_per_gas") {
self.base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64()));
}
}

fn create_df(
self,
schema: &Table,
chain_id: u64,
n_rows: u64,
) -> Result<DataFrame, CollectError> {
let mut cols = Vec::new();
fn create_df(self, schema: &Table, chain_id: u64) -> Result<DataFrame, CollectError> {
let mut cols = Vec::with_capacity(schema.columns().len());
with_series_binary!(cols, "hash", self.hash, schema);
with_series_binary!(cols, "parent_hash", self.parent_hash, schema);
with_series_binary!(cols, "author", self.author, schema);
Expand All @@ -305,195 +330,8 @@ impl BlockColumns {
with_series_binary!(cols, "total_difficulty", self.total_difficulty, schema);
with_series!(cols, "size", self.size, schema);
with_series!(cols, "base_fee_per_gas", self.base_fee_per_gas, schema);

if schema.has_column("chain_id") {
cols.push(Series::new("chain_id", vec![chain_id; n_rows as usize]));
}
with_series!(cols, "chain_id", vec![chain_id; self.n_rows], schema);

DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema)
}
}

pub(crate) struct TransactionColumns {
block_number: Vec<Option<u64>>,
transaction_index: Vec<Option<u64>>,
transaction_hash: Vec<Vec<u8>>,
nonce: Vec<u64>,
from_address: Vec<Vec<u8>>,
to_address: Vec<Option<Vec<u8>>>,
value: Vec<U256>,
input: Vec<Vec<u8>>,
gas_limit: Vec<u32>,
gas_used: Vec<u32>,
gas_price: Vec<Option<u64>>,
transaction_type: Vec<Option<u32>>,
max_priority_fee_per_gas: Vec<Option<u64>>,
max_fee_per_gas: Vec<Option<u64>>,
}

impl TransactionColumns {
pub(crate) fn new(n: usize) -> Self {
Self {
block_number: Vec::with_capacity(n),
transaction_index: Vec::with_capacity(n),
transaction_hash: Vec::with_capacity(n),
nonce: Vec::with_capacity(n),
from_address: Vec::with_capacity(n),
to_address: Vec::with_capacity(n),
value: Vec::with_capacity(n),
input: Vec::with_capacity(n),
gas_limit: Vec::with_capacity(n),
gas_used: Vec::with_capacity(n),
gas_price: Vec::with_capacity(n),
transaction_type: Vec::with_capacity(n),
max_priority_fee_per_gas: Vec::with_capacity(n),
max_fee_per_gas: Vec::with_capacity(n),
}
}

pub(crate) fn create_df(
self,
schema: &Table,
chain_id: u64,
n_rows: usize,
) -> Result<DataFrame, CollectError> {
let mut cols = Vec::new();
with_series!(cols, "block_number", self.block_number, schema);
with_series!(cols, "transaction_index", self.transaction_index, schema);
with_series_binary!(cols, "transaction_hash", self.transaction_hash, schema);
with_series!(cols, "nonce", self.nonce, schema);
with_series_binary!(cols, "from_address", self.from_address, schema);
with_series_binary!(cols, "to_address", self.to_address, schema);
with_series_u256!(cols, "value", self.value, schema);
with_series_binary!(cols, "input", self.input, schema);
with_series!(cols, "gas_limit", self.gas_limit, schema);
with_series!(cols, "gas_used", self.gas_used, schema);
with_series!(cols, "gas_price", self.gas_price, schema);
with_series!(cols, "transaction_type", self.transaction_type, schema);
with_series!(cols, "max_priority_fee_per_gas", self.max_priority_fee_per_gas, schema);
with_series!(cols, "max_fee_per_gas", self.max_fee_per_gas, schema);

if schema.has_column("chain_id") {
cols.push(Series::new("chain_id", vec![chain_id; n_rows]));
}

DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema)
}
}

fn process_block<TX>(block: &Block<TX>, schema: &Table, columns: &mut BlockColumns) {
if schema.has_column("hash") {
match block.hash {
Some(h) => columns.hash.push(h.as_bytes().to_vec()),
_ => panic!("invalid block"),
}
}
if schema.has_column("parent_hash") {
columns.parent_hash.push(block.parent_hash.as_bytes().to_vec());
}
if schema.has_column("author") {
match block.author {
Some(a) => columns.author.push(a.as_bytes().to_vec()),
_ => panic!("invalid block"),
}
}
if schema.has_column("state_root") {
columns.state_root.push(block.state_root.as_bytes().to_vec());
}
if schema.has_column("transactions_root") {
columns.transactions_root.push(block.transactions_root.as_bytes().to_vec());
}
if schema.has_column("receipts_root") {
columns.receipts_root.push(block.receipts_root.as_bytes().to_vec());
}
if schema.has_column("number") {
match block.number {
Some(n) => columns.number.push(n.as_u32()),
_ => panic!("invalid block"),
}
}
if schema.has_column("gas_used") {
columns.gas_used.push(block.gas_used.as_u32());
}
if schema.has_column("extra_data") {
columns.extra_data.push(block.extra_data.to_vec());
}
if schema.has_column("logs_bloom") {
columns.logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec()));
}
if schema.has_column("timestamp") {
columns.timestamp.push(block.timestamp.as_u32());
}
if schema.has_column("total_difficulty") {
columns.total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8()));
}
if schema.has_column("size") {
columns.size.push(block.size.map(|x| x.as_u32()));
}
if schema.has_column("base_fee_per_gas") {
columns.base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64()));
}
}

fn process_transaction(
tx: &Transaction,
schema: &Table,
columns: &mut TransactionColumns,
gas_used: Option<u32>,
) {
if schema.has_column("block_number") {
match tx.block_number {
Some(block_number) => columns.block_number.push(Some(block_number.as_u64())),
None => columns.block_number.push(None),
}
}
if schema.has_column("transaction_index") {
match tx.transaction_index {
Some(transaction_index) => {
columns.transaction_index.push(Some(transaction_index.as_u64()))
}
None => columns.transaction_index.push(None),
}
}
if schema.has_column("transaction_hash") {
columns.transaction_hash.push(tx.hash.as_bytes().to_vec());
}
if schema.has_column("from_address") {
columns.from_address.push(tx.from.as_bytes().to_vec());
}
if schema.has_column("to_address") {
match tx.to {
Some(to_address) => columns.to_address.push(Some(to_address.as_bytes().to_vec())),
None => columns.to_address.push(None),
}
}
if schema.has_column("nonce") {
columns.nonce.push(tx.nonce.as_u64());
}
if schema.has_column("value") {
columns.value.push(tx.value);
}
if schema.has_column("input") {
columns.input.push(tx.input.to_vec());
}
if schema.has_column("gas_limit") {
columns.gas_limit.push(tx.gas.as_u32());
}
if schema.has_column("gas_used") {
columns.gas_used.push(gas_used.unwrap())
}
if schema.has_column("gas_price") {
columns.gas_price.push(tx.gas_price.map(|gas_price| gas_price.as_u64()));
}
if schema.has_column("transaction_type") {
columns.transaction_type.push(tx.transaction_type.map(|value| value.as_u32()));
}
if schema.has_column("max_priority_fee_per_gas") {
columns
.max_priority_fee_per_gas
.push(tx.max_priority_fee_per_gas.map(|value| value.as_u64()));
}
if schema.has_column("max_fee_per_gas") {
columns.max_fee_per_gas.push(tx.max_fee_per_gas.map(|value| value.as_u64()));
}
}
Loading

0 comments on commit eedcbeb

Please sign in to comment.