Skip to content

Commit

Permalink
Merge branch 'master' into optimize-window-cache-building-for-ibd
Browse files Browse the repository at this point in the history
  • Loading branch information
D-Stacks authored Oct 15, 2024
2 parents 8ce71fa + 0df2de5 commit aecdef8
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 23 deletions.
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ kaspa-txscript-errors.workspace = true
kaspa-addresses.workspace = true

[[bench]]
name = "hash_benchmarks"
name = "parallel_muhash"
harness = false

[[bench]]
Expand Down
4 changes: 2 additions & 2 deletions consensus/benches/check_scripts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use kaspa_consensus_core::subnets::SubnetworkId;
use kaspa_consensus_core::tx::{MutableTransaction, Transaction, TransactionInput, TransactionOutpoint, UtxoEntry};
use kaspa_txscript::caches::Cache;
use kaspa_txscript::pay_to_address_script;
use kaspa_utils::iter::parallelism_in_power_steps;
use rand::{thread_rng, Rng};
use secp256k1::Keypair;
use std::thread::available_parallelism;

// You may need to add more detailed mocks depending on your actual code.
fn mock_tx(inputs_count: usize, non_uniq_signatures: usize) -> (Transaction, Vec<UtxoEntry>) {
Expand Down Expand Up @@ -98,7 +98,7 @@ fn benchmark_check_scripts(c: &mut Criterion) {
});

// Iterate powers of two up to available parallelism
for i in (1..=(available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2u32.pow(x) as usize) {
for i in parallelism_in_power_steps() {
if inputs_count >= i {
group.bench_function(format!("rayon, custom thread pool, thread count {i}"), |b| {
let tx = MutableTransaction::with_entries(tx.clone(), utxos.clone());
Expand Down
15 changes: 0 additions & 15 deletions consensus/benches/hash_benchmarks.rs

This file was deleted.

66 changes: 66 additions & 0 deletions consensus/benches/parallel_muhash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use kaspa_consensus_core::{
muhash::MuHashExtensions,
subnets::SUBNETWORK_ID_NATIVE,
tx::{ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, TransactionOutput, UtxoEntry},
};
use kaspa_hashes::TransactionID;
use kaspa_muhash::MuHash;
use kaspa_utils::iter::parallelism_in_power_steps;
use rayon::prelude::*;

fn generate_transaction(ins: usize, outs: usize, randomness: u64) -> SignableTransaction {
let mut tx = Transaction::new(0, vec![], vec![], 0, SUBNETWORK_ID_NATIVE, 0, vec![]);
let mut entries = vec![];
for i in 0..ins {
let mut hasher = TransactionID::new();
hasher.write(i.to_le_bytes());
hasher.write(randomness.to_le_bytes());
let input = TransactionInput::new(TransactionOutpoint::new(hasher.finalize(), 0), vec![10; 66], 0, 1);
let entry = UtxoEntry::new(22222222, ScriptPublicKey::from_vec(0, vec![99; 34]), 23456, false);
tx.inputs.push(input);
entries.push(entry);
}
for _ in 0..outs {
let output = TransactionOutput::new(23456, ScriptPublicKey::from_vec(0, vec![101; 34]));
tx.outputs.push(output);
}
tx.finalize();
SignableTransaction::with_entries(tx, entries)
}

pub fn parallel_muhash_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("muhash txs");
let txs = (0..256).map(|i| generate_transaction(2, 2, i)).collect_vec();
group.bench_function("seq", |b| {
b.iter(|| {
let mut mh = MuHash::new();
for tx in txs.iter() {
mh.add_transaction(&tx.as_verifiable(), 222);
}
black_box(mh)
})
});

for threads in parallelism_in_power_steps() {
group.bench_function(format!("par {threads}"), |b| {
let pool = rayon::ThreadPoolBuilder::new().num_threads(threads).build().unwrap();
b.iter(|| {
pool.install(|| {
let mh =
txs.par_iter().map(|tx| MuHash::from_transaction(&tx.as_verifiable(), 222)).reduce(MuHash::new, |mut a, b| {
a.combine(&b);
a
});
black_box(mh)
})
})
});
}

group.finish();
}

criterion_group!(benches, parallel_muhash_benchmark);
criterion_main!(benches);
14 changes: 14 additions & 0 deletions consensus/core/src/muhash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use kaspa_muhash::MuHash;
pub trait MuHashExtensions {
fn add_transaction(&mut self, tx: &impl VerifiableTransaction, block_daa_score: u64);
fn add_utxo(&mut self, outpoint: &TransactionOutpoint, entry: &UtxoEntry);
fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self;
fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self;
}

impl MuHashExtensions for MuHash {
Expand All @@ -30,6 +32,18 @@ impl MuHashExtensions for MuHash {
write_utxo(&mut writer, entry, outpoint);
writer.finalize();
}

fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self {
let mut mh = Self::new();
mh.add_transaction(tx, block_daa_score);
mh
}

fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self {
let mut mh = Self::new();
mh.add_utxo(outpoint, entry);
mh
}
}

fn write_utxo(writer: &mut impl HasherBase, entry: &UtxoEntry, outpoint: &TransactionOutpoint) {
Expand Down
13 changes: 10 additions & 3 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_txscript::caches::TxScriptCacheCounters;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

use std::{
cmp::Reverse,
Expand Down Expand Up @@ -771,9 +772,15 @@ impl ConsensusApi for Consensus {
fn append_imported_pruning_point_utxos(&self, utxoset_chunk: &[(TransactionOutpoint, UtxoEntry)], current_multiset: &mut MuHash) {
let mut pruning_utxoset_write = self.pruning_utxoset_stores.write();
pruning_utxoset_write.utxo_set.write_many(utxoset_chunk).unwrap();
for (outpoint, entry) in utxoset_chunk {
current_multiset.add_utxo(outpoint, entry);
}

// Parallelize processing
let inner_multiset =
utxoset_chunk.par_iter().map(|(outpoint, entry)| MuHash::from_utxo(outpoint, entry)).reduce(MuHash::new, |mut a, b| {
a.combine(&b);
a
});

current_multiset.combine(&inner_multiset);
}

fn import_pruning_point_utxo_set(&self, new_pruning_point: Hash, imported_utxo_multiset: MuHash) -> PruningImportResult<()> {
Expand Down
95 changes: 93 additions & 2 deletions consensus/src/pipeline/virtual_processor/utxo_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use kaspa_muhash::MuHash;
use kaspa_utils::refs::Refs;

use rayon::prelude::*;
use smallvec::{smallvec, SmallVec};
use std::{iter::once, ops::Deref};

/// A context for processing the UTXO state of a block with respect to its selected parent.
Expand Down Expand Up @@ -95,12 +96,14 @@ impl VirtualStateProcessor {
// No need to fully validate selected parent transactions since selected parent txs were already validated
// as part of selected parent UTXO state verification with the exact same UTXO context.
let validation_flags = if is_selected_parent { TxValidationFlags::SkipScriptChecks } else { TxValidationFlags::Full };
let validated_transactions = self.validate_transactions_in_parallel(&txs, &composed_view, pov_daa_score, validation_flags);
let (validated_transactions, inner_multiset) =
self.validate_transactions_with_muhash_in_parallel(&txs, &composed_view, pov_daa_score, validation_flags);

ctx.multiset_hash.combine(&inner_multiset);

let mut block_fee = 0u64;
for (validated_tx, _) in validated_transactions.iter() {
ctx.mergeset_diff.add_transaction(validated_tx, pov_daa_score).unwrap();
ctx.multiset_hash.add_transaction(validated_tx, pov_daa_score);
ctx.accepted_tx_ids.push(validated_tx.id());
block_fee += validated_tx.calculated_fee;
}
Expand Down Expand Up @@ -229,6 +232,37 @@ impl VirtualStateProcessor {
})
}

/// Same as validate_transactions_in_parallel except during the iteration this will also
/// calculate the muhash in parallel for valid transactions
pub(crate) fn validate_transactions_with_muhash_in_parallel<'a, V: UtxoView + Sync>(
&self,
txs: &'a Vec<Transaction>,
utxo_view: &V,
pov_daa_score: u64,
flags: TxValidationFlags,
) -> (SmallVec<[(ValidatedTransaction<'a>, u32); 2]>, MuHash) {
self.thread_pool.install(|| {
txs
.par_iter() // We can do this in parallel without complications since block body validation already ensured
// that all txs within each block are independent
.enumerate()
.skip(1) // Skip the coinbase tx.
.filter_map(|(i, tx)| self.validate_transaction_in_utxo_context(tx, &utxo_view, pov_daa_score, flags).ok().map(|vtx| {
let mh = MuHash::from_transaction(&vtx, pov_daa_score);
(smallvec![(vtx, i as u32)], mh)
}
))
.reduce(
|| (smallvec![], MuHash::new()),
|mut a, mut b| {
a.0.append(&mut b.0);
a.1.combine(&b.1);
a
},
)
})
}

/// Attempts to populate the transaction with UTXO entries and performs all utxo-related tx validations
pub(super) fn validate_transaction_in_utxo_context<'a>(
&self,
Expand Down Expand Up @@ -318,3 +352,60 @@ impl VirtualStateProcessor {
Ok(())
}
}

#[cfg(test)]
mod tests {
use itertools::Itertools;

use super::*;

#[test]
fn test_rayon_reduce_retains_order() {
// this is an independent test to replicate the behavior of
// validate_txs_in_parallel and validate_txs_with_muhash_in_parallel
// and assert that the order of data is retained when doing par_iter
let data: Vec<u16> = (1..=1000).collect();

let collected: Vec<u16> = data
.par_iter()
.filter_map(|a| {
let chance: f64 = rand::random();
if chance < 0.05 {
return None;
}
Some(*a)
})
.collect();

println!("collected len: {}", collected.len());

collected.iter().tuple_windows().for_each(|(prev, curr)| {
// Data was originally sorted, so we check if they remain sorted after filtering
assert!(prev < curr, "expected {} < {} if original sort was preserved", prev, curr);
});

let reduced: SmallVec<[u16; 2]> = data
.par_iter()
.filter_map(|a: &u16| {
let chance: f64 = rand::random();
if chance < 0.05 {
return None;
}
Some(smallvec![*a])
})
.reduce(
|| smallvec![],
|mut arr, mut curr_data| {
arr.append(&mut curr_data);
arr
},
);

println!("reduced len: {}", reduced.len());

reduced.iter().tuple_windows().for_each(|(prev, curr)| {
// Data was originally sorted, so we check if they remain sorted after filtering
assert!(prev < curr, "expected {} < {} if original sort was preserved", prev, curr);
});
}
}
10 changes: 10 additions & 0 deletions crypto/muhash/src/u3072.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ impl U3072 {
}

fn mul(&mut self, other: &U3072) {
/*
Optimization: short-circuit when LHS is one
- This case is especially frequent during parallel reduce operation where the identity (one) is used for each sub-computation (at the LHS)
- If self ≠ one, the comparison should exit early, otherwise if they are equal -- we gain much more than we lose
- Benchmarks show that general performance remains the same while parallel reduction gains ~35%
*/
if *self == Self::one() {
*self = *other;
return;
}
let (mut carry_low, mut carry_high, mut carry_highest) = (0, 0, 0);
let mut tmp = Self::one();

Expand Down
6 changes: 6 additions & 0 deletions utils/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,9 @@ where
self.inner.clone().fmt(f)
}
}

/// Returns an iterator over powers of two up to (the rounded up) available parallelism: `2, 4, 8, ..., 2^(available_parallelism.log2().ceil())`,
/// i.e., for `std::thread::available_parallelism = 15` the function will return `2, 4, 8, 16`
pub fn parallelism_in_power_steps() -> impl Iterator<Item = usize> {
(1..=(std::thread::available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2usize.pow(x))
}

0 comments on commit aecdef8

Please sign in to comment.