Skip to content

Commit

Permalink
Muhash parallel reduce -- optimize U3072 mul when LHS = one (#581)
Browse files Browse the repository at this point in the history
* semantic: add `from` ext methods

* muhash from txs benchmark

* optimization: in u3072 mul test if lhs is one

* extract `parallelism_in_power_steps`

* comment
  • Loading branch information
michaelsutton authored Oct 13, 2024
1 parent c59a0d1 commit 0df2de5
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 28 deletions.
2 changes: 1 addition & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ kaspa-txscript-errors.workspace = true
kaspa-addresses.workspace = true

[[bench]]
name = "hash_benchmarks"
name = "parallel_muhash"
harness = false

[[bench]]
Expand Down
4 changes: 2 additions & 2 deletions consensus/benches/check_scripts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use kaspa_consensus_core::subnets::SubnetworkId;
use kaspa_consensus_core::tx::{MutableTransaction, Transaction, TransactionInput, TransactionOutpoint, UtxoEntry};
use kaspa_txscript::caches::Cache;
use kaspa_txscript::pay_to_address_script;
use kaspa_utils::iter::parallelism_in_power_steps;
use rand::{thread_rng, Rng};
use secp256k1::Keypair;
use std::thread::available_parallelism;

// You may need to add more detailed mocks depending on your actual code.
fn mock_tx(inputs_count: usize, non_uniq_signatures: usize) -> (Transaction, Vec<UtxoEntry>) {
Expand Down Expand Up @@ -98,7 +98,7 @@ fn benchmark_check_scripts(c: &mut Criterion) {
});

// Iterate powers of two up to available parallelism
for i in (1..=(available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2u32.pow(x) as usize) {
for i in parallelism_in_power_steps() {
if inputs_count >= i {
group.bench_function(format!("rayon, custom thread pool, thread count {i}"), |b| {
let tx = MutableTransaction::with_entries(tx.clone(), utxos.clone());
Expand Down
15 changes: 0 additions & 15 deletions consensus/benches/hash_benchmarks.rs

This file was deleted.

66 changes: 66 additions & 0 deletions consensus/benches/parallel_muhash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use itertools::Itertools;
use kaspa_consensus_core::{
muhash::MuHashExtensions,
subnets::SUBNETWORK_ID_NATIVE,
tx::{ScriptPublicKey, SignableTransaction, Transaction, TransactionInput, TransactionOutpoint, TransactionOutput, UtxoEntry},
};
use kaspa_hashes::TransactionID;
use kaspa_muhash::MuHash;
use kaspa_utils::iter::parallelism_in_power_steps;
use rayon::prelude::*;

/// Builds a finalized transaction with `ins` inputs and `outs` outputs, wrapped together with
/// one UTXO entry per input.
///
/// The transaction id of each input's outpoint is obtained by hashing the input index followed
/// by `randomness` (both as little-endian bytes), so the outpoints depend on `randomness`.
/// All remaining fields are fixed constants.
fn generate_transaction(ins: usize, outs: usize, randomness: u64) -> SignableTransaction {
    let mut tx = Transaction::new(0, vec![], vec![], 0, SUBNETWORK_ID_NATIVE, 0, vec![]);

    // Push one input per index and collect the matching UTXO entry in the same pass
    let entries: Vec<_> = (0..ins)
        .map(|index| {
            let mut id_hasher = TransactionID::new();
            id_hasher.write(index.to_le_bytes());
            id_hasher.write(randomness.to_le_bytes());
            let outpoint = TransactionOutpoint::new(id_hasher.finalize(), 0);
            tx.inputs.push(TransactionInput::new(outpoint, vec![10; 66], 0, 1));
            UtxoEntry::new(22222222, ScriptPublicKey::from_vec(0, vec![99; 34]), 23456, false)
        })
        .collect();

    // All outputs are identical
    for _ in 0..outs {
        tx.outputs.push(TransactionOutput::new(23456, ScriptPublicKey::from_vec(0, vec![101; 34])));
    }

    tx.finalize();
    SignableTransaction::with_entries(tx, entries)
}

/// Benchmarks computing the muhash of 256 generated transactions (2 inputs and 2 outputs each).
///
/// - `"seq"`: all transactions are added to a single accumulator on the calling thread.
/// - `"par N"`: for each thread count `N` yielded by `parallelism_in_power_steps`, every
///   transaction is hashed into its own `MuHash` and the results are combined with a rayon
///   `reduce` running on a dedicated pool of `N` threads.
pub fn parallel_muhash_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("muhash txs");
    let txs = (0..256).map(|seed| generate_transaction(2, 2, seed)).collect_vec();

    // Sequential baseline
    group.bench_function("seq", |bencher| {
        bencher.iter(|| {
            let accumulated = txs.iter().fold(MuHash::new(), |mut acc, tx| {
                acc.add_transaction(&tx.as_verifiable(), 222);
                acc
            });
            black_box(accumulated)
        })
    });

    for threads in parallelism_in_power_steps() {
        let bench_name = format!("par {threads}");
        group.bench_function(bench_name, |bencher| {
            // The pool is built once per benchmark, outside of the measured closure
            let pool = rayon::ThreadPoolBuilder::new().num_threads(threads).build().unwrap();
            bencher.iter(|| {
                pool.install(|| {
                    let reduced = txs
                        .par_iter()
                        .map(|tx| MuHash::from_transaction(&tx.as_verifiable(), 222))
                        .reduce(MuHash::new, |mut acc, part| {
                            acc.combine(&part);
                            acc
                        });
                    black_box(reduced)
                })
            })
        });
    }

    group.finish();
}

criterion_group!(benches, parallel_muhash_benchmark);
criterion_main!(benches);
14 changes: 14 additions & 0 deletions consensus/core/src/muhash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use kaspa_muhash::MuHash;
/// Extension methods for feeding transactions and UTXOs into a muhash accumulator, either in place
/// (`add_*`) or by constructing a new instance (`from_*`).
pub trait MuHashExtensions {
/// Updates `self` in place with `tx`.
/// NOTE(review): the implementation body is not visible in this hunk -- how `block_daa_score` takes part in the hash should be confirmed there.
fn add_transaction(&mut self, tx: &impl VerifiableTransaction, block_daa_score: u64);
/// Updates `self` in place with the UTXO identified by `outpoint` and described by `entry`.
/// NOTE(review): only the tail of the implementation is visible in this hunk -- confirm the exact serialization there.
fn add_utxo(&mut self, outpoint: &TransactionOutpoint, entry: &UtxoEntry);
/// Constructor counterpart of `add_transaction`: the `MuHash` implementation starts from `Self::new()`,
/// applies `add_transaction(tx, block_daa_score)` and returns the result.
fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self;
/// Constructor counterpart of `add_utxo`: the `MuHash` implementation starts from `Self::new()`,
/// applies `add_utxo(outpoint, entry)` and returns the result.
fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self;
}

impl MuHashExtensions for MuHash {
Expand All @@ -30,6 +32,18 @@ impl MuHashExtensions for MuHash {
write_utxo(&mut writer, entry, outpoint);
writer.finalize();
}

/// Returns a fresh instance (created with `Self::new()`) to which `tx` has been added
/// through `add_transaction` using the given `block_daa_score`.
fn from_transaction(tx: &impl VerifiableTransaction, block_daa_score: u64) -> Self {
    let mut multiset = Self::new();
    multiset.add_transaction(tx, block_daa_score);
    multiset
}

/// Returns a fresh instance (created with `Self::new()`) to which the UTXO given by
/// `outpoint` and `entry` has been added through `add_utxo`.
fn from_utxo(outpoint: &TransactionOutpoint, entry: &UtxoEntry) -> Self {
    let mut multiset = Self::new();
    multiset.add_utxo(outpoint, entry);
    multiset
}
}

fn write_utxo(writer: &mut impl HasherBase, entry: &UtxoEntry, outpoint: &TransactionOutpoint) {
Expand Down
10 changes: 2 additions & 8 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,14 +774,8 @@ impl ConsensusApi for Consensus {
pruning_utxoset_write.utxo_set.write_many(utxoset_chunk).unwrap();

// Parallelize processing
let inner_multiset = utxoset_chunk
.par_iter()
.map(|(outpoint, entry)| {
let mut inner_multiset = MuHash::new();
inner_multiset.add_utxo(outpoint, entry);
inner_multiset
})
.reduce(MuHash::new, |mut a, b| {
let inner_multiset =
utxoset_chunk.par_iter().map(|(outpoint, entry)| MuHash::from_utxo(outpoint, entry)).reduce(MuHash::new, |mut a, b| {
a.combine(&b);
a
});
Expand Down
3 changes: 1 addition & 2 deletions consensus/src/pipeline/virtual_processor/utxo_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ impl VirtualStateProcessor {
.enumerate()
.skip(1) // Skip the coinbase tx.
.filter_map(|(i, tx)| self.validate_transaction_in_utxo_context(tx, &utxo_view, pov_daa_score, flags).ok().map(|vtx| {
let mut mh = MuHash::new();
mh.add_transaction(&vtx, pov_daa_score);
let mh = MuHash::from_transaction(&vtx, pov_daa_score);
(smallvec![(vtx, i as u32)], mh)
}
))
Expand Down
10 changes: 10 additions & 0 deletions crypto/muhash/src/u3072.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ impl U3072 {
}

fn mul(&mut self, other: &U3072) {
/*
Optimization: short-circuit when LHS is one
- This case is especially frequent during parallel reduce operation where the identity (one) is used for each sub-computation (at the LHS)
- If self ≠ one, the comparison should exit early, otherwise if they are equal -- we gain much more than we lose
- Benchmarks show that general performance remains the same while parallel reduction gains ~35%
*/
if *self == Self::one() {
*self = *other;
return;
}
let (mut carry_low, mut carry_high, mut carry_highest) = (0, 0, 0);
let mut tmp = Self::one();

Expand Down
6 changes: 6 additions & 0 deletions utils/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,9 @@ where
self.inner.clone().fmt(f)
}
}

/// Returns an iterator over the powers of two `2, 4, 8, ..., 2^k`, where `k = ceil(log2(p))` and `p` is
/// [`std::thread::available_parallelism`], i.e., the last item is `p` rounded up to a power of two.
///
/// For example, for `p = 15` (and likewise for `p = 16`) the function will return `2, 4, 8, 16`.
///
/// The iterator is empty when `p = 1`. If the available parallelism cannot be queried on the current
/// platform, it is treated as `1` (hence an empty iterator) instead of panicking.
pub fn parallelism_in_power_steps() -> impl Iterator<Item = usize> {
    let parallelism = std::thread::available_parallelism().map_or(1, |p| p.get());
    // For p >= 1, `p.next_power_of_two().trailing_zeros()` equals `ceil(log2(p))`, computed with exact
    // integer arithmetic rather than a lossy round trip through `f64`
    let max_exponent = parallelism.next_power_of_two().trailing_zeros();
    (1..=max_exponent).map(|exponent| 2usize.pow(exponent))
}

0 comments on commit 0df2de5

Please sign in to comment.