Skip to content

Commit

Permalink
refactor(source): extract concurrency checks and rpc fetches into sep…
Browse files Browse the repository at this point in the history
…arate `Fetcher` struct (#58)

* Move semaphore and rate-limit to Fetcher.

* Factor out Fetcher to centralize concurrency checks.

* Fix parameter name.
  • Loading branch information
kskalski authored Sep 5, 2023
1 parent 54a47f4 commit 7ca0073
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 294 deletions.
2 changes: 1 addition & 1 deletion crates/cli/src/parse/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::{file_output, query, source};
/// parse options for running freeze
pub async fn parse_opts(args: &Args) -> Result<(MultiQuery, Source, FileOutput), ParseError> {
let source = source::parse_source(args).await?;
let query = query::parse_query(args, Arc::clone(&source.provider)).await?;
let query = query::parse_query(args, Arc::clone(&source.fetcher)).await?;
let sink = file_output::parse_file_output(args, &source)?;
Ok((query, source, sink))
}
125 changes: 55 additions & 70 deletions crates/cli/src/parse/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use ethers::prelude::*;
use polars::prelude::*;

use cryo_freeze::{BlockChunk, Chunk, ChunkData, ParseError, Subchunk};
use cryo_freeze::{BlockChunk, Chunk, ChunkData, Fetcher, ParseError, Subchunk};

use crate::args::Args;

pub(crate) async fn parse_blocks(
pub(crate) async fn parse_blocks<P: JsonRpcClient>(
args: &Args,
provider: Arc<Provider<Http>>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<(Chunk, Option<String>)>, ParseError> {
let (files, explicit_numbers): (Vec<&String>, Vec<&String>) = match &args.blocks {
Some(blocks) => blocks.iter().partition(|tx| std::path::Path::new(tx).exists()),
Expand Down Expand Up @@ -42,10 +42,10 @@ pub(crate) async fn parse_blocks(
// parse inputs into BlockChunks
let mut block_chunks = Vec::new();
for explicit_number in explicit_numbers {
let outputs = parse_block_inputs(explicit_number, &provider).await?;
let outputs = parse_block_inputs(explicit_number, &fetcher).await?;
block_chunks.extend(outputs);
}
postprocess_block_chunks(block_chunks, args, provider).await?
postprocess_block_chunks(block_chunks, args, fetcher).await?
} else {
Vec::new()
};
Expand Down Expand Up @@ -92,10 +92,10 @@ fn read_integer_column(path: &str, column: &str) -> Result<Vec<u64>, ParseError>
}
}

async fn postprocess_block_chunks(
async fn postprocess_block_chunks<P: JsonRpcClient>(
block_chunks: Vec<BlockChunk>,
args: &Args,
provider: Arc<Provider<Http>>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<(Chunk, Option<String>)>, ParseError> {
// align
let block_chunks = if args.align {
Expand All @@ -111,7 +111,7 @@ async fn postprocess_block_chunks(
};

// apply reorg buffer
let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &provider).await?;
let block_chunks = apply_reorg_buffer(block_chunks, args.reorg_buffer, &fetcher).await?;

// put into Chunk enums
let chunks: Vec<(Chunk, Option<String>)> =
Expand All @@ -120,34 +120,31 @@ async fn postprocess_block_chunks(
Ok(chunks)
}

pub(crate) async fn get_default_block_chunks(
pub(crate) async fn get_default_block_chunks<P: JsonRpcClient>(
args: &Args,
provider: Arc<Provider<Http>>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<(Chunk, Option<String>)>, ParseError> {
let block_chunks = parse_block_inputs(&String::from(r"0:latest"), &provider).await?;
postprocess_block_chunks(block_chunks, args, provider).await
let block_chunks = parse_block_inputs(&String::from(r"0:latest"), &fetcher).await?;
postprocess_block_chunks(block_chunks, args, fetcher).await
}

/// parse block numbers to freeze
async fn parse_block_inputs<P>(
async fn parse_block_inputs<P: JsonRpcClient>(
inputs: &str,
provider: &Provider<P>,
) -> Result<Vec<BlockChunk>, ParseError>
where
P: JsonRpcClient,
{
fetcher: &Fetcher<P>,
) -> Result<Vec<BlockChunk>, ParseError> {
let parts: Vec<&str> = inputs.split(' ').collect();
match parts.len() {
1 => {
let first_input = parts.first().ok_or_else(|| {
ParseError::ParseError("Failed to get the first input".to_string())
})?;
parse_block_token(first_input, true, provider).await.map(|x| vec![x])
parse_block_token(first_input, true, fetcher).await.map(|x| vec![x])
}
_ => {
let mut chunks = Vec::new();
for part in parts {
chunks.push(parse_block_token(part, false, provider).await?);
chunks.push(parse_block_token(part, false, fetcher).await?);
}
Ok(chunks)
}
Expand All @@ -160,27 +157,24 @@ enum RangePosition {
None,
}

async fn parse_block_token<P>(
async fn parse_block_token<P: JsonRpcClient>(
s: &str,
as_range: bool,
provider: &Provider<P>,
) -> Result<BlockChunk, ParseError>
where
P: JsonRpcClient,
{
fetcher: &Fetcher<P>,
) -> Result<BlockChunk, ParseError> {
let s = s.replace('_', "");

let parts: Vec<&str> = s.split(':').collect();
match parts.as_slice() {
[block_ref] => {
let block = parse_block_number(block_ref, RangePosition::None, provider).await?;
let block = parse_block_number(block_ref, RangePosition::None, fetcher).await?;
Ok(BlockChunk::Numbers(vec![block]))
}
[first_ref, second_ref] => {
let (start_block, end_block) = match (first_ref, second_ref) {
_ if first_ref.starts_with('-') => {
let end_block =
parse_block_number(second_ref, RangePosition::Last, provider).await?;
parse_block_number(second_ref, RangePosition::Last, fetcher).await?;
let start_block = end_block
.checked_sub(first_ref[1..].parse::<u64>().map_err(|_e| {
ParseError::ParseError("start_block parse error".to_string())
Expand All @@ -192,7 +186,7 @@ where
}
_ if second_ref.starts_with('+') => {
let start_block =
parse_block_number(first_ref, RangePosition::First, provider).await?;
parse_block_number(first_ref, RangePosition::First, fetcher).await?;
let end_block = start_block
.checked_add(second_ref[1..].parse::<u64>().map_err(|_e| {
ParseError::ParseError("start_block parse error".to_string())
Expand All @@ -202,9 +196,9 @@ where
}
_ => {
let start_block =
parse_block_number(first_ref, RangePosition::First, provider).await?;
parse_block_number(first_ref, RangePosition::First, fetcher).await?;
let end_block =
parse_block_number(second_ref, RangePosition::Last, provider).await?;
parse_block_number(second_ref, RangePosition::Last, fetcher).await?;
(start_block, end_block)
}
};
Expand All @@ -225,21 +219,18 @@ where
}
}

async fn parse_block_number<P>(
async fn parse_block_number<P: JsonRpcClient>(
block_ref: &str,
range_position: RangePosition,
provider: &Provider<P>,
) -> Result<u64, ParseError>
where
P: JsonRpcClient,
{
fetcher: &Fetcher<P>,
) -> Result<u64, ParseError> {
match (block_ref, range_position) {
("latest", _) => provider.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
("latest", _) => fetcher.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
ParseError::ParseError("Error retrieving latest block number".to_string())
}),
("", RangePosition::First) => Ok(0),
("", RangePosition::Last) => {
provider.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
fetcher.get_block_number().await.map(|n| n.as_u64()).map_err(|_e| {
ParseError::ParseError("Error retrieving last block number".to_string())
})
}
Expand Down Expand Up @@ -269,15 +260,15 @@ where
}
}

async fn apply_reorg_buffer(
async fn apply_reorg_buffer<P: JsonRpcClient>(
block_chunks: Vec<BlockChunk>,
reorg_filter: u64,
provider: &Provider<Http>,
fetcher: &Fetcher<P>,
) -> Result<Vec<BlockChunk>, ParseError> {
match reorg_filter {
0 => Ok(block_chunks),
reorg_filter => {
let latest_block = match provider.get_block_number().await {
let latest_block = match fetcher.get_block_number().await {
Ok(result) => result.as_u64(),
Err(_e) => {
return Err(ParseError::ParseError("reorg buffer parse error".to_string()))
Expand Down Expand Up @@ -306,38 +297,36 @@ mod tests {

async fn block_token_test_helper(tests: Vec<(BlockTokenTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
let fetcher = Fetcher { provider, semaphore: None, rate_limiter: None };
for (test, res) in tests {
match test {
BlockTokenTest::WithMock((token, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
assert_eq!(block_token_test_executor(token, expected, &provider).await, res);
assert_eq!(block_token_test_executor(token, expected, &fetcher).await, res);
}
BlockTokenTest::WithoutMock((token, expected)) => {
assert_eq!(block_token_test_executor(token, expected, &provider).await, res);
assert_eq!(block_token_test_executor(token, expected, &fetcher).await, res);
}
}
}
}

async fn block_token_test_executor<P>(
async fn block_token_test_executor<P: JsonRpcClient>(
token: &str,
expected: BlockChunk,
provider: &Provider<P>,
) -> bool
where
P: JsonRpcClient,
{
fetcher: &Fetcher<P>,
) -> bool {
match expected {
BlockChunk::Numbers(expected_block_numbers) => {
let block_chunks = parse_block_token(token, false, provider).await.unwrap();
let block_chunks = parse_block_token(token, false, fetcher).await.unwrap();
assert!(matches!(block_chunks, BlockChunk::Numbers { .. }));
let BlockChunk::Numbers(block_numbers) = block_chunks else {
panic!("Unexpected shape")
};
block_numbers == expected_block_numbers
}
BlockChunk::Range(expected_range_start, expected_range_end) => {
let block_chunks = parse_block_token(token, true, provider).await.unwrap();
let block_chunks = parse_block_token(token, true, fetcher).await.unwrap();
assert!(matches!(block_chunks, BlockChunk::Range { .. }));
let BlockChunk::Range(range_start, range_end) = block_chunks else {
panic!("Unexpected shape")
Expand All @@ -354,28 +343,26 @@ mod tests {

async fn block_input_test_helper(tests: Vec<(BlockInputTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
let fetcher = Fetcher { provider, semaphore: None, rate_limiter: None };
for (test, res) in tests {
match test {
BlockInputTest::WithMock((inputs, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
assert_eq!(block_input_test_executor(inputs, expected, &provider).await, res);
assert_eq!(block_input_test_executor(inputs, expected, &fetcher).await, res);
}
BlockInputTest::WithoutMock((inputs, expected)) => {
assert_eq!(block_input_test_executor(inputs, expected, &provider).await, res);
assert_eq!(block_input_test_executor(inputs, expected, &fetcher).await, res);
}
}
}
}

async fn block_input_test_executor<P>(
async fn block_input_test_executor<P: JsonRpcClient>(
inputs: &str,
expected: Vec<BlockChunk>,
provider: &Provider<P>,
) -> bool
where
P: JsonRpcClient,
{
let block_chunks = parse_block_inputs(inputs, provider).await.unwrap();
fetcher: &Fetcher<P>,
) -> bool {
let block_chunks = parse_block_inputs(inputs, fetcher).await.unwrap();
assert_eq!(block_chunks.len(), expected.len());
for (i, block_chunk) in block_chunks.iter().enumerate() {
let expected_chunk = &expected[i];
Expand Down Expand Up @@ -410,19 +397,20 @@ mod tests {

async fn block_number_test_helper(tests: Vec<(BlockNumberTest<'_>, bool)>) {
let (provider, mock) = Provider::mocked();
let fetcher = Fetcher { provider, semaphore: None, rate_limiter: None };
for (test, res) in tests {
match test {
BlockNumberTest::WithMock((block_ref, range_position, expected, latest)) => {
mock.push(U64::from(latest)).unwrap();
assert_eq!(
block_number_test_executor(block_ref, range_position, expected, &provider)
block_number_test_executor(block_ref, range_position, expected, &fetcher)
.await,
res
);
}
BlockNumberTest::WithoutMock((block_ref, range_position, expected)) => {
assert_eq!(
block_number_test_executor(block_ref, range_position, expected, &provider)
block_number_test_executor(block_ref, range_position, expected, &fetcher)
.await,
res
);
Expand All @@ -431,16 +419,13 @@ mod tests {
}
}

async fn block_number_test_executor<P>(
async fn block_number_test_executor<P: JsonRpcClient>(
block_ref: &str,
range_position: RangePosition,
expected: u64,
provider: &Provider<P>,
) -> bool
where
P: JsonRpcClient,
{
let block_number = parse_block_number(block_ref, range_position, provider).await.unwrap();
fetcher: &Fetcher<P>,
) -> bool {
let block_number = parse_block_number(block_ref, range_position, fetcher).await.unwrap();
block_number == expected
}

Expand Down
11 changes: 6 additions & 5 deletions crates/cli/src/parse/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,22 @@ use ethers::prelude::*;
use hex::FromHex;

use cryo_freeze::{
ColumnEncoding, Datatype, FileFormat, LogDecoder, MultiQuery, ParseError, RowFilter, Table,
ColumnEncoding, Datatype, Fetcher, FileFormat, LogDecoder, MultiQuery, ParseError, RowFilter,
Table,
};

use super::{blocks, file_output, transactions};
use crate::args::Args;
use cryo_freeze::U256Type;

pub(crate) async fn parse_query(
pub(crate) async fn parse_query<P: JsonRpcClient>(
args: &Args,
provider: Arc<Provider<Http>>,
fetcher: Arc<Fetcher<P>>,
) -> Result<MultiQuery, ParseError> {
let chunks = match (&args.blocks, &args.txs) {
(Some(_), None) => blocks::parse_blocks(args, provider).await?,
(Some(_), None) => blocks::parse_blocks(args, fetcher).await?,
(None, Some(txs)) => transactions::parse_transactions(txs)?,
(None, None) => blocks::get_default_block_chunks(args, provider).await?,
(None, None) => blocks::get_default_block_chunks(args, fetcher).await?,
(Some(_), Some(_)) => {
return Err(ParseError::ParseError("specify only one of --blocks or --txs".to_string()))
}
Expand Down
11 changes: 5 additions & 6 deletions crates/cli/src/parse/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use governor::{Quota, RateLimiter};
use polars::prelude::*;
use std::num::NonZeroU32;

use cryo_freeze::{ParseError, Source};
use cryo_freeze::{Fetcher, ParseError, Source};

use crate::args::Args;

Expand All @@ -20,7 +20,7 @@ pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
Some(rate_limit) => match NonZeroU32::new(rate_limit) {
Some(value) => {
let quota = Quota::per_second(value);
Some(Arc::new(RateLimiter::direct(quota)))
Some(RateLimiter::direct(quota))
}
_ => None,
},
Expand All @@ -32,13 +32,12 @@ pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
let max_concurrent_chunks = args.max_concurrent_chunks.unwrap_or(3);

let semaphore = tokio::sync::Semaphore::new(max_concurrent_requests as usize);
let semaphore = Some(Arc::new(semaphore));
let semaphore = Some(semaphore);

let fetcher = Fetcher { provider, semaphore, rate_limiter };
let output = Source {
provider: Arc::new(provider),
fetcher: Arc::new(fetcher),
chain_id,
semaphore,
rate_limiter,
inner_request_size: args.inner_request_size,
max_concurrent_chunks,
};
Expand Down
Loading

0 comments on commit 7ca0073

Please sign in to comment.