diff --git a/node/src/actors/session/handlers.rs b/node/src/actors/session/handlers.rs index 6deac268e..18d9f0578 100644 --- a/node/src/actors/session/handlers.rs +++ b/node/src/actors/session/handlers.rs @@ -11,10 +11,10 @@ use futures::future::Either; use witnet_data_structures::{ builders::from_address, chain::{ - Block, CheckpointBeacon, Epoch, Hashable, InventoryEntry, InventoryItem, SuperBlock, - SuperBlockVote, + Block, CheckpointBeacon, Epoch, InventoryEntry, InventoryItem, SuperBlock, SuperBlockVote, }, - proto::versioning::Versioned, + get_protocol_version, + proto::versioning::{Versioned, VersionedHashable}, transaction::Transaction, types::{ Address, Command, InventoryAnnouncement, InventoryRequest, LastBeacon, @@ -674,43 +674,28 @@ fn peer_discovery_peers( fn inventory_process_block(session: &mut Session, _ctx: &mut Context, block: Block) { // Get ChainManager address let chain_manager_addr = ChainManager::from_registry(); - let block_hash = block.hash(); - - if session.requested_block_hashes.contains(&block_hash) { - // Add block to requested_blocks - session.requested_blocks.insert(block_hash, block); - - if session.requested_blocks.len() == session.requested_block_hashes.len() { - let mut blocks_vector = Vec::with_capacity(session.requested_blocks.len()); - // Iterate over requested block hashes ordered by epoch - // TODO: We assume that the received InventoryAnnouncement message returns the list of - // block hashes ordered by epoch. - // If that is not the case, we can sort blocks_vector by block.block_header.checkpoint - for hash in session.requested_block_hashes.drain(..) { - if let Some(block) = session.requested_blocks.remove(&hash) { - blocks_vector.push(block); - } else { - // Assuming that we always clear requested_blocks after mutating - // requested_block_hashes, this branch should be unreachable. - // But if it happens, immediately exit the for loop and send an empty AddBlocks - // message to ChainManager. - log::warn!("Unexpected missing block: {}", hash); - break; - } - } + let block_hash = block.versioned_hash(get_protocol_version(Some( + block.block_header.beacon.checkpoint, + ))); + if !session.requested_block_hashes.is_empty() + && *session.requested_block_hashes.front().unwrap() == block_hash + { + session.requested_blocks.push(block); + session.requested_block_hashes.pop_front(); + } else { + if !session.requested_blocks.is_empty() { // Send a message to the ChainManager to try to add a new block chain_manager_addr.do_send(AddBlocks { - blocks: blocks_vector, + blocks: session.requested_blocks.clone(), sender: Some(session.remote_addr), }); // Clear requested block structures session.blocks_timestamp = 0; session.requested_blocks.clear(); - // requested_block_hashes is cleared by using drain(..) above + session.requested_block_hashes.clear(); } - } else { // If this is not a requested block, assume it is a candidate // Send a message to the ChainManager to try to add a new candidate chain_manager_addr.do_send(AddCandidates { diff --git a/node/src/actors/session/mod.rs b/node/src/actors/session/mod.rs index 8cf32b819..9ae8b78ec 100644 --- a/node/src/actors/session/mod.rs +++ b/node/src/actors/session/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use std::{collections::VecDeque, net::SocketAddr, sync::Arc}; use actix::{io::FramedWrite, SystemService}; @@ -79,10 +79,10 @@ pub struct Session { last_beacon: LastBeacon, /// Requested block hashes vector - requested_block_hashes: Vec, + requested_block_hashes: VecDeque, - /// HashMap with requested blocks - requested_blocks: HashMap, + /// Vec with requested blocks + requested_blocks: Vec, /// Timestamp for requested blocks blocks_timestamp: i64, @@ -132,8 +132,8 @@ impl Session { magic_number, current_epoch, last_beacon, - requested_block_hashes: vec![], - requested_blocks: HashMap::new(), + requested_block_hashes: VecDeque::new(), + requested_blocks: vec![], blocks_timestamp: 0, config, expected_peers_msg: 0,