Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize window cache building for ibd #576

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
6ed8c71
show changes.
D-Stacks Sep 3, 2024
baa761b
optimize window caches for ibd.
D-Stacks Sep 4, 2024
6930ef9
do lints and checks etc..
D-Stacks Sep 4, 2024
25eb1e2
bench and compare.
D-Stacks Sep 9, 2024
fa0e110
clean-up
D-Stacks Sep 9, 2024
1d5fc7b
rework lock time check a bit.
D-Stacks Sep 9, 2024
6055c96
Merge branch 'dev' into optimize-window-caches-for-ibd
D-Stacks Sep 9, 2024
641de9f
// bool: todo!(),
D-Stacks Sep 9, 2024
04003d1
Merge branch 'optimize-window-caches-for-ibd' of https://github.com/D…
D-Stacks Sep 9, 2024
702daf6
fmt
D-Stacks Sep 9, 2024
fecd2c6
Merge branch 'dev' into optimize-window-caches-for-ibd
D-Stacks Sep 9, 2024
0d274d4
Merge branch 'dev' into optimize-window-caches-for-ibd
D-Stacks Sep 11, 2024
7fdda0e
Merge branch 'master' into optimize-window-caches-for-ibd
D-Stacks Sep 11, 2024
5a29a55
Merge branch 'master' into optimize-window-caches-for-ibd
D-Stacks Sep 12, 2024
d966502
Merge branch 'master' into optimize-window-caches-for-ibd
D-Stacks Sep 15, 2024
dada3cf
Merge branch 'master' into optimize-window-cache-building-for-ibd
D-Stacks Oct 3, 2024
38236de
address some review points.
D-Stacks Oct 3, 2024
f439227
Merge branch 'master' into optimize-window-cache-building-for-ibd
D-Stacks Oct 7, 2024
cbc9e32
Merge branch 'master' into optimize-window-cache-building-for-ibd
D-Stacks Oct 7, 2024
6531de3
address review comments.
D-Stacks Oct 7, 2024
e2c0a79
update comments.
D-Stacks Oct 7, 2024
65076c5
pass tests.
D-Stacks Oct 7, 2024
1530573
Merge branch 'master' into optimize-window-cache-building-for-ibd
D-Stacks Oct 8, 2024
dd520bf
fix blue work assumption, update error message.
D-Stacks Oct 10, 2024
c5664ad
Update window.rs
D-Stacks Oct 10, 2024
3598256
simplify a bit more.
D-Stacks Oct 10, 2024
8ce71fa
remove some unneeded things. rearrange access to cmpct gdd.
D-Stacks Oct 10, 2024
aecdef8
Merge branch 'master' into optimize-window-cache-building-for-ibd
D-Stacks Oct 15, 2024
408cef7
Merge branch 'master' into optimize-window-cache-building-for-ibd
D-Stacks Oct 22, 2024
08a2d31
fix merge master conflicts.
D-Stacks Nov 11, 2024
91b4c53
fix conflicts.
D-Stacks Nov 11, 2024
fc4f047
lints..
D-Stacks Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/core/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub mod perf {

const BASELINE_HEADER_DATA_CACHE_SIZE: usize = 10_000;
const BASELINE_BLOCK_DATA_CACHE_SIZE: usize = 200;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2000;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2_000;
const BASELINE_UTXOSET_CACHE_SIZE: usize = 10_000;

#[derive(Clone, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/errors/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ pub enum RuleError {
#[error("expected header blue work {0} but got {1}")]
UnexpectedHeaderBlueWork(BlueWorkType, BlueWorkType),

#[error("block difficulty of {0} is not the expected value of {1}")]
UnexpectedDifficulty(u32, u32),
#[error("block difficulty of {0} has value of {1} and is not the expected value of {2}")]
UnexpectedDifficulty(Hash, u32, u32),

#[error("block timestamp of {0} is not after expected {1}")]
TimeTooOld(u64, u64),
Expand Down
1 change: 1 addition & 0 deletions consensus/src/model/stores/ghostdag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl MemSizeEstimator for GhostdagData {
impl MemSizeEstimator for CompactGhostdagData {}

impl From<&GhostdagData> for CompactGhostdagData {
#[inline(always)] // NOTE(review): this conversion is trivial and called on hot window-building paths; plain #[inline] is usually sufficient for a small non-generic fn — confirm `always` is backed by a measurement
fn from(value: &GhostdagData) -> Self {
// Keep only the compact subset of ghostdag data: blue score, blue work and the selected parent.
Self { blue_score: value.blue_score, blue_work: value.blue_work, selected_parent: value.selected_parent }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use kaspa_consensus_core::block::Block;
use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_utils::option::OptionExtensions;
use once_cell::unsync::Lazy;
use std::sync::Arc;

impl BlockBodyProcessor {
Expand All @@ -18,13 +19,21 @@ impl BlockBodyProcessor {
}

fn check_block_transactions_in_context(self: &Arc<Self>, block: &Block) -> BlockProcessResult<()> {
let (pmt, _) = self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap())?;
// TODO: this is somewhat expensive during ibd, as it incurs cache misses.
let pmt_res =
Lazy::new(|| match self.window_manager.calc_past_median_time(&self.ghostdag_store.get_data(block.hash()).unwrap()) {
Ok((pmt, _)) => Ok(pmt),
Err(e) => Err(e),
});

for tx in block.transactions.iter() {
if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, pmt) {
return Err(RuleError::TxInContextFailed(tx.id(), e));
}
// quick check to avoid the expensive Lazy eval during ibd (in most cases).
if tx.lock_time != 0 {
if let Err(e) = self.transaction_validator.utxo_free_tx_validation(tx, block.header.daa_score, (*pmt_res).clone()?) {
return Err(RuleError::TxInContextFailed(tx.id(), e));
};
};
}

Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl HeaderProcessor {
ctx.mergeset_non_daa = Some(daa_window.mergeset_non_daa);

if header.bits != expected_bits {
return Err(RuleError::UnexpectedDifficulty(header.bits, expected_bits));
return Err(RuleError::UnexpectedDifficulty(header.hash, header.bits, expected_bits));
}

ctx.block_window_for_difficulty = Some(daa_window.window);
Expand Down
42 changes: 38 additions & 4 deletions consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use crate::{
stores::{
acceptance_data::{AcceptanceDataStoreReader, DbAcceptanceDataStore},
block_transactions::{BlockTransactionsStoreReader, DbBlockTransactionsStore},
block_window_cache::BlockWindowCacheStore,
daa::DbDaaStore,
depth::{DbDepthStore, DepthStoreReader},
ghostdag::{DbGhostdagStore, GhostdagData, GhostdagStoreReader},
ghostdag::{CompactGhostdagData, DbGhostdagStore, GhostdagData, GhostdagStoreReader},
headers::{DbHeadersStore, HeaderStoreReader},
past_pruning_points::DbPastPruningPointsStore,
pruning::{DbPruningStore, PruningStoreReader},
Expand Down Expand Up @@ -149,6 +150,10 @@ pub struct VirtualStateProcessor {
pub(super) parents_manager: DbParentsManager,
pub(super) depth_manager: DbBlockDepthManager,

// block window caches
pub(super) block_window_cache_for_difficulty: Arc<BlockWindowCacheStore>,
pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// Pruning lock
pruning_lock: SessionLock,

Expand Down Expand Up @@ -206,6 +211,9 @@ impl VirtualStateProcessor {
pruning_utxoset_stores: storage.pruning_utxoset_stores.clone(),
lkg_virtual_state: storage.lkg_virtual_state.clone(),

block_window_cache_for_difficulty: storage.block_window_cache_for_difficulty.clone(),
block_window_cache_for_past_median_time: storage.block_window_cache_for_past_median_time.clone(),

ghostdag_manager: services.ghostdag_manager.clone(),
reachability_service: services.reachability_service.clone(),
relations_service: services.relations_service.clone(),
Expand Down Expand Up @@ -303,11 +311,19 @@ impl VirtualStateProcessor {
.expect("all possible rule errors are unexpected here");

// Update the pruning processor about the virtual state change
let sink_ghostdag_data = self.ghostdag_store.get_compact_data(new_sink).unwrap();
let compact_sink_ghostdag_data = if prev_sink != new_sink {
// we need to check with full data here, since we may need to update the window caches
let sink_ghostdag_data = self.ghostdag_store.get_data(new_sink).unwrap();
// update window caches - for ibd performance. see method comment for more details.
self.maybe_commit_windows(new_sink, &sink_ghostdag_data);
CompactGhostdagData::from(sink_ghostdag_data.as_ref())
} else {
self.ghostdag_store.get_compact_data(new_sink).unwrap()
};
// Empty the channel before sending the new message. If pruning processor is busy, this step makes sure
// the internal channel does not grow with no need (since we only care about the most recent message)
let _consume = self.pruning_receiver.try_iter().count();
self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data }).unwrap();
self.pruning_sender.send(PruningProcessingMessage::Process { sink_ghostdag_data: compact_sink_ghostdag_data }).unwrap();

// Emit notifications
let accumulated_diff = Arc::new(accumulated_diff);
Expand All @@ -319,7 +335,7 @@ impl VirtualStateProcessor {
.notify(Notification::UtxosChanged(UtxosChangedNotification::new(accumulated_diff, virtual_parents)))
.expect("expecting an open unbounded channel");
self.notification_root
.notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(sink_ghostdag_data.blue_score)))
.notify(Notification::SinkBlueScoreChanged(SinkBlueScoreChangedNotification::new(compact_sink_ghostdag_data.blue_score)))
.expect("expecting an open unbounded channel");
self.notification_root
.notify(Notification::VirtualDaaScoreChanged(VirtualDaaScoreChangedNotification::new(new_virtual_state.daa_score)))
Expand Down Expand Up @@ -540,6 +556,24 @@ impl VirtualStateProcessor {
drop(selected_chain_write);
}

fn maybe_commit_windows(&self, new_sink: Hash, sink_ghostdag_data: &GhostdagData) {
    // Pre-populate the difficulty and past-median-time window caches for the new sink.
    // This matters mainly for IBD performance: in that scenario header processing cannot be
    // relied upon to warm these caches, so later reads would incur expensive cache misses.
    //
    // TODO: this could be skipped entirely when not in IBD, e.g. if the virtual processor
    // were given access to the `is_ibd_running` AtomicBool or an `is_nearly_synced()` check.

    let difficulty_cache = &self.block_window_cache_for_difficulty;
    if !difficulty_cache.contains_key(&new_sink) {
        let daa_window = self.window_manager.block_daa_window(sink_ghostdag_data).unwrap();
        difficulty_cache.insert(new_sink, daa_window.window);
    }

    let past_median_time_cache = &self.block_window_cache_for_past_median_time;
    if !past_median_time_cache.contains_key(&new_sink) {
        let (_, window) = self.window_manager.calc_past_median_time(sink_ghostdag_data).unwrap();
        past_median_time_cache.insert(new_sink, window);
    }
}

/// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation.
///
/// Guaranteed to be `>= self.max_block_parents`
Expand Down
157 changes: 96 additions & 61 deletions consensus/src/processes/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use kaspa_consensus_core::{
};
use kaspa_hashes::Hash;
use kaspa_math::Uint256;
use kaspa_utils::refs::Refs;
use kaspa_utils::{arc::ArcExtensions, refs::Refs};
use once_cell::unsync::Lazy;
use std::{cmp::Reverse, iter::once, ops::Deref, sync::Arc};

Expand Down Expand Up @@ -332,52 +332,30 @@ impl<T: GhostdagStoreReader, U: BlockWindowCacheReader, V: HeaderStoreReader, W:
WindowType::FullDifficultyWindow | WindowType::VaryingWindow(_) => None,
};

if let Some(cache) = cache {
if let Some(selected_parent_binary_heap) = cache.get(&ghostdag_data.selected_parent) {
// Only use the cached window if it originates from here
if let WindowOrigin::Sampled = selected_parent_binary_heap.origin() {
let selected_parent_blue_work = self.ghostdag_store.get_blue_work(ghostdag_data.selected_parent).unwrap();

let mut heap =
Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, (*selected_parent_binary_heap).clone()));
for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) {
match block {
SampledBlock::Sampled(block) => {
heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(hash) => {
mergeset_non_daa_inserter(hash);
}
}
}

return if let Ok(heap) = Lazy::into_value(heap) {
Ok(Arc::new(heap.binary_heap))
} else {
Ok(selected_parent_binary_heap.clone())
};
}
}
let selected_parent_blue_work = self.ghostdag_store.get_blue_work(ghostdag_data.selected_parent).unwrap();

// Try to initialize the window from the cache directly.
if let Some(res) = self.try_init_from_cache(
window_size,
sample_rate,
cache,
ghostdag_data,
selected_parent_blue_work,
&mut mergeset_non_daa_inserter,
) {
return Ok(res);
}

// else we populate the window with the passed ghostdag_data.
let mut window_heap = BoundedSizeBlockHeap::new(WindowOrigin::Sampled, window_size);
let parent_ghostdag = self.ghostdag_store.get_data(ghostdag_data.selected_parent).unwrap();

for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, parent_ghostdag.blue_work) {
match block {
SampledBlock::Sampled(block) => {
window_heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(hash) => {
mergeset_non_daa_inserter(hash);
}
}
}
self.push_mergeset(&mut window_heap, sample_rate, ghostdag_data, selected_parent_blue_work, mergeset_non_daa_inserter);
let mut current_ghostdag = self.ghostdag_store.get_data(ghostdag_data.selected_parent).unwrap();

let mut current_ghostdag = parent_ghostdag;
// Note: no need to check for cache here, as we already tried to initialize from the passed ghostdag's selected parent cache in `self.try_init_from_cache`

// Walk down the chain until we cross the window boundaries
// Walk down the chain until we cross the window boundaries.
loop {
// check if we may exit early.
if current_ghostdag.selected_parent.is_origin() {
// Reaching origin means there's no more data, so we expect the window to already be full, otherwise we err.
// This error can happen only during an IBD from pruning proof when processing the first headers in the pruning point's
Expand All @@ -387,50 +365,97 @@ impl<T: GhostdagStoreReader, U: BlockWindowCacheReader, V: HeaderStoreReader, W:
} else {
return Err(RuleError::InsufficientDaaWindowSize(window_heap.binary_heap.len()));
}
}

if current_ghostdag.selected_parent == self.genesis_hash {
} else if current_ghostdag.selected_parent == self.genesis_hash {
break;
}

let parent_ghostdag = self.ghostdag_store.get_data(current_ghostdag.selected_parent).unwrap();
let selected_parent_blue_work_too_low =
self.try_push_mergeset(&mut window_heap, sample_rate, &current_ghostdag, parent_ghostdag.blue_work);
// No need to further iterate since past of selected parent has even lower blue work
if selected_parent_blue_work_too_low {

// No need to further iterate since past of selected parent has only lower blue work
if !window_heap.can_push(current_ghostdag.selected_parent, parent_ghostdag.blue_work) {
break;
}

// push the current mergeset into the window
self.push_mergeset(&mut window_heap, sample_rate, &current_ghostdag, parent_ghostdag.blue_work, move |_| {});

// see if we can inherit and merge with the selected parent cache
if self.try_merge_with_selected_parent_cache(&mut window_heap, cache, &current_ghostdag.selected_parent) {
// if successful, we may break out of the loop, with the window already filled.
break;
};

// update the current ghostdag to the parent ghostdag, and continue the loop.
current_ghostdag = parent_ghostdag;
}

Ok(Arc::new(window_heap.binary_heap))
}

fn try_push_mergeset(
fn push_mergeset(
&self,
heap: &mut BoundedSizeBlockHeap,
sample_rate: u64,
ghostdag_data: &GhostdagData,
selected_parent_blue_work: BlueWorkType,
) -> bool {
// If the window is full and the selected parent is less than the minimum then we break
// because this means that there cannot be any more blocks in the past with higher blue work
if !heap.can_push(ghostdag_data.selected_parent, selected_parent_blue_work) {
return true;
}

mut mergeset_non_daa_inserter: impl FnMut(Hash),
) {
for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) {
match block {
SampledBlock::Sampled(block) => {
if !heap.try_push(block.hash, block.blue_work) {
break;
}
heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(_) => {}
SampledBlock::NonDaa(hash) => mergeset_non_daa_inserter(hash),
}
}
false
}

fn try_init_from_cache(
&self,
window_size: usize,
sample_rate: u64,
cache: Option<&Arc<U>>,
ghostdag_data: &GhostdagData,
selected_parent_blue_work: BlueWorkType,
mut mergeset_non_daa_inserter: impl FnMut(Hash),
) -> Option<Arc<BlockWindowHeap>> {
cache.and_then(|cache| {
cache.get(&ghostdag_data.selected_parent).map(|selected_parent_window| {
let mut heap = Lazy::new(|| BoundedSizeBlockHeap::from_binary_heap(window_size, (*selected_parent_window).clone()));
//Note: calling self.push_mergeset here voids the lazy evaluation optimization of the heap. so we don't do that.
for block in self.sampled_mergeset_iterator(sample_rate, ghostdag_data, selected_parent_blue_work) {
D-Stacks marked this conversation as resolved.
Show resolved Hide resolved
match block {
SampledBlock::Sampled(block) => {
heap.try_push(block.hash, block.blue_work);
}
SampledBlock::NonDaa(hash) => {
mergeset_non_daa_inserter(hash);
}
}
}

if let Ok(heap) = Lazy::into_value(heap) {
Arc::new(heap.binary_heap)
} else {
selected_parent_window.clone()
}
})
})
}

fn try_merge_with_selected_parent_cache(
    &self,
    heap: &mut BoundedSizeBlockHeap,
    cache: Option<&Arc<U>>,
    selected_parent: &Hash,
) -> bool {
    // Merge the cached window of `selected_parent` into `heap`, if such a cached window
    // exists. Returns true on success, letting the caller stop walking the chain since the
    // window is already filled with the ancestor data.
    if let Some(cache) = cache {
        if let Some(selected_parent_window) = cache.get(selected_parent) {
            // `unwrap_or_clone` avoids copying the window when this Arc is the sole owner.
            heap.merge_ancestor_heap(&mut selected_parent_window.unwrap_or_clone());
            return true;
        }
    }
    false
}

fn sampled_mergeset_iterator<'a>(
Expand Down Expand Up @@ -686,4 +711,14 @@ impl BoundedSizeBlockHeap {
self.binary_heap.push(r_sortable_block);
true
}

// Merges an ancestor's window heap into this one, then evicts entries until the configured
// size bound is respected again.
fn merge_ancestor_heap(&mut self, ancestor_heap: &mut BlockWindowHeap) {
    self.binary_heap.blocks.append(&mut ancestor_heap.blocks);
    // Saturating sub: when the ancestor lies close to the origin or genesis, the combined
    // heap may already be within the size bound, in which case overflow is 0 and the
    // eviction loop below is a no-op.
    let overflow = self.binary_heap.len().saturating_sub(self.size_bound);
    // NOTE(review): this relies on `blocks.pop()` evicting the lowest-priority entry
    // (lowest blue work) — confirm against `BlockWindowHeap`'s internal ordering.
    for _ in 0..overflow {
        self.binary_heap.blocks.pop();
    }
}
}