Skip to content

Commit

Permalink
Adds --accounts-db-hash-threads to validator and ledger-tool (#3233)
Browse files Browse the repository at this point in the history
  • Loading branch information
brooksprumo authored Oct 24, 2024
1 parent c14034e commit 2065973
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 12 deletions.
16 changes: 13 additions & 3 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ use {
fs,
hash::{Hash as StdHash, Hasher as StdHasher},
io::Result as IoResult,
num::Saturating,
num::{NonZeroUsize, Saturating},
ops::{Range, RangeBounds},
path::{Path, PathBuf},
sync::{
Expand Down Expand Up @@ -510,6 +510,7 @@ pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
storage_access: StorageAccess::Mmap,
scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify,
enable_experimental_accumulator_hash: false,
num_hash_threads: None,
};
pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig {
index: Some(ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS),
Expand All @@ -527,6 +528,7 @@ pub const ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS: AccountsDbConfig = AccountsDbConfig
storage_access: StorageAccess::Mmap,
scan_filter_for_shrinking: ScanFilter::OnlyAbnormalWithVerify,
enable_experimental_accumulator_hash: false,
num_hash_threads: None,
};

pub type BinnedHashData = Vec<Vec<CalculateHashIntermediate>>;
Expand Down Expand Up @@ -638,6 +640,8 @@ pub struct AccountsDbConfig {
pub storage_access: StorageAccess,
pub scan_filter_for_shrinking: ScanFilter,
pub enable_experimental_accumulator_hash: bool,
/// Number of threads for background accounts hashing (`thread_pool_hash`)
pub num_hash_threads: Option<NonZeroUsize>,
}

#[cfg(not(test))]
Expand Down Expand Up @@ -1746,9 +1750,15 @@ pub fn make_min_priority_thread_pool() -> ThreadPool {
.unwrap()
}

pub fn make_hash_thread_pool() -> ThreadPool {
/// Returns the default number of threads to use for background accounts hashing
pub fn default_num_hash_threads() -> NonZeroUsize {
// 1/8 of the number of cpus and up to 6 threads gives good balance for the system.
let num_threads = (num_cpus::get() / 8).clamp(2, 6);
NonZeroUsize::new(num_threads).unwrap()
}

pub fn make_hash_thread_pool(num_threads: Option<NonZeroUsize>) -> ThreadPool {
let num_threads = num_threads.unwrap_or_else(default_num_hash_threads).get();
rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("solAcctHash{i:02}"))
.num_threads(num_threads)
Expand Down Expand Up @@ -1893,7 +1903,7 @@ impl AccountsDb {
.build()
.expect("new rayon threadpool");
let thread_pool_clean = make_min_priority_thread_pool();
let thread_pool_hash = make_hash_thread_pool();
let thread_pool_hash = make_hash_thread_pool(accounts_db_config.num_hash_threads);

let mut new = Self {
accounts_index,
Expand Down
15 changes: 14 additions & 1 deletion ledger-tool/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
solana_clap_utils::{
hidden_unless_forced,
input_parsers::pubkeys_of,
input_validators::{is_parsable, is_pow2},
input_validators::{is_parsable, is_pow2, is_within_range},
},
solana_ledger::{
blockstore_processor::ProcessOptions,
Expand All @@ -21,6 +21,7 @@ use {
solana_sdk::clock::Slot,
std::{
collections::HashSet,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::Arc,
},
Expand Down Expand Up @@ -131,6 +132,13 @@ pub fn accounts_db_args<'a, 'b>() -> Box<[Arg<'a, 'b>]> {
.long("accounts-db-experimental-accumulator-hash")
.help("Enables the experimental accumulator hash")
.hidden(hidden_unless_forced()),
Arg::with_name("accounts_db_hash_threads")
.long("accounts-db-hash-threads")
.value_name("NUM_THREADS")
.takes_value(true)
.validator(|s| is_within_range(s, 1..=num_cpus::get()))
.help("Number of threads to use for background accounts hashing")
.hidden(hidden_unless_forced()),
]
.into_boxed_slice()
}
Expand Down Expand Up @@ -331,6 +339,10 @@ pub fn get_accounts_db_config(
})
.unwrap_or_default();

let num_hash_threads = arg_matches
.is_present("accounts_db_hash_threads")
.then(|| value_t_or_exit!(arg_matches, "accounts_db_hash_threads", NonZeroUsize));

AccountsDbConfig {
index: Some(accounts_index_config),
base_working_path: Some(ledger_tool_ledger_path),
Expand All @@ -347,6 +359,7 @@ pub fn get_accounts_db_config(
scan_filter_for_shrinking,
enable_experimental_accumulator_hash: arg_matches
.is_present("accounts_db_experimental_accumulator_hash"),
num_hash_threads,
..AccountsDbConfig::default()
}
}
Expand Down
21 changes: 21 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

use {
clap::{value_t_or_exit, Arg, ArgMatches},
solana_accounts_db::accounts_db,
solana_clap_utils::{hidden_unless_forced, input_validators::is_within_range},
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
std::{num::NonZeroUsize, ops::RangeInclusive},
};

// Need this struct to provide &str whose lifetime matches that of the CLAP Arg's
pub struct DefaultThreadArgs {
pub accounts_db_hash_threads: String,
pub ip_echo_server_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
Expand All @@ -19,6 +21,7 @@ pub struct DefaultThreadArgs {
impl Default for DefaultThreadArgs {
fn default() -> Self {
Self {
accounts_db_hash_threads: AccountsDbHashThreadsArg::bounded_default().to_string(),
ip_echo_server_threads: IpEchoServerThreadsArg::bounded_default().to_string(),
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
Expand All @@ -31,6 +34,7 @@ impl Default for DefaultThreadArgs {

pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
vec![
new_thread_arg::<AccountsDbHashThreadsArg>(&defaults.accounts_db_hash_threads),
new_thread_arg::<IpEchoServerThreadsArg>(&defaults.ip_echo_server_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
Expand All @@ -51,6 +55,7 @@ fn new_thread_arg<'a, T: ThreadArg>(default: &str) -> Arg<'_, 'a> {
}

pub struct NumThreadConfig {
pub accounts_db_hash_threads: NonZeroUsize,
pub ip_echo_server_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
Expand All @@ -60,6 +65,11 @@ pub struct NumThreadConfig {

pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
NumThreadConfig {
accounts_db_hash_threads: value_t_or_exit!(
matches,
AccountsDbHashThreadsArg::NAME,
NonZeroUsize
),
ip_echo_server_threads: value_t_or_exit!(
matches,
IpEchoServerThreadsArg::NAME,
Expand Down Expand Up @@ -116,6 +126,17 @@ trait ThreadArg {
}
}

struct AccountsDbHashThreadsArg;
impl ThreadArg for AccountsDbHashThreadsArg {
const NAME: &'static str = "accounts_db_hash_threads";
const LONG_NAME: &'static str = "accounts-db-hash-threads";
const HELP: &'static str = "Number of threads to use for background accounts hashing";

fn default() -> usize {
accounts_db::default_num_hash_threads().get()
}
}

struct IpEchoServerThreadsArg;
impl ThreadArg for IpEchoServerThreadsArg {
const NAME: &'static str = "ip_echo_server_threads";
Expand Down
18 changes: 10 additions & 8 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,15 @@ pub fn main() {
_ => unreachable!(),
};

let cli::thread_args::NumThreadConfig {
accounts_db_hash_threads,
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_receive_threads,
tvu_sigverify_threads,
} = cli::thread_args::parse_num_threads_args(&matches);

let identity_keypair = keypair_of(&matches, "identity").unwrap_or_else(|| {
clap::Error::with_description(
"The --identity <KEYPAIR> argument is required",
Expand Down Expand Up @@ -1303,6 +1312,7 @@ pub fn main() {
scan_filter_for_shrinking,
enable_experimental_accumulator_hash: matches
.is_present("accounts_db_experimental_accumulator_hash"),
num_hash_threads: Some(accounts_db_hash_threads),
..AccountsDbConfig::default()
};

Expand Down Expand Up @@ -1387,14 +1397,6 @@ pub fn main() {

let full_api = matches.is_present("full_rpc_api");

let cli::thread_args::NumThreadConfig {
ip_echo_server_threads,
replay_forks_threads,
replay_transactions_threads,
tvu_receive_threads,
tvu_sigverify_threads,
} = cli::thread_args::parse_num_threads_args(&matches);

let mut validator_config = ValidatorConfig {
require_tower: matches.is_present("require_tower"),
tower_storage,
Expand Down

0 comments on commit 2065973

Please sign in to comment.