Skip to content

Commit

Permalink
enhance tx inputs processing (#495)
Browse files Browse the repository at this point in the history
* sighash reused trait

* benches are implemented

* use cache per iteration per function

* fix par versions

* fix benches

* use upgreadable read

* use concurrent cache

* use hashcache

* dont apply cache

* rollback rwlock and indexmap.

* remove scc

* apply par iter to `check_scripts`

* refactor check_scripts fn, fix tests

* fix clippy

* add bench with custom threadpool

* style: fmt

* suppress warnings

* Merge branch 'master' into bcm-parallel-processing

* renames + map err

* reuse code

* bench: avoid exposing cache map + iter pools in powers of 2

* simplify check_sig_op_counts

* use thread pool also if a single input
1. to avoid confusion
2. since tokio blocking threads are not meant to be used for processing anyway

* remove todo

* clear cache instead of recreate

* use and_then (so map_err can be called in a single location)

* extend check scripts tests for better coverage of the par_iter case

---------

Co-authored-by: Michael Sutton <[email protected]>
  • Loading branch information
biryukovmaxim and michaelsutton authored Oct 8, 2024
1 parent 1378e7b commit 1274e9c
Show file tree
Hide file tree
Showing 20 changed files with 538 additions and 175 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,16 @@ serde_json.workspace = true
flate2.workspace = true
rand_distr.workspace = true
kaspa-txscript-errors.workspace = true
kaspa-addresses.workspace = true

[[bench]]
name = "hash_benchmarks"
harness = false

[[bench]]
name = "check_scripts"
harness = false

[features]
html_reports = []
devnet-prealloc = ["kaspa-consensus-core/devnet-prealloc"]
126 changes: 126 additions & 0 deletions consensus/benches/check_scripts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion, SamplingMode};
use kaspa_addresses::{Address, Prefix, Version};
use kaspa_consensus::processes::transaction_validator::transaction_validator_populated::{
check_scripts_par_iter, check_scripts_par_iter_pool, check_scripts_sequential,
};
use kaspa_consensus_core::hashing::sighash::{calc_schnorr_signature_hash, SigHashReusedValuesUnsync};
use kaspa_consensus_core::hashing::sighash_type::SIG_HASH_ALL;
use kaspa_consensus_core::subnets::SubnetworkId;
use kaspa_consensus_core::tx::{MutableTransaction, Transaction, TransactionInput, TransactionOutpoint, UtxoEntry};
use kaspa_txscript::caches::Cache;
use kaspa_txscript::pay_to_address_script;
use rand::{thread_rng, Rng};
use secp256k1::Keypair;
use std::thread::available_parallelism;

// You may need to add more detailed mocks depending on your actual code.
fn mock_tx(inputs_count: usize, non_uniq_signatures: usize) -> (Transaction, Vec<UtxoEntry>) {
let reused_values = SigHashReusedValuesUnsync::new();
let dummy_prev_out = TransactionOutpoint::new(kaspa_hashes::Hash::from_u64_word(1), 1);
let mut tx = Transaction::new(
0,
vec![],
vec![],
0,
SubnetworkId::from_bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
0,
vec![],
);
let mut utxos = vec![];
let mut kps = vec![];
for _ in 0..inputs_count - non_uniq_signatures {
let kp = Keypair::new(secp256k1::SECP256K1, &mut thread_rng());
tx.inputs.push(TransactionInput { previous_outpoint: dummy_prev_out, signature_script: vec![], sequence: 0, sig_op_count: 1 });
let address = Address::new(Prefix::Mainnet, Version::PubKey, &kp.x_only_public_key().0.serialize());
utxos.push(UtxoEntry {
amount: thread_rng().gen::<u32>() as u64,
script_public_key: pay_to_address_script(&address),
block_daa_score: 333,
is_coinbase: false,
});
kps.push(kp);
}
for _ in 0..non_uniq_signatures {
let kp = kps.last().unwrap();
tx.inputs.push(TransactionInput { previous_outpoint: dummy_prev_out, signature_script: vec![], sequence: 0, sig_op_count: 1 });
let address = Address::new(Prefix::Mainnet, Version::PubKey, &kp.x_only_public_key().0.serialize());
utxos.push(UtxoEntry {
amount: thread_rng().gen::<u32>() as u64,
script_public_key: pay_to_address_script(&address),
block_daa_score: 444,
is_coinbase: false,
});
}
for (i, kp) in kps.iter().enumerate().take(inputs_count - non_uniq_signatures) {
let mut_tx = MutableTransaction::with_entries(&tx, utxos.clone());
let sig_hash = calc_schnorr_signature_hash(&mut_tx.as_verifiable(), i, SIG_HASH_ALL, &reused_values);
let msg = secp256k1::Message::from_digest_slice(sig_hash.as_bytes().as_slice()).unwrap();
let sig: [u8; 64] = *kp.sign_schnorr(msg).as_ref();
// This represents OP_DATA_65 <SIGNATURE+SIGHASH_TYPE> (since signature length is 64 bytes and SIGHASH_TYPE is one byte)
tx.inputs[i].signature_script = std::iter::once(65u8).chain(sig).chain([SIG_HASH_ALL.to_u8()]).collect();
}
let length = tx.inputs.len();
for i in (inputs_count - non_uniq_signatures)..length {
let kp = kps.last().unwrap();
let mut_tx = MutableTransaction::with_entries(&tx, utxos.clone());
let sig_hash = calc_schnorr_signature_hash(&mut_tx.as_verifiable(), i, SIG_HASH_ALL, &reused_values);
let msg = secp256k1::Message::from_digest_slice(sig_hash.as_bytes().as_slice()).unwrap();
let sig: [u8; 64] = *kp.sign_schnorr(msg).as_ref();
// This represents OP_DATA_65 <SIGNATURE+SIGHASH_TYPE> (since signature length is 64 bytes and SIGHASH_TYPE is one byte)
tx.inputs[i].signature_script = std::iter::once(65u8).chain(sig).chain([SIG_HASH_ALL.to_u8()]).collect();
}
(tx, utxos)
}

fn benchmark_check_scripts(c: &mut Criterion) {
for inputs_count in [100, 50, 25, 10, 5, 2] {
for non_uniq_signatures in [0, inputs_count / 2] {
let (tx, utxos) = mock_tx(inputs_count, non_uniq_signatures);
let mut group = c.benchmark_group(format!("inputs: {inputs_count}, non uniq: {non_uniq_signatures}"));
group.sampling_mode(SamplingMode::Flat);

group.bench_function("single_thread", |b| {
let tx = MutableTransaction::with_entries(&tx, utxos.clone());
let cache = Cache::new(inputs_count as u64);
b.iter(|| {
cache.clear();
check_scripts_sequential(black_box(&cache), black_box(&tx.as_verifiable())).unwrap();
})
});

group.bench_function("rayon par iter", |b| {
let tx = MutableTransaction::with_entries(tx.clone(), utxos.clone());
let cache = Cache::new(inputs_count as u64);
b.iter(|| {
cache.clear();
check_scripts_par_iter(black_box(&cache), black_box(&tx.as_verifiable())).unwrap();
})
});

// Iterate powers of two up to available parallelism
for i in (1..=(available_parallelism().unwrap().get() as f64).log2().ceil() as u32).map(|x| 2u32.pow(x) as usize) {
if inputs_count >= i {
group.bench_function(format!("rayon, custom thread pool, thread count {i}"), |b| {
let tx = MutableTransaction::with_entries(tx.clone(), utxos.clone());
// Create a custom thread pool with the specified number of threads
let pool = rayon::ThreadPoolBuilder::new().num_threads(i).build().unwrap();
let cache = Cache::new(inputs_count as u64);
b.iter(|| {
cache.clear();
check_scripts_par_iter_pool(black_box(&cache), black_box(&tx.as_verifiable()), black_box(&pool)).unwrap();
})
});
}
}
}
}
}

criterion_group! {
name = benches;
// This can be any expression that returns a `Criterion` object.
config = Criterion::default().with_output_color(true).measurement_time(std::time::Duration::new(20, 0));
targets = benchmark_check_scripts
}

criterion_main!(benches);
6 changes: 3 additions & 3 deletions consensus/client/src/sign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use core::iter::once;
use itertools::Itertools;
use kaspa_consensus_core::{
hashing::{
sighash::{calc_schnorr_signature_hash, SigHashReusedValues},
sighash::{calc_schnorr_signature_hash, SigHashReusedValuesUnsync},
sighash_type::SIG_HASH_ALL,
},
tx::PopulatedTransaction,
Expand Down Expand Up @@ -44,7 +44,7 @@ pub fn sign_with_multiple_v3<'a>(tx: &'a Transaction, privkeys: &[[u8; 32]]) ->
map.insert(script_pub_key_script, schnorr_key);
}

let mut reused_values = SigHashReusedValues::new();
let reused_values = SigHashReusedValuesUnsync::new();
let mut additional_signatures_required = false;
{
let input_len = tx.inner().inputs.len();
Expand All @@ -59,7 +59,7 @@ pub fn sign_with_multiple_v3<'a>(tx: &'a Transaction, privkeys: &[[u8; 32]]) ->
};
let script = script_pub_key.script();
if let Some(schnorr_key) = map.get(script) {
let sig_hash = calc_schnorr_signature_hash(&populated_transaction, i, SIG_HASH_ALL, &mut reused_values);
let sig_hash = calc_schnorr_signature_hash(&populated_transaction, i, SIG_HASH_ALL, &reused_values);
let msg = secp256k1::Message::from_digest_slice(sig_hash.as_bytes().as_slice()).unwrap();
let sig: [u8; 64] = *schnorr_key.sign_schnorr(msg).as_ref();
// This represents OP_DATA_65 <SIGNATURE+SIGHASH_TYPE> (since signature length is 64 bytes and SIGHASH_TYPE is one byte)
Expand Down
10 changes: 5 additions & 5 deletions consensus/client/src/signing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl SigHashCache {
}
}

pub fn sig_op_counts_hash(&mut self, tx: &Transaction, hash_type: SigHashType, reused_values: &mut SigHashReusedValues) -> Hash {
pub fn sig_op_counts_hash(&mut self, tx: &Transaction, hash_type: SigHashType, reused_values: &SigHashReusedValues) -> Hash {
if hash_type.is_sighash_anyone_can_pay() {
return ZERO_HASH;
}
Expand Down Expand Up @@ -185,16 +185,16 @@ pub fn calc_schnorr_signature_hash(
let mut hasher = TransactionSigningHash::new();
hasher
.write_u16(tx.version)
.update(previous_outputs_hash(&tx, hash_type, &mut reused_values))
.update(sequences_hash(&tx, hash_type, &mut reused_values))
.update(sig_op_counts_hash(&tx, hash_type, &mut reused_values));
.update(previous_outputs_hash(&tx, hash_type, &reused_values))
.update(sequences_hash(&tx, hash_type, &reused_values))
.update(sig_op_counts_hash(&tx, hash_type, &reused_values));
hash_outpoint(&mut hasher, input.previous_outpoint);
hash_script_public_key(&mut hasher, &utxo.script_public_key);
hasher
.write_u64(utxo.amount)
.write_u64(input.sequence)
.write_u8(input.sig_op_count)
.update(outputs_hash(&tx, hash_type, &mut reused_values, input_index))
.update(outputs_hash(&tx, hash_type, &reused_values, input_index))
.write_u64(tx.lock_time)
.update(&tx.subnetwork_id)
.write_u64(tx.gas)
Expand Down
1 change: 1 addition & 0 deletions consensus/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ wasm32-sdk = []
default = []

[dependencies]
arc-swap.workspace = true
async-trait.workspace = true
borsh.workspace = true
cfg-if.workspace = true
Expand Down
Loading

0 comments on commit 1274e9c

Please sign in to comment.