diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index b9a183ea8c..443e591c8a 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -57,7 +57,7 @@ kaspa-txscript-errors.workspace = true kaspa-addresses.workspace = true [[bench]] -name = "hash_benchmarks" +name = "parallel_muhash" harness = false [[bench]] diff --git a/consensus/benches/check_scripts.rs b/consensus/benches/check_scripts.rs index d65ac63626..4a596da1b2 100644 --- a/consensus/benches/check_scripts.rs +++ b/consensus/benches/check_scripts.rs @@ -9,9 +9,9 @@ use kaspa_consensus_core::subnets::SubnetworkId; use kaspa_consensus_core::tx::{MutableTransaction, Transaction, TransactionInput, TransactionOutpoint, UtxoEntry}; use kaspa_txscript::caches::Cache; use kaspa_txscript::pay_to_address_script; +use kaspa_utils::iter::parallelism_in_power_steps; use rand::{thread_rng, Rng}; use secp256k1::Keypair; -use std::thread::available_parallelism; // You may need to add more detailed mocks depending on your actual code. fn mock_tx(inputs_count: usize, non_uniq_signatures: usize) -> (Transaction, Vec) { @@ -98,7 +98,7 @@ fn benchmark_check_scripts(c: &mut Criterion) { }); // Iterate powers of two up to available parallelism - for i in (1..=(available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2u32.pow(x) as usize) { + for i in parallelism_in_power_steps() { if inputs_count >= i { group.bench_function(format!("rayon, custom thread pool, thread count {i}"), |b| { let tx = MutableTransaction::with_entries(tx.clone(), utxos.clone()); diff --git a/consensus/benches/hash_benchmarks.rs b/consensus/benches/hash_benchmarks.rs deleted file mode 100644 index 8ba6836b8d..0000000000 --- a/consensus/benches/hash_benchmarks.rs +++ /dev/null @@ -1,15 +0,0 @@ -use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use std::str::FromStr; - -use kaspa_hashes::Hash; - -/// Placeholder for actual benchmarks -pub fn hash_benchmark(c: &mut Criterion) { - c.bench_function("Hash::from_str", |b| { - let hash_str = "8e40af02265360d59f4ecf9ae9ebf8f00a3118408f5a9cdcbcc9c0f93642f3af"; - b.iter(|| Hash::from_str(black_box(hash_str))) - }); -} - -criterion_group!(benches, hash_benchmark); -criterion_main!(benches); diff --git a/consensus/benches/parallel_muhash.rs b/consensus/benches/parallel_muhash.rs new file mode 100644 index 0000000000..99ab5b6c3a --- /dev/null +++ b/consensus/benches/parallel_muhash.rs @@ -0,0 +1,66 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use itertools::Itertools; +use kaspa_consensus_core::{ + muhash::MuHashExtensions, + subnets::SUBNETWORK_ID_NATIVE, + tx::{ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, TransactionOutput, UtxoEntry}, +}; +use kaspa_hashes::TransactionID; +use kaspa_muhash::MuHash; +use kaspa_utils::iter::parallelism_in_power_steps; +use rayon::prelude::*; + +fn generate_transaction(ins: usize, outs: usize, randomness: u64) -> SignableTransaction { + let mut tx = Transaction::new(0, vec![], vec![], 0, SUBNETWORK_ID_NATIVE, 0, vec![]); + let mut entries = vec![]; + for i in 0..ins { + let mut hasher = TransactionID::new(); + hasher.write(i.to_le_bytes()); + hasher.write(randomness.to_le_bytes()); + let input = TransactionInput::new(TransactionOutpoint::new(hasher.finalize(), 0), vec![10; 66], 0, 1); + let entry = UtxoEntry::new(22222222, ScriptPublicKey::from_vec(0, vec![99; 34]), 23456, false); + tx.inputs.push(input); + entries.push(entry); + } + for _ in 0..outs { + let output = TransactionOutput::new(23456, ScriptPublicKey::from_vec(0, vec![101; 34])); + tx.outputs.push(output); + } + tx.finalize(); + SignableTransaction::with_entries(tx, entries) +} + +pub fn parallel_muhash_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("muhash txs"); + let txs = (0..256).map(|i| generate_transaction(2, 2, i)).collect_vec(); + group.bench_function("seq", |b| { + b.iter(|| { + let mut mh = MuHash::new(); + for tx in txs.iter() { + mh.add_transaction(&tx.as_verifiable(), 222); + } + black_box(mh) + }) + }); + + for threads in parallelism_in_power_steps() { + group.bench_function(format!("par {threads}"), |b| { + let pool = rayon::ThreadPoolBuilder::new().num_threads(threads).build().unwrap(); + b.iter(|| { + pool.install(|| { + let mh = + txs.par_iter().map(|tx| MuHash::from_transaction(&tx.as_verifiable(), 222)).reduce(MuHash::new, |mut a, b| { + a.combine(&b); + a + }); + black_box(mh) + }) + }) + }); + } + + group.finish(); +} + +criterion_group!(benches, parallel_muhash_benchmark); +criterion_main!(benches); diff --git a/consensus/core/src/muhash.rs b/consensus/core/src/muhash.rs index 3782f855c0..0286596eba 100644 --- a/consensus/core/src/muhash.rs +++ b/consensus/core/src/muhash.rs @@ -8,6 +8,8 @@ use kaspa_muhash::MuHash; pub trait MuHashExtensions { fn add_transaction(&mut self, tx: &impl VerifiableTransaction, block_daa_score: u64); fn add_utxo(&mut self, outpoint: &TransactionOutpoint, entry: &UtxoEntry); + fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self; + fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self; } impl MuHashExtensions for MuHash { @@ -30,6 +32,18 @@ impl MuHashExtensions for MuHash { write_utxo(&mut writer, entry, outpoint); writer.finalize(); } + + fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self { + let mut mh = Self::new(); + mh.add_transaction(tx, block_daa_score); + mh + } + + fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self { + let mut mh = Self::new(); + mh.add_utxo(outpoint, entry); + mh + } } fn write_utxo(writer: &mut impl HasherBase, entry: &UtxoEntry, outpoint: &TransactionOutpoint) { diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 1731729a32..d9e4ac7d14 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -81,6 +81,7 @@ use kaspa_database::prelude::StoreResultExtensions; use kaspa_hashes::Hash; use kaspa_muhash::MuHash; use kaspa_txscript::caches::TxScriptCacheCounters; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use std::{ cmp::Reverse, @@ -771,9 +772,15 @@ impl ConsensusApi for Consensus { fn append_imported_pruning_point_utxos(&self, utxoset_chunk: &[(TransactionOutpoint, UtxoEntry)], current_multiset: &mut MuHash) { let mut pruning_utxoset_write = self.pruning_utxoset_stores.write(); pruning_utxoset_write.utxo_set.write_many(utxoset_chunk).unwrap(); - for (outpoint, entry) in utxoset_chunk { - current_multiset.add_utxo(outpoint, entry); - } + + // Parallelize processing + let inner_multiset = + utxoset_chunk.par_iter().map(|(outpoint, entry)| MuHash::from_utxo(outpoint, entry)).reduce(MuHash::new, |mut a, b| { + a.combine(&b); + a + }); + + current_multiset.combine(&inner_multiset); } fn import_pruning_point_utxo_set(&self, new_pruning_point: Hash, imported_utxo_multiset: MuHash) -> PruningImportResult<()> { diff --git a/consensus/src/pipeline/virtual_processor/utxo_validation.rs b/consensus/src/pipeline/virtual_processor/utxo_validation.rs index 306f81446c..6516888188 100644 --- a/consensus/src/pipeline/virtual_processor/utxo_validation.rs +++ b/consensus/src/pipeline/virtual_processor/utxo_validation.rs @@ -31,6 +31,7 @@ use kaspa_muhash::MuHash; use kaspa_utils::refs::Refs; use rayon::prelude::*; +use smallvec::{smallvec, SmallVec}; use std::{iter::once, ops::Deref}; /// A context for processing the UTXO state of a block with respect to its selected parent. @@ -95,12 +96,14 @@ impl VirtualStateProcessor { // No need to fully validate selected parent transactions since selected parent txs were already validated // as part of selected parent UTXO state verification with the exact same UTXO context. let validation_flags = if is_selected_parent { TxValidationFlags::SkipScriptChecks } else { TxValidationFlags::Full }; - let validated_transactions = self.validate_transactions_in_parallel(&txs, &composed_view, pov_daa_score, validation_flags); + let (validated_transactions, inner_multiset) = + self.validate_transactions_with_muhash_in_parallel(&txs, &composed_view, pov_daa_score, validation_flags); + + ctx.multiset_hash.combine(&inner_multiset); let mut block_fee = 0u64; for (validated_tx, _) in validated_transactions.iter() { ctx.mergeset_diff.add_transaction(validated_tx, pov_daa_score).unwrap(); - ctx.multiset_hash.add_transaction(validated_tx, pov_daa_score); ctx.accepted_tx_ids.push(validated_tx.id()); block_fee += validated_tx.calculated_fee; } @@ -229,6 +232,37 @@ impl VirtualStateProcessor { }) } + /// Same as validate_transactions_in_parallel except during the iteration this will also + /// calculate the muhash in parallel for valid transactions + pub(crate) fn validate_transactions_with_muhash_in_parallel<'a, V: UtxoView + Sync>( + &self, + txs: &'a Vec, + utxo_view: &V, + pov_daa_score: u64, + flags: TxValidationFlags, + ) -> (SmallVec<[(ValidatedTransaction<'a>, u32); 2]>, MuHash) { + self.thread_pool.install(|| { + txs + .par_iter() // We can do this in parallel without complications since block body validation already ensured + // that all txs within each block are independent + .enumerate() + .skip(1) // Skip the coinbase tx. + .filter_map(|(i, tx)| self.validate_transaction_in_utxo_context(tx, &utxo_view, pov_daa_score, flags).ok().map(|vtx| { + let mh = MuHash::from_transaction(&vtx, pov_daa_score); + (smallvec![(vtx, i as u32)], mh) + } + )) + .reduce( + || (smallvec![], MuHash::new()), + |mut a, mut b| { + a.0.append(&mut b.0); + a.1.combine(&b.1); + a + }, + ) + }) + } + /// Attempts to populate the transaction with UTXO entries and performs all utxo-related tx validations pub(super) fn validate_transaction_in_utxo_context<'a>( &self, @@ -318,3 +352,60 @@ impl VirtualStateProcessor { Ok(()) } } + +#[cfg(test)] +mod tests { + use itertools::Itertools; + + use super::*; + + #[test] + fn test_rayon_reduce_retains_order() { + // this is an independent test to replicate the behavior of + // validate_txs_in_parallel and validate_txs_with_muhash_in_parallel + // and assert that the order of data is retained when doing par_iter + let data: Vec = (1..=1000).collect(); + + let collected: Vec = data + .par_iter() + .filter_map(|a| { + let chance: f64 = rand::random(); + if chance < 0.05 { + return None; + } + Some(*a) + }) + .collect(); + + println!("collected len: {}", collected.len()); + + collected.iter().tuple_windows().for_each(|(prev, curr)| { + // Data was originally sorted, so we check if they remain sorted after filtering + assert!(prev < curr, "expected {} < {} if original sort was preserved", prev, curr); + }); + + let reduced: SmallVec<[u16; 2]> = data + .par_iter() + .filter_map(|a: &u16| { + let chance: f64 = rand::random(); + if chance < 0.05 { + return None; + } + Some(smallvec![*a]) + }) + .reduce( + || smallvec![], + |mut arr, mut curr_data| { + arr.append(&mut curr_data); + arr + }, + ); + + println!("reduced len: {}", reduced.len()); + + reduced.iter().tuple_windows().for_each(|(prev, curr)| { + // Data was originally sorted, so we check if they remain sorted after filtering + assert!(prev < curr, "expected {} < {} if original sort was preserved", prev, curr); + }); + } +} diff --git a/crypto/muhash/src/u3072.rs b/crypto/muhash/src/u3072.rs index 82021eb88d..fae82b8cf1 100644 --- a/crypto/muhash/src/u3072.rs +++ b/crypto/muhash/src/u3072.rs @@ -88,6 +88,16 @@ impl U3072 { } fn mul(&mut self, other: &U3072) { + /* + Optimization: short-circuit when LHS is one + - This case is especially frequent during parallel reduce operation where the identity (one) is used for each sub-computation (at the LHS) + - If self ≠ one, the comparison should exit early, otherwise if they are equal -- we gain much more than we lose + - Benchmarks show that general performance remains the same while parallel reduction gains ~35% + */ + if *self == Self::one() { + *self = *other; + return; + } let (mut carry_low, mut carry_high, mut carry_highest) = (0, 0, 0); let mut tmp = Self::one(); diff --git a/utils/src/iter.rs b/utils/src/iter.rs index 58a61d7707..3c4c98c64a 100644 --- a/utils/src/iter.rs +++ b/utils/src/iter.rs @@ -48,3 +48,9 @@ where self.inner.clone().fmt(f) } } + +/// Returns an iterator over powers of two up to (the rounded up) available parallelism: `2, 4, 8, ..., 2^(available_parallelism.log2().ceil())`, +/// i.e., for `std::thread::available_parallelism = 15` the function will return `2, 4, 8, 16` +pub fn parallelism_in_power_steps() -> impl Iterator { + (1..=(std::thread::available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2usize.pow(x)) +}