From ec5907b70ff7d868a05b685a1641d96dc4fa9d69 Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Wed, 6 Dec 2023 16:57:06 +0200 Subject: [PATCH] fix: Cursor not moving correctly after poll in `get_filter_changes` (#546) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ When polling filter changes, add 1 to actual from_block value ## Why ❔ Otherwise, last block that was included in poll will be included to the next one. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --------- Co-authored-by: Fedor Sakharov --- core/lib/dal/sqlx-data.json | 54 +++++++++---------- core/lib/dal/src/blocks_web3_dal.rs | 6 +-- core/lib/dal/src/events_web3_dal.rs | 13 ++--- core/lib/types/src/api/mod.rs | 2 +- .../src/api_server/web3/namespaces/eth.rs | 45 ++++++++++------ .../src/api_server/web3/namespaces/zks.rs | 2 +- .../zksync_core/src/api_server/web3/state.rs | 3 +- .../src/api_server/web3/tests/mod.rs | 24 ++++----- 8 files changed, 76 insertions(+), 73 deletions(-) diff --git a/core/lib/dal/sqlx-data.json b/core/lib/dal/sqlx-data.json index 9084adb61cda..2d1773482ea1 100644 --- a/core/lib/dal/sqlx-data.json +++ b/core/lib/dal/sqlx-data.json @@ -1928,6 +1928,33 @@ }, "query": "SELECT * from prover_jobs where id=$1" }, + "2044947d6d29f29cda508b2160c39f74a8bfd524afa2ffc20a98ae039bc86ed7": { + "describe": { + "columns": [ + { + "name": "number", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "hash", + "ordinal": 1, + "type_info": "Bytea" + } + ], + "nullable": [ + false, + false + ], + "parameters": { + "Left": [ + "Int8", + "Int8" + ] + } + }, + "query": "SELECT number, hash FROM miniblocks WHERE number >= $1 ORDER BY number ASC LIMIT $2" + }, "20b22fd457417e9a72f5941887448f9a11b97b449db4759da0b9d368ce93996b": { "describe": { "columns": [ @@ -9492,33 +9519,6 @@ }, "query": "\n SELECT id, circuit_input_blob_url FROM prover_jobs\n WHERE status='successful'\n AND circuit_input_blob_url is NOT NULL\n AND updated_at < NOW() - INTERVAL '30 days'\n LIMIT $1;\n " }, - "b479b7d3334f8d4566c294a44e2adb282fbc66a87be5c248c65211c2a8a07db0": { - "describe": { - "columns": [ - { - "name": "number", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "hash", - "ordinal": 1, - "type_info": "Bytea" - } - ], - "nullable": [ - false, - false - ], - "parameters": { - "Left": [ - "Int8", - "Int8" - ] - } - }, - "query": "SELECT number, hash FROM miniblocks WHERE number > $1 ORDER BY number ASC LIMIT $2" - }, "b4a3c902646725188f7c79ebac992cdce5896fc6fcc9f485c0cba9d90c4c982c": { "describe": { "columns": [ diff --git a/core/lib/dal/src/blocks_web3_dal.rs b/core/lib/dal/src/blocks_web3_dal.rs index e42a645966ff..0c2a8b4e1885 100644 --- a/core/lib/dal/src/blocks_web3_dal.rs +++ b/core/lib/dal/src/blocks_web3_dal.rs @@ -161,15 +161,15 @@ impl BlocksWeb3Dal<'_, '_> { })) } - /// Returns hashes of blocks with numbers greater than `from_block` and the number of the last block. - pub async fn get_block_hashes_after( + /// Returns hashes of blocks with numbers starting from `from_block` and the number of the last block. + pub async fn get_block_hashes_since( &mut self, from_block: MiniblockNumber, limit: usize, ) -> sqlx::Result<(Vec, Option)> { let rows = sqlx::query!( "SELECT number, hash FROM miniblocks \ - WHERE number > $1 \ + WHERE number >= $1 \ ORDER BY number ASC \ LIMIT $2", from_block.0 as i64, diff --git a/core/lib/dal/src/events_web3_dal.rs b/core/lib/dal/src/events_web3_dal.rs index 82a65c18444b..7cdf2dba6467 100644 --- a/core/lib/dal/src/events_web3_dal.rs +++ b/core/lib/dal/src/events_web3_dal.rs @@ -6,9 +6,7 @@ use zksync_types::{ }; use crate::{ - instrument::InstrumentExt, - models::{storage_block::web3_block_number_to_sql, storage_event::StorageWeb3Log}, - SqlxError, StorageProcessor, + instrument::InstrumentExt, models::storage_event::StorageWeb3Log, SqlxError, StorageProcessor, }; #[derive(Debug)] @@ -119,10 +117,8 @@ impl EventsWeb3Dal<'_, '_> { let mut where_sql = format!("(miniblock_number >= {})", filter.from_block.0 as i64); - if let Some(to_block) = filter.to_block { - let block_sql = web3_block_number_to_sql(to_block); - where_sql += &format!(" AND (miniblock_number <= {})", block_sql); - } + where_sql += &format!(" AND (miniblock_number <= {})", filter.to_block.0 as i64); + if !filter.addresses.is_empty() { where_sql += &format!(" AND (address = ANY(${}))", arg_index); arg_index += 1; @@ -172,7 +168,6 @@ impl EventsWeb3Dal<'_, '_> { #[cfg(test)] mod tests { - use zksync_types::api::BlockNumber; use zksync_types::{Address, H256}; use super::*; @@ -185,7 +180,7 @@ mod tests { let events_web3_dal = EventsWeb3Dal { storage }; let filter = GetLogsFilter { from_block: MiniblockNumber(100), - to_block: Some(BlockNumber::Number(200.into())), + to_block: MiniblockNumber(200), addresses: vec![Address::from_low_u64_be(123)], topics: vec![(0, vec![H256::from_low_u64_be(456)])], }; diff --git a/core/lib/types/src/api/mod.rs b/core/lib/types/src/api/mod.rs index 24ac74ab335e..1ad54ce6d1ab 100644 --- a/core/lib/types/src/api/mod.rs +++ b/core/lib/types/src/api/mod.rs @@ -548,7 +548,7 @@ pub struct TransactionDetails { #[derive(Debug, Clone)] pub struct GetLogsFilter { pub from_block: MiniblockNumber, - pub to_block: Option, + pub to_block: MiniblockNumber, pub addresses: Vec
, pub topics: Vec<(u32, Vec)>, } diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index 4cabb8e15da3..0aa9255c3dbf 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -564,7 +564,7 @@ impl EthNamespace { .installed_filters .lock() .await - .add(TypedFilter::Blocks(last_block_number)); + .add(TypedFilter::Blocks(last_block_number + 1)); method_latency.observe(); Ok(idx) } @@ -773,14 +773,19 @@ impl EthNamespace { .map_err(|err| internal_error(METHOD_NAME, err))?; let (block_hashes, last_block_number) = conn .blocks_web3_dal() - .get_block_hashes_after(*from_block, self.state.api_config.req_entities_limit) + .get_block_hashes_since(*from_block, self.state.api_config.req_entities_limit) .await .map_err(|err| internal_error(METHOD_NAME, err))?; - *from_block = last_block_number.unwrap_or(*from_block); + + *from_block = match last_block_number { + Some(last_block_number) => last_block_number + 1, + None => *from_block, + }; + FilterChanges::Hashes(block_hashes) } - TypedFilter::PendingTransactions(from_timestamp) => { + TypedFilter::PendingTransactions(from_timestamp_excluded) => { let mut conn = self .state .connection_pool @@ -790,12 +795,14 @@ impl EthNamespace { let (tx_hashes, last_timestamp) = conn .transactions_web3_dal() .get_pending_txs_hashes_after( - *from_timestamp, + *from_timestamp_excluded, Some(self.state.api_config.req_entities_limit), ) .await .map_err(|err| internal_error(METHOD_NAME, err))?; - *from_timestamp = last_timestamp.unwrap_or(*from_timestamp); + + *from_timestamp_excluded = last_timestamp.unwrap_or(*from_timestamp_excluded); + FilterChanges::Hashes(tx_hashes) } @@ -816,16 +823,26 @@ impl EthNamespace { } else { vec![] }; + + let mut to_block = self + .state + .resolve_filter_block_number(filter.to_block) + .await?; + + if matches!(filter.to_block, Some(BlockNumber::Number(_))) { + to_block = to_block.min( + self.state + .resolve_filter_block_number(Some(BlockNumber::Latest)) + .await?, + ); + } + let get_logs_filter = GetLogsFilter { from_block: *from_block, - to_block: filter.to_block, + to_block, addresses, topics, }; - let to_block = self - .state - .resolve_filter_block_number(filter.to_block) - .await?; let mut storage = self .state @@ -859,11 +876,7 @@ impl EthNamespace { .get_logs(get_logs_filter, i32::MAX as usize) .await .map_err(|err| internal_error(METHOD_NAME, err))?; - *from_block = logs - .last() - .map(|log| MiniblockNumber(log.block_number.unwrap().as_u32())) - .unwrap_or(*from_block); - // FIXME: why is `from_block` not updated? + *from_block = to_block + 1; FilterChanges::Logs(logs) } }; diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs index 7f38c6afc525..9e3a90dde043 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs @@ -283,7 +283,7 @@ impl ZksNamespace { .get_logs( GetLogsFilter { from_block: first_miniblock_of_l1_batch, - to_block: Some(block_number.0.into()), + to_block: block_number, addresses: vec![L1_MESSENGER_ADDRESS], topics: vec![(2, vec![address_to_h256(&sender)]), (3, vec![msg])], }, diff --git a/core/lib/zksync_core/src/api_server/web3/state.rs b/core/lib/zksync_core/src/api_server/web3/state.rs index 0463d4823209..ea52b2ae61cc 100644 --- a/core/lib/zksync_core/src/api_server/web3/state.rs +++ b/core/lib/zksync_core/src/api_server/web3/state.rs @@ -505,9 +505,10 @@ impl RpcState { .enumerate() .filter_map(|(idx, topics)| topics.map(|topics| (idx as u32 + 1, topics.0))) .collect(); + let get_logs_filter = GetLogsFilter { from_block, - to_block: filter.to_block, + to_block, addresses, topics, }; diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index 12bb6481213d..1bb14df52fa2 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -386,10 +386,10 @@ impl HttpTest for LogFilterChanges { assert_logs_match(&topics_logs, &[events[1], events[3]]); let new_all_logs = client.get_filter_changes(all_logs_filter_id).await?; - let FilterChanges::Logs(new_all_logs) = new_all_logs else { + let FilterChanges::Hashes(new_all_logs) = new_all_logs else { panic!("Unexpected getFilterChanges output: {:?}", new_all_logs); }; - assert_eq!(new_all_logs, all_logs); // FIXME(#546): update test after behavior is fixed + assert!(new_all_logs.is_empty()); Ok(()) } } @@ -458,11 +458,10 @@ impl HttpTest for LogFilterChangesWithBlockBoundaries { }; assert_logs_match(&lower_bound_logs, &new_events); - // FIXME(#546): update test after behavior is fixed let new_upper_bound_logs = client.get_filter_changes(upper_bound_filter_id).await?; - assert_eq!(new_upper_bound_logs, FilterChanges::Logs(upper_bound_logs)); + assert_matches!(new_upper_bound_logs, FilterChanges::Hashes(hashes) if hashes.is_empty()); let new_bounded_logs = client.get_filter_changes(bounded_filter_id).await?; - assert_eq!(new_bounded_logs, FilterChanges::Logs(bounded_logs)); + assert_matches!(new_bounded_logs, FilterChanges::Hashes(hashes) if hashes.is_empty()); // Add miniblock #3. It should not be picked up by the bounded and upper bound filters, // and should be picked up by the lower bound filter. @@ -472,27 +471,22 @@ impl HttpTest for LogFilterChangesWithBlockBoundaries { let new_events: Vec<_> = new_events.iter().collect(); let bounded_logs = client.get_filter_changes(bounded_filter_id).await?; - let FilterChanges::Logs(bounded_logs) = bounded_logs else { + let FilterChanges::Hashes(bounded_logs) = bounded_logs else { panic!("Unexpected getFilterChanges output: {:?}", bounded_logs); }; - assert!(bounded_logs - .iter() - .all(|log| log.block_number.unwrap() < 3.into())); + assert!(bounded_logs.is_empty()); let upper_bound_logs = client.get_filter_changes(upper_bound_filter_id).await?; - let FilterChanges::Logs(upper_bound_logs) = upper_bound_logs else { + let FilterChanges::Hashes(upper_bound_logs) = upper_bound_logs else { panic!("Unexpected getFilterChanges output: {:?}", upper_bound_logs); }; - assert!(upper_bound_logs - .iter() - .all(|log| log.block_number.unwrap() < 3.into())); + assert!(upper_bound_logs.is_empty()); let lower_bound_logs = client.get_filter_changes(lower_bound_filter_id).await?; let FilterChanges::Logs(lower_bound_logs) = lower_bound_logs else { panic!("Unexpected getFilterChanges output: {:?}", lower_bound_logs); }; - let start_idx = lower_bound_logs.len() - 4; - assert_logs_match(&lower_bound_logs[start_idx..], &new_events); + assert_logs_match(&lower_bound_logs, &new_events); Ok(()) } }