Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(source): extract concurrency checks and rpc fetches into separate Fetcher struct #58

Merged
merged 3 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/cli/src/parse/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::{file_output, query, source};
/// parse options for running freeze
pub async fn parse_opts(args: &Args) -> Result<(MultiQuery, Source, FileOutput), ParseError> {
let source = source::parse_source(args).await?;
let query = query::parse_query(args, Arc::clone(&source.provider)).await?;
let query = query::parse_query(args, Arc::clone(&source.fetcher)).await?;
let sink = file_output::parse_file_output(args, &source)?;
Ok((query, source, sink))
}
125 changes: 55 additions & 70 deletions crates/cli/src/parse/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use ethers::prelude::*;
use polars::prelude::*;

use cryo_freeze::{BlockChunk, Chunk, ChunkData, ParseError, Subchunk};
use cryo_freeze::{BlockChunk, Chunk, ChunkData, Fetcher, ParseError, Subchunk};

use crate::args::Args;

pub(crate) async fn parse_blocks(
pub(crate) async fn parse_blocks<P: JsonRpcClient>(
args: &Args,
provider: Arc<Provider<Http>>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<(Chunk, Option<String>)>, ParseError> {
let (files, explicit_numbers): (Vec<&String>, Vec<&String>) = match &args.blocks {
Some(blocks) => blocks.iter().partition(|tx| std::path::Path::new(tx).exists()),
Expand Down Expand Up @@ -42,10 +42,10 @@ pub(crate) async fn parse_blocks(
// parse inputs into BlockChunks
let mut block_chunks = Vec::new();
for explicit_number in explicit_numbers {
let outputs = parse_block_inputs(explicit_number, &provider).await?;
let outputs = parse_block_inputs(explicit_number, &fetcher).await?;
block_chunks.extend(outputs);
}
postprocess_block_chunks(block_chunks, args, provider).await?
postprocess_block_chunks(block_chunks, args, fetcher).await?
} else {
Vec::new()
};
Expand Down Expand Up @@ -92,10 +92,10 @@ fn read_integer_column(path: &str, column: &str) -> Result<Vec<u64>, ParseError>
}
}

async fn postprocess_block_chunks(
async fn postprocess_block_chunks<P: JsonRpcClient>(
block_chunks: Vec<BlockChunk>,
args: &Args,
provider: Arc<Provider<Http>>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<(Chunk, Option<String>)>, ParseError> {
// align
let block_chunks = if args.align {
Expand All @@ -111,7 +111,7 @@ async fn postprocess_block_chunks(
};

// apply reorg buffer
let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &provider).await?;
let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &fetcher).await?;

// put into Chunk enums
let chunks: Vec<(Chunk, Option<String>)> =
Expand All @@ -120,34 +120,31 @@ async fn postprocess_block_chunks(
Ok(chunks)
}

pub(crate) async fn get_default_block_chunks(
pub(crate) async fn get_default_block_chunks<P: JsonRpcClient>(
args: &Args,
provider: Arc<Provider<Http>>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<(Chunk, Option<String>)>, ParseError> {
let block_chunks = parse_block_inputs(&String::from(r"0:latest"), &provider).await?;
postprocess_block_chunks(block_chunks, args, provider).await
let block_chunks = parse_block_inputs(&String::from(r"0:latest"), &fetcher).await?;
postprocess_block_chunks(block_chunks, args, fetcher).await
}

/// parse block numbers to freeze
async fn parse_block_inputs<P>(
async fn parse_block_inputs<P: JsonRpcClient>(
inputs: &str,
provider: &Provider<P>,
) -> Result<Vec<BlockChunk>, ParseError>
where
P: JsonRpcClient,
{
fetcher: &Fetcher<P>,
) -> Result<Vec<BlockChunk>, ParseError> {
let parts: Vec<&str> = inputs.split(' ').collect();
match parts.len() {
1 => {
let first_input = parts.first().ok_or_else(|| {
ParseError::ParseError("Failed to get the first input".to_string())
})?;
parse_block_token(first_input, true, provider).await.map(|x| vec![x])
parse_block_token(first_input, true, fetcher).await.map(|x| vec![x])
}
_ => {
let mut chunks = Vec::new();
for part in parts {
chunks.push(parse_block_token(part, false, provider).await?);
chunks.push(parse_block_token(part, false, fetcher).await?);
}
Ok(chunks)
}
Expand All @@ -160,27 +157,24 @@ enum RangePosition {
None,
}

async fn parse_block_token<P>(
async fn parse_block_token<P: JsonRpcClient>(
s: &str,
as_range: bool,
provider: &Provider<P>,
) -> Result<BlockChunk, ParseError>
where
P: JsonRpcClient,
{
fetcher: &Fetcher<P>,
) -> Result<BlockChunk, ParseError> {
let s = s.replace('_', "");

let parts: Vec<&str> = s.split(':').collect();
match parts.as_slice() {
[block_ref] => {
let block = parse_block_number(block_ref, RangePosition::None, provider).await?;
let block = parse_block_number(block_ref, RangePosition::None, fetcher).await?;
Ok(BlockChunk::Numbers(vec![block]))
}
[first_ref, second_ref] => {
let (start_block, end_block) = match (first_ref, second_ref) {
_ if first_ref.starts_with('-') => {
let end_block =
parse_block_number(second_ref, RangePosition::Last, provider).await?;
parse_block_number(second_ref, RangePosition::Last, fetcher).await?;
let start_block = end_block
.checked_sub(first_ref[1..].parse::<u64>().map_err(|_e| {
ParseError::ParseError("start_block parse error".to_string())
Expand All @@ -192,7 +186,7 @@ where
}
_ if second_ref.starts_with('+') => {
let start_block =
parse_block_number(first_ref, RangePosition::First, provider).await?;
parse_block_number(first_ref, RangePosition::First, fetcher).await?;
let end_block = start_block
.checked_add(second_ref[1..].parse::<u64>().map_err(|_e| {
ParseError::ParseError("start_block parse error".to_string())
Expand All @@ -202,9 +196,9 @@ where
}
_ => {
let start_block =
parse_block_number(first_ref, RangePosition::First, provider).await?;
parse_block_number(first_ref, RangePosition::First, fetcher).await?;
let end_block =
parse_block_number(second_ref, RangePosition::Last, provider).await?;
parse_block_number(second_ref, RangePosition::Last, fetcher).await?;
(start_block, end_block)
}
};
Expand All @@ -225,21 +219,18 @@ where
}
}

async fn parse_block_number<P>(
async fn parse_block_number<P: JsonRpcClient>(
block_ref: &str,
range_position: RangePosition,
provider: &Provider<P>,
) -> Result<u64, ParseError>
where
P: JsonRpcClient,
{
fetcher: &Fetcher<P>,
) -> Result<u64, ParseError> {
match (block_ref, range_position) {
("latest", _) => provider.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
("latest", _) => fetcher.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
ParseError::ParseError("Error retrieving latest block number".to_string())
}),
("", RangePosition::First) => Ok(0),
("", RangePosition::Last) => {
provider.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
fetcher.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
ParseError::ParseError("Error retrieving last block number".to_string())
})
}
Expand Down Expand Up @@ -269,15 +260,15 @@ where
}
}

async fn apply_reorg_buffer(
async fn apply_reorg_buffer<P: JsonRpcClient>(
block_chunks: Vec<BlockChunk>,
reorg_filter: u64,
provider: &Provider<Http>,
fetcher: &Fetcher<P>,
) -> Result<Vec<BlockChunk>, ParseError> {
match reorg_filter {
0 => Ok(block_chunks),
reorg_filter => {
let latest_block = match provider.get_block_number().await {
let latest_block = match fetcher.get_block_number().await {
Ok(result) => result.as_u64(),
Err(_e) => {
return Err(ParseError::ParseError("reorg buffer parse error".to_string()))
Expand Down Expand Up @@ -306,38 +297,36 @@ mod tests {

async fn block_token_test_helper(tests: Vec<(BlockTokenTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
let fetcher = Fetcher { provider, semaphore: None, rate_limiter: None };
for (test, res) in tests {
match test {
BlockTokenTest::WithMock((token, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
assert_eq!(block_token_test_executor(token, expected, &provider).await, res);
assert_eq!(block_token_test_executor(token, expected, &fetcher).await, res);
}
BlockTokenTest::WithoutMock((token, expected)) => {
assert_eq!(block_token_test_executor(token, expected, &provider).await, res);
assert_eq!(block_token_test_executor(token, expected, &fetcher).await, res);
}
}
}
}

async fn block_token_test_executor<P>(
async fn block_token_test_executor<P: JsonRpcClient>(
token: &str,
expected: BlockChunk,
provider: &Provider<P>,
) -> bool
where
P: JsonRpcClient,
{
fetcher: &Fetcher<P>,
) -> bool {
match expected {
BlockChunk::Numbers(expected_block_numbers) => {
let block_chunks = parse_block_token(token, false, provider).await.unwrap();
let block_chunks = parse_block_token(token, false, fetcher).await.unwrap();
assert!(matches!(block_chunks, BlockChunk::Numbers { .. }));
let BlockChunk::Numbers(block_numbers) = block_chunks else {
panic!("Unexpected shape")
};
block_numbers == expected_block_numbers
}
BlockChunk::Range(expected_range_start, expected_range_end) => {
let block_chunks = parse_block_token(token, true, provider).await.unwrap();
let block_chunks = parse_block_token(token, true, fetcher).await.unwrap();
assert!(matches!(block_chunks, BlockChunk::Range { .. }));
let BlockChunk::Range(range_start, range_end) = block_chunks else {
panic!("Unexpected shape")
Expand All @@ -354,28 +343,26 @@ mod tests {

async fn block_input_test_helper(tests: Vec<(BlockInputTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
let fetcher = Fetcher { provider, semaphore: None, rate_limiter: None };
for (test, res) in tests {
match test {
BlockInputTest::WithMock((inputs, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
assert_eq!(block_input_test_executor(inputs, expected, &provider).await, res);
assert_eq!(block_input_test_executor(inputs, expected, &fetcher).await, res);
}
BlockInputTest::WithoutMock((inputs, expected)) => {
assert_eq!(block_input_test_executor(inputs, expected, &provider).await, res);
assert_eq!(block_input_test_executor(inputs, expected, &fetcher).await, res);
}
}
}
}

async fn block_input_test_executor<P>(
async fn block_input_test_executor<P: JsonRpcClient>(
inputs: &str,
expected: Vec<BlockChunk>,
provider: &Provider<P>,
) -> bool
where
P: JsonRpcClient,
{
let block_chunks = parse_block_inputs(inputs, provider).await.unwrap();
fetcher: &Fetcher<P>,
) -> bool {
let block_chunks = parse_block_inputs(inputs, fetcher).await.unwrap();
assert_eq!(block_chunks.len(), expected.len());
for (i, block_chunk) in block_chunks.iter().enumerate() {
let expected_chunk = &expected[i];
Expand Down Expand Up @@ -410,19 +397,20 @@ mod tests {

async fn block_number_test_helper(tests: Vec<(BlockNumberTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
let fetcher = Fetcher { provider, semaphore: None, rate_limiter: None };
for (test, res) in tests {
match test {
BlockNumberTest::WithMock((block_ref, range_position, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
assert_eq!(
block_number_test_executor(block_ref, range_position, expected, &provider)
block_number_test_executor(block_ref, range_position, expected, &fetcher)
.await,
res
);
}
BlockNumberTest::WithoutMock((block_ref, range_position, expected)) => {
assert_eq!(
block_number_test_executor(block_ref, range_position, expected, &provider)
block_number_test_executor(block_ref, range_position, expected, &fetcher)
.await,
res
);
Expand All @@ -431,16 +419,13 @@ mod tests {
}
}

async fn block_number_test_executor<P>(
async fn block_number_test_executor<P: JsonRpcClient>(
block_ref: &str,
range_position: RangePosition,
expected: u64,
provider: &Provider<P>,
) -> bool
where
P: JsonRpcClient,
{
let block_number = parse_block_number(block_ref, range_position, provider).await.unwrap();
fetcher: &Fetcher<P>,
) -> bool {
let block_number = parse_block_number(block_ref, range_position, fetcher).await.unwrap();
block_number == expected
}

Expand Down
11 changes: 6 additions & 5 deletions crates/cli/src/parse/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,22 @@ use ethers::prelude::*;
use hex::FromHex;

use cryo_freeze::{
ColumnEncoding, Datatype, FileFormat, LogDecoder, MultiQuery, ParseError, RowFilter, Table,
ColumnEncoding, Datatype, Fetcher, FileFormat, LogDecoder, MultiQuery, ParseError, RowFilter,
Table,
};

use super::{blocks, file_output, transactions};
use crate::args::Args;
use cryo_freeze::U256Type;

pub(crate) async fn parse_query(
pub(crate) async fn parse_query<P: JsonRpcClient>(
args: &Args,
provider: Arc<Provider<Http>>,
fetcher: Arc<Fetcher<P>>,
) -> Result<MultiQuery, ParseError> {
let chunks = match (&args.blocks, &args.txs) {
(Some(_), None) => blocks::parse_blocks(args, provider).await?,
(Some(_), None) => blocks::parse_blocks(args, fetcher).await?,
(None, Some(txs)) => transactions::parse_transactions(txs)?,
(None, None) => blocks::get_default_block_chunks(args, provider).await?,
(None, None) => blocks::get_default_block_chunks(args, fetcher).await?,
(Some(_), Some(_)) => {
return Err(ParseError::ParseError("specify only one of --blocks or --txs".to_string()))
}
Expand Down
11 changes: 5 additions & 6 deletions crates/cli/src/parse/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use governor::{Quota, RateLimiter};
use polars::prelude::*;
use std::num::NonZeroU32;

use cryo_freeze::{ParseError, Source};
use cryo_freeze::{Fetcher, ParseError, Source};

use crate::args::Args;

Expand All @@ -20,7 +20,7 @@ pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
Some(rate_limit) => match NonZeroU32::new(rate_limit) {
Some(value) => {
let quota = Quota::per_second(value);
Some(Arc::new(RateLimiter::direct(quota)))
Some(RateLimiter::direct(quota))
}
_ => None,
},
Expand All @@ -32,13 +32,12 @@ pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
let max_concurrent_chunks = args.max_concurrent_chunks.unwrap_or(3);

let semaphore = tokio::sync::Semaphore::new(max_concurrent_requests as usize);
let semaphore = Some(Arc::new(semaphore));
let semaphore = Some(semaphore);

let fetcher = Fetcher { provider, semaphore, rate_limiter };
let output = Source {
provider: Arc::new(provider),
fetcher: Arc::new(fetcher),
chain_id,
semaphore,
rate_limiter,
inner_request_size: args.inner_request_size,
max_concurrent_chunks,
};
Expand Down
Loading
Loading