Skip to content

Commit

Permalink
Parallelize MuHash calculations (#575)
Browse files Browse the repository at this point in the history
* Parallelize MuHash calculations

MuHash calculations are additive, so they can be computed in independent chunks and combined afterwards

* Reimplement validate tx with muhash as a separate fn

* Use smallvec for muhash parallel

Co-authored-by: Michael Sutton <[email protected]>

* Add independent rayon order test

* Filter some data

* Use tuple_windows for test iter

---------

Co-authored-by: Michael Sutton <[email protected]>
  • Loading branch information
coderofstuff and michaelsutton authored Oct 12, 2024
1 parent 1274e9c commit c59a0d1
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 5 deletions.
19 changes: 16 additions & 3 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_txscript::caches::TxScriptCacheCounters;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

use std::{
cmp::Reverse,
Expand Down Expand Up @@ -771,9 +772,21 @@ impl ConsensusApi for Consensus {
/// Persists an imported chunk of the pruning point UTXO set and folds that chunk into
/// `current_multiset`.
///
/// The chunk is first written to the pruning UTXO set store. A separate `MuHash` is then
/// built for the chunk in parallel (one per entry, merged with `combine`) and finally
/// combined into `current_multiset`.
///
/// Each UTXO must be accumulated exactly once: running a sequential `add_utxo` loop in
/// addition to the parallel reduction would add every entry to the multiset twice.
///
/// # Panics
///
/// Panics if writing the chunk to the UTXO set store fails.
fn append_imported_pruning_point_utxos(&self, utxoset_chunk: &[(TransactionOutpoint, UtxoEntry)], current_multiset: &mut MuHash) {
    // NOTE(review): the guard returned by `write()` lives until the end of this function,
    // so the store appears to stay write-locked while the hash is computed -- confirm this is intended.
    let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
    pruning_utxoset_write.utxo_set.write_many(utxoset_chunk).unwrap();

    // Hash every entry into its own accumulator and merge the accumulators pairwise.
    // An empty chunk reduces to `MuHash::new()`.
    let chunk_multiset = utxoset_chunk
        .par_iter()
        .map(|(outpoint, entry)| {
            let mut entry_multiset = MuHash::new();
            entry_multiset.add_utxo(outpoint, entry);
            entry_multiset
        })
        .reduce(MuHash::new, |mut acc, next| {
            acc.combine(&next);
            acc
        });

    current_multiset.combine(&chunk_multiset);
}

fn import_pruning_point_utxo_set(&self, new_pruning_point: Hash, imported_utxo_multiset: MuHash) -> PruningImportResult<()> {
Expand Down
96 changes: 94 additions & 2 deletions consensus/src/pipeline/virtual_processor/utxo_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use kaspa_muhash::MuHash;
use kaspa_utils::refs::Refs;

use rayon::prelude::*;
use smallvec::{smallvec, SmallVec};
use std::{iter::once, ops::Deref};

/// A context for processing the UTXO state of a block with respect to its selected parent.
Expand Down Expand Up @@ -95,12 +96,14 @@ impl VirtualStateProcessor {
// No need to fully validate selected parent transactions since selected parent txs were already validated
// as part of selected parent UTXO state verification with the exact same UTXO context.
let validation_flags = if is_selected_parent { TxValidationFlags::SkipScriptChecks } else { TxValidationFlags::Full };
let validated_transactions = self.validate_transactions_in_parallel(&txs, &composed_view, pov_daa_score, validation_flags);
let (validated_transactions, inner_multiset) =
self.validate_transactions_with_muhash_in_parallel(&txs, &composed_view, pov_daa_score, validation_flags);

ctx.multiset_hash.combine(&inner_multiset);

let mut block_fee = 0u64;
for (validated_tx, _) in validated_transactions.iter() {
ctx.mergeset_diff.add_transaction(validated_tx, pov_daa_score).unwrap();
ctx.multiset_hash.add_transaction(validated_tx, pov_daa_score);
ctx.accepted_tx_ids.push(validated_tx.id());
block_fee += validated_tx.calculated_fee;
}
Expand Down Expand Up @@ -229,6 +232,38 @@ impl VirtualStateProcessor {
})
}

/// Variant of `validate_transactions_in_parallel` which, while validating, also accumulates
/// a `MuHash` over the transactions that passed validation.
///
/// Returns a pair of:
/// * the transactions that validated successfully, each paired with its index within `txs`
///   (as `u32`); transactions that fail validation are silently dropped
/// * the `MuHash` obtained by combining the per-transaction hashes of those transactions
///
/// The first transaction (the coinbase) is never validated here.
pub(crate) fn validate_transactions_with_muhash_in_parallel<'a, V: UtxoView + Sync>(
    &self,
    txs: &'a Vec<Transaction>,
    utxo_view: &V,
    pov_daa_score: u64,
    flags: TxValidationFlags,
) -> (SmallVec<[(ValidatedTransaction<'a>, u32); 2]>, MuHash) {
    self.thread_pool.install(|| {
        // Parallel processing is safe here: block body validation already ensured that all
        // txs within each block are independent.
        txs.par_iter()
            .enumerate()
            .skip(1) // The coinbase tx is skipped.
            .filter_map(|(index, tx)| {
                // Invalid transactions contribute neither an entry nor a hash.
                let validated = self.validate_transaction_in_utxo_context(tx, &utxo_view, pov_daa_score, flags).ok()?;
                let mut tx_multiset = MuHash::new();
                tx_multiset.add_transaction(&validated, pov_daa_score);
                Some((smallvec![(validated, index as u32)], tx_multiset))
            })
            .reduce(
                || (smallvec![], MuHash::new()),
                |(mut acc_txs, mut acc_multiset), (mut next_txs, next_multiset)| {
                    // Left-hand entries are kept first, right-hand entries appended after them.
                    acc_txs.append(&mut next_txs);
                    acc_multiset.combine(&next_multiset);
                    (acc_txs, acc_multiset)
                },
            )
    })
}

/// Attempts to populate the transaction with UTXO entries and performs all utxo-related tx validations
pub(super) fn validate_transaction_in_utxo_context<'a>(
&self,
Expand Down Expand Up @@ -318,3 +353,60 @@ impl VirtualStateProcessor {
Ok(())
}
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;

    /// Asserts that every adjacent pair in `values` is strictly increasing.
    fn assert_strictly_increasing(values: &[u16]) {
        for (prev, curr) in values.iter().tuple_windows() {
            // The input was sorted to begin with, so surviving items must still be sorted
            assert!(prev < curr, "expected {} < {} if original sort was preserved", prev, curr);
        }
    }

    /// Standalone check mirroring the iteration pattern used by
    /// `validate_transactions_in_parallel` and `validate_transactions_with_muhash_in_parallel`:
    /// the relative order of items must survive a `par_iter` + `filter_map`, both when the
    /// results are collected and when they are reduced into a `SmallVec`.
    #[test]
    fn test_rayon_reduce_retains_order() {
        let data: Vec<u16> = (1..=1000).collect();

        // Case 1: randomly drop a few items, then `collect`
        let collected: Vec<u16> = data
            .par_iter()
            .filter_map(|item| {
                let roll: f64 = rand::random();
                if roll < 0.05 {
                    None
                } else {
                    Some(*item)
                }
            })
            .collect();

        println!("collected len: {}", collected.len());
        assert_strictly_increasing(&collected);

        // Case 2: randomly drop a few items, then `reduce` single-item vectors by appending
        let reduced: SmallVec<[u16; 2]> = data
            .par_iter()
            .filter_map(|item: &u16| {
                let roll: f64 = rand::random();
                if roll < 0.05 {
                    None
                } else {
                    Some(smallvec![*item])
                }
            })
            .reduce(
                || smallvec![],
                |mut merged, mut incoming| {
                    merged.append(&mut incoming);
                    merged
                },
            );

        println!("reduced len: {}", reduced.len());
        assert_strictly_increasing(&reduced);
    }
}

0 comments on commit c59a0d1

Please sign in to comment.