diff --git a/consensus/core/src/config/constants.rs b/consensus/core/src/config/constants.rs index c4635083b..146e30a17 100644 --- a/consensus/core/src/config/constants.rs +++ b/consensus/core/src/config/constants.rs @@ -121,7 +121,7 @@ pub mod perf { const BASELINE_HEADER_DATA_CACHE_SIZE: usize = 10_000; const BASELINE_BLOCK_DATA_CACHE_SIZE: usize = 200; - const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2000; + const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2_000; const BASELINE_UTXOSET_CACHE_SIZE: usize = 10_000; #[derive(Clone, Debug)] diff --git a/consensus/core/src/errors/block.rs b/consensus/core/src/errors/block.rs index f5c235476..be7613435 100644 --- a/consensus/core/src/errors/block.rs +++ b/consensus/core/src/errors/block.rs @@ -64,8 +64,8 @@ pub enum RuleError { #[error("expected header blue work {0} but got {1}")] UnexpectedHeaderBlueWork(BlueWorkType, BlueWorkType), - #[error("block difficulty of {0} is not the expected value of {1}")] - UnexpectedDifficulty(u32, u32), + #[error("block difficulty of {0} has value of {1} and is not the expected value of {2}")] + UnexpectedDifficulty(Hash, u32, u32), #[error("block timestamp of {0} is not after expected {1}")] TimeTooOld(u64, u64), diff --git a/consensus/src/model/stores/ghostdag.rs b/consensus/src/model/stores/ghostdag.rs index fd2600a1c..4ed02e4ce 100644 --- a/consensus/src/model/stores/ghostdag.rs +++ b/consensus/src/model/stores/ghostdag.rs @@ -48,6 +48,7 @@ impl MemSizeEstimator for GhostdagData { impl MemSizeEstimator for CompactGhostdagData {} impl From<&GhostdagData> for CompactGhostdagData { + #[inline(always)] fn from(value: &GhostdagData) -> Self { Self { blue_score: value.blue_score, blue_work: value.blue_work, selected_parent: value.selected_parent } } diff --git a/consensus/src/pipeline/body_processor/body_validation_in_context.rs b/consensus/src/pipeline/body_processor/body_validation_in_context.rs index b03643df8..cdf6dc28c 100644 --- a/consensus/src/pipeline/body_processor/body_validation_in_context.rs +++ b/consensus/src/pipeline/body_processor/body_validation_in_context.rs @@ -8,6 +8,7 @@ use kaspa_consensus_core::block::Block; use kaspa_database::prelude::StoreResultExtensions; use kaspa_hashes::Hash; use kaspa_utils::option::OptionExtensions; +use once_cell::unsync::Lazy; use std::sync::Arc; impl BlockBodyProcessor { @@ -18,13 +19,21 @@ impl BlockBodyProcessor { } fn check_block_transactions_in_context(self: &Arc, block: &Block) -> BlockProcessResult<()> { - let (pmt, _) = self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap())?; + // TODO: this is somewhat expensive during ibd, as it incurs cache misses. + let pmt_res = + Lazy::new(|| match self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap()) { + Ok((pmt, _)) => Ok(pmt), + Err(e) => Err(e), + }); + for tx in block.transactions.iter() { - if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, pmt) { - return Err(RuleError::TxInContextFailed(tx.id(), e)); - } + // quick check to avoid the expensive Lazy eval during ibd (in most cases). + if tx.lock_time != 0 { + if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, (*pmt_res).clone()?) { + return Err(RuleError::TxInContextFailed(tx.id(), e)); + }; + }; } - Ok(()) } diff --git a/consensus/src/pipeline/header_processor/pre_pow_validation.rs b/consensus/src/pipeline/header_processor/pre_pow_validation.rs index a4dfb8b1e..7764e1c15 100644 --- a/consensus/src/pipeline/header_processor/pre_pow_validation.rs +++ b/consensus/src/pipeline/header_processor/pre_pow_validation.rs @@ -35,7 +35,7 @@ impl HeaderProcessor { ctx.mergeset_non_daa = Some(daa_window.mergeset_non_daa); if header.bits != expected_bits { - return Err(RuleError::UnexpectedDifficulty(header.bits, expected_bits)); + return Err(RuleError::UnexpectedDifficulty(header.hash, header.bits, expected_bits)); } ctx.block_window_for_difficulty = Some(daa_window.window); diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index c654fef43..bfdeff3e2 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -16,9 +16,10 @@ use crate::{ stores::{ acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore}, block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore}, + block_window_cache::BlockWindowCacheStore, daa::DbDaaStore, depth::{DbDepthStore, DepthStoreReader}, - ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader}, + ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagData, GhostdagStoreReader}, headers::{DbHeadersStore, HeaderStoreReader}, past_pruning_points::DbPastPruningPointsStore, pruning::{DbPruningStore, PruningStoreReader}, @@ -149,6 +150,10 @@ pub struct VirtualStateProcessor { pub(super) parents_manager: DbParentsManager, pub(super) depth_manager: DbBlockDepthManager, + // block window caches + pub(super) block_window_cache_for_difficulty: Arc, + pub(super) block_window_cache_for_past_median_time: Arc, + // Pruning lock pruning_lock: SessionLock, @@ -206,6 +211,9 @@ impl VirtualStateProcessor { pruning_utxoset_stores: storage.pruning_utxoset_stores.clone(), lkg_virtual_state: storage.lkg_virtual_state.clone(), + block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(), + block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(), + ghostdag_manager: services.ghostdag_manager.clone(), reachability_service: services.reachability_service.clone(), relations_service: services.relations_service.clone(), @@ -303,11 +311,19 @@ impl VirtualStateProcessor { .expect("all possible rule errors are unexpected here"); // Update the pruning processor about the virtual state change - let sink_ghostdag_data = self.ghostdag_store.get_compact_data(new_sink).unwrap(); + let compact_sink_ghostdag_data = if prev_sink != new_sink { + // we need to check with full data here, since we may need to update the window caches + let sink_ghostdag_data = self.ghostdag_store.get_data(new_sink).unwrap(); + // update window caches - for ibd performance. see method comment for more details. + self.maybe_commit_windows(new_sink, &sink_ghostdag_data); + CompactGhostdagData::from(sink_ghostdag_data.as_ref()) + } else { + self.ghostdag_store.get_compact_data(new_sink).unwrap() + }; // Empty the channel before sending the new message. If pruning processor is busy, this step makes sure // the internal channel does not grow with no need (since we only care about the most recent message) let _consume = self.pruning_receiver.try_iter().count(); - self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data }).unwrap(); + self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data: compact_sink_ghostdag_data }).unwrap(); // Emit notifications let accumulated_diff = Arc::new(accumulated_diff); @@ -319,7 +335,7 @@ impl VirtualStateProcessor { .notify(Notification::UtxosChanged(UtxosChangedNotification::new(accumulated_diff, virtual_parents))) .expect("expecting an open unbounded channel"); self.notification_root - .notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(sink_ghostdag_data.blue_score))) + .notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(compact_sink_ghostdag_data.blue_score))) .expect("expecting an open unbounded channel"); self.notification_root .notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score))) @@ -540,6 +556,24 @@ impl VirtualStateProcessor { drop(selected_chain_write); } + fn maybe_commit_windows(&self, new_sink: Hash, sink_ghostdag_data: &GhostdagData) { + // this is only important for ibd performance, as we incur expensive cache misses otherwise. + // this occurs because we cannot rely on header processing to pre-cache in this scenario. + + // TODO: We could optimize this by only committing the windows if virtual processor where to have explicit knowledge of being in ibd. + // above may be possible with access to the `is_ibd_running` AtomicBool, or `is_nearly_synced()` method. + + if !self.block_window_cache_for_difficulty.contains_key(&new_sink) { + self.block_window_cache_for_difficulty + .insert(new_sink, self.window_manager.block_daa_window(sink_ghostdag_data).unwrap().window); + }; + + if !self.block_window_cache_for_past_median_time.contains_key(&new_sink) { + self.block_window_cache_for_past_median_time + .insert(new_sink, self.window_manager.calc_past_median_time(sink_ghostdag_data).unwrap().1); + }; + } + /// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation. /// /// Guaranteed to be `>= self.max_block_parents` diff --git a/consensus/src/processes/window.rs b/consensus/src/processes/window.rs index ca0f71cf2..6c53ecc6a 100644 --- a/consensus/src/processes/window.rs +++ b/consensus/src/processes/window.rs @@ -15,7 +15,7 @@ use kaspa_consensus_core::{ }; use kaspa_hashes::Hash; use kaspa_math::Uint256; -use kaspa_utils::refs::Refs; +use kaspa_utils::{arc::ArcExtensions, refs::Refs}; use once_cell::unsync::Lazy; use std::{cmp::Reverse, iter::once, ops::Deref, sync::Arc}; @@ -332,52 +332,30 @@ impl None, }; - if let Some(cache) = cache { - if let Some(selected_parent_binary_heap) = cache.get(&ghostdag_data.selected_parent) { - // Only use the cached window if it originates from here - if let WindowOrigin::Sampled = selected_parent_binary_heap.origin() { - let selected_parent_blue_work = self.ghostdag_store.get_blue_work(ghostdag_data.selected_parent).unwrap(); - - let mut heap = - Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, (*selected_parent_binary_heap).clone())); - for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) { - match block { - SampledBlock::Sampled(block) => { - heap.try_push(block.hash, block.blue_work); - } - SampledBlock::NonDaa(hash) => { - mergeset_non_daa_inserter(hash); - } - } - } - - return if let Ok(heap) = Lazy::into_value(heap) { - Ok(Arc::new(heap.binary_heap)) - } else { - Ok(selected_parent_binary_heap.clone()) - }; - } - } + let selected_parent_blue_work = self.ghostdag_store.get_blue_work(ghostdag_data.selected_parent).unwrap(); + + //try to initialize the window from the cache directly + if let Some(res) = self.try_init_from_cache( + window_size, + sample_rate, + cache, + ghostdag_data, + selected_parent_blue_work, + &mut mergeset_non_daa_inserter, + ) { + return Ok(res); } + // else we populate the window with the passed ghostdag_data. let mut window_heap = BoundedSizeBlockHeap::new(WindowOrigin::Sampled, window_size); - let parent_ghostdag = self.ghostdag_store.get_data(ghostdag_data.selected_parent).unwrap(); - - for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, parent_ghostdag.blue_work) { - match block { - SampledBlock::Sampled(block) => { - window_heap.try_push(block.hash, block.blue_work); - } - SampledBlock::NonDaa(hash) => { - mergeset_non_daa_inserter(hash); - } - } - } + self.push_mergeset(&mut window_heap, sample_rate, ghostdag_data, selected_parent_blue_work, mergeset_non_daa_inserter); + let mut current_ghostdag = self.ghostdag_store.get_data(ghostdag_data.selected_parent).unwrap(); - let mut current_ghostdag = parent_ghostdag; + // Note: no need to check for cache here, as we already tried to initialize from the passed ghostdag's selected parent cache in `self.try_init_from_cache` - // Walk down the chain until we cross the window boundaries + // Walk down the chain until we cross the window boundaries. loop { + // check if we may exit early. if current_ghostdag.selected_parent.is_origin() { // Reaching origin means there's no more data, so we expect the window to already be full, otherwise we err. // This error can happen only during an IBD from pruning proof when processing the first headers in the pruning point's @@ -387,50 +365,97 @@ impl bool { - // If the window is full and the selected parent is less than the minimum then we break - // because this means that there cannot be any more blocks in the past with higher blue work - if !heap.can_push(ghostdag_data.selected_parent, selected_parent_blue_work) { - return true; - } - + mut mergeset_non_daa_inserter: impl FnMut(Hash), + ) { for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) { match block { SampledBlock::Sampled(block) => { - if !heap.try_push(block.hash, block.blue_work) { - break; - } + heap.try_push(block.hash, block.blue_work); } - SampledBlock::NonDaa(_) => {} + SampledBlock::NonDaa(hash) => mergeset_non_daa_inserter(hash), } } - false + } + + fn try_init_from_cache( + &self, + window_size: usize, + sample_rate: u64, + cache: Option<&Arc>, + ghostdag_data: &GhostdagData, + selected_parent_blue_work: BlueWorkType, + mut mergeset_non_daa_inserter: impl FnMut(Hash), + ) -> Option> { + cache.and_then(|cache| { + cache.get(&ghostdag_data.selected_parent).map(|selected_parent_window| { + let mut heap = Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, (*selected_parent_window).clone())); + //Note: calling self.push_mergeset here voids the lazy evaluation optimization of the heap. so we don't do that. + for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) { + match block { + SampledBlock::Sampled(block) => { + heap.try_push(block.hash, block.blue_work); + } + SampledBlock::NonDaa(hash) => { + mergeset_non_daa_inserter(hash); + } + } + } + + if let Ok(heap) = Lazy::into_value(heap) { + Arc::new(heap.binary_heap) + } else { + selected_parent_window.clone() + } + }) + }) + } + + fn try_merge_with_selected_parent_cache( + &self, + heap: &mut BoundedSizeBlockHeap, + cache: Option<&Arc>, + selected_parent: &Hash, + ) -> bool { + cache + .and_then(|cache| { + cache.get(selected_parent).map(|selected_parent_window| { + heap.merge_ancestor_heap(&mut selected_parent_window.unwrap_or_clone()); + }) + }) + .is_some() } fn sampled_mergeset_iterator<'a>( @@ -686,4 +711,14 @@ impl BoundedSizeBlockHeap { self.binary_heap.push(r_sortable_block); true } + + // This method is intended to be used to merge the ancestor heap with the current heap. + fn merge_ancestor_heap(&mut self, ancestor_heap: &mut BlockWindowHeap) { + self.binary_heap.blocks.append(&mut ancestor_heap.blocks); + // below we saturate for cases where ancestor may be close to, the origin, or genesis. + // note: this is a no-op if overflow_amount is 0, i.e. because of the saturating sub, the sum of the two heaps is less or equal to the size bound. + for _ in 0..self.binary_heap.len().saturating_sub(self.size_bound) { + self.binary_heap.blocks.pop(); + } + } }