diff --git a/accounts-db/src/accounts_db.rs b/accounts-db/src/accounts_db.rs index 39f7f84918fa78..343f5006457c3b 100644 --- a/accounts-db/src/accounts_db.rs +++ b/accounts-db/src/accounts_db.rs @@ -105,7 +105,7 @@ use { path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}, - Arc, Condvar, Mutex, + Arc, Condvar, Mutex, RwLock, }, thread::{sleep, Builder}, time::{Duration, Instant}, @@ -1051,7 +1051,6 @@ pub enum AccountsHashVerificationError { struct CleanKeyTimings { collect_delta_keys_us: u64, delta_insert_us: u64, - hashset_to_vec_us: u64, dirty_store_processing_us: u64, delta_key_count: u64, dirty_pubkeys_count: u64, @@ -1342,6 +1341,19 @@ impl StoreAccountsTiming { } } +#[derive(Default, Debug)] +struct CleaningInfo { + slot_list: SlotList, + ref_count: u64, +} + +/// This is the return type of AccountsDb::construct_candidate_clean_keys. +/// It's a collection of pubkeys with associated information to +/// facilitate the decision making about which accounts can be removed +/// from the accounts index. In addition, the minimal dirty slot is +/// included in the returned value. +type CleaningCandidates = (Box<[RwLock>]>, Option); + /// Removing unrooted slots in Accounts Background Service needs to be synchronized with flushing /// slots from the Accounts Cache. This keeps track of those slots and the Mutex + Condvar for /// synchronization. @@ -2746,7 +2758,8 @@ impl AccountsDb { /// 1. one of the pubkeys in the store has account info to a store whose store count is not going to zero /// 2. a pubkey we were planning to remove is not removing all stores that contain the account fn calc_delete_dependencies( - purges: &HashMap, RefCount)>, + &self, + candidates: &[RwLock>], store_counts: &mut HashMap)>, min_slot: Option, ) { @@ -2754,77 +2767,99 @@ impl AccountsDb { // do not match the criteria of deleting all appendvecs which contain them // then increment their storage count. let mut already_counted = IntSet::default(); - for (pubkey, (slot_list, ref_count)) in purges.iter() { - let mut failed_slot = None; - let all_stores_being_deleted = slot_list.len() as RefCount == *ref_count; - if all_stores_being_deleted { - let mut delete = true; - for (slot, _account_info) in slot_list { - if let Some(count) = store_counts.get(slot).map(|s| s.0) { - debug!( - "calc_delete_dependencies() + for (bin_index, bin) in candidates.iter().enumerate() { + let bin = bin.read().unwrap(); + for ( + pubkey, + CleaningInfo { + slot_list, + ref_count, + }, + ) in bin.iter().filter(|x| !x.1.slot_list.is_empty()) + { + let mut failed_slot = None; + let all_stores_being_deleted = slot_list.len() as RefCount == *ref_count; + if all_stores_being_deleted { + let mut delete = true; + for (slot, _account_info) in slot_list { + if let Some(count) = store_counts.get(slot).map(|s| s.0) { + debug!( + "calc_delete_dependencies() slot: {slot}, count len: {count}" - ); - if count == 0 { - // this store CAN be removed - continue; + ); + if count == 0 { + // this store CAN be removed + continue; + } } + // One of the pubkeys in the store has account info to a store whose store count is not going to zero. + // If the store cannot be found, that also means store isn't being deleted. + failed_slot = Some(*slot); + delete = false; + break; } - // One of the pubkeys in the store has account info to a store whose store count is not going to zero. - // If the store cannot be found, that also means store isn't being deleted. - failed_slot = Some(*slot); - delete = false; - break; - } - if delete { - // this pubkey can be deleted from all stores it is in - continue; - } - } else { - // a pubkey we were planning to remove is not removing all stores that contain the account - debug!( - "calc_delete_dependencies(), + if delete { + // this pubkey can be deleted from all stores it is in + continue; + } + } else { + // a pubkey we were planning to remove is not removing all stores that contain the account + debug!( + "calc_delete_dependencies(), pubkey: {}, slot_list: {:?}, slot_list_len: {}, ref_count: {}", - pubkey, - slot_list, - slot_list.len(), - ref_count, - ); - } - - // increment store_counts to non-zero for all stores that can not be deleted. - let mut pending_stores = IntSet::default(); - for (slot, _account_info) in slot_list { - if !already_counted.contains(slot) { - pending_stores.insert(*slot); + pubkey, + slot_list, + slot_list.len(), + ref_count, + ); } - } - while !pending_stores.is_empty() { - let slot = pending_stores.iter().next().cloned().unwrap(); - if Some(slot) == min_slot { - if let Some(failed_slot) = failed_slot.take() { - info!("calc_delete_dependencies, oldest slot is not able to be deleted because of {pubkey} in slot {failed_slot}"); - } else { - info!("calc_delete_dependencies, oldest slot is not able to be deleted because of {pubkey}, slot list len: {}, ref count: {ref_count}", slot_list.len()); + + // increment store_counts to non-zero for all stores that can not be deleted. + let mut pending_stores = IntSet::default(); + for (slot, _account_info) in slot_list { + if !already_counted.contains(slot) { + pending_stores.insert(*slot); } } + while !pending_stores.is_empty() { + let slot = pending_stores.iter().next().cloned().unwrap(); + if Some(slot) == min_slot { + if let Some(failed_slot) = failed_slot.take() { + info!("calc_delete_dependencies, oldest slot is not able to be deleted because of {pubkey} in slot {failed_slot}"); + } else { + info!("calc_delete_dependencies, oldest slot is not able to be deleted because of {pubkey}, slot list len: {}, ref count: {ref_count}", slot_list.len()); + } + } - pending_stores.remove(&slot); - if !already_counted.insert(slot) { - continue; - } - // the point of all this code: remove the store count for all stores we cannot remove - if let Some(store_count) = store_counts.remove(&slot) { - // all pubkeys in this store also cannot be removed from all stores they are in - let affected_pubkeys = &store_count.1; - for key in affected_pubkeys { - for (slot, _account_info) in &purges.get(key).unwrap().0 { - if !already_counted.contains(slot) { - pending_stores.insert(*slot); + pending_stores.remove(&slot); + if !already_counted.insert(slot) { + continue; + } + // the point of all this code: remove the store count for all stores we cannot remove + if let Some(store_count) = store_counts.remove(&slot) { + // all pubkeys in this store also cannot be removed from all stores they are in + let affected_pubkeys = &store_count.1; + for key in affected_pubkeys { + let candidates_bin_index = + self.accounts_index.bin_calculator.bin_from_pubkey(key); + let mut update_pending_stores = + |bin: &HashMap| { + for (slot, _account_info) in &bin.get(key).unwrap().slot_list { + if !already_counted.contains(slot) { + pending_stores.insert(*slot); + } + } + }; + if candidates_bin_index == bin_index { + update_pending_stores(&bin); + } else { + update_pending_stores( + &candidates[candidates_bin_index].read().unwrap(), + ); } } } @@ -2987,6 +3022,13 @@ impl AccountsDb { self.remove_uncleaned_slots_and_collect_pubkeys(uncleaned_slots) } + fn count_pubkeys(candidates: &[RwLock>]) -> u64 { + candidates + .iter() + .map(|x| x.read().unwrap().len()) + .sum::() as u64 + } + /// Construct a vec of pubkeys for cleaning from: /// uncleaned_pubkeys - the delta set of updated pubkeys in rooted slots from the last clean /// dirty_stores - set of stores which had accounts removed or recently rooted @@ -2997,7 +3039,7 @@ impl AccountsDb { is_startup: bool, timings: &mut CleanKeyTimings, epoch_schedule: &EpochSchedule, - ) -> (Vec, Option) { + ) -> CleaningCandidates { let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule); let mut dirty_store_processing_time = Measure::start("dirty_store_processing"); let max_slot_inclusive = @@ -3016,7 +3058,17 @@ impl AccountsDb { } }); let dirty_stores_len = dirty_stores.len(); - let pubkeys = DashSet::new(); + let num_bins = self.accounts_index.bins(); + let candidates: Box<_> = + std::iter::repeat_with(|| RwLock::new(HashMap::::new())) + .take(num_bins) + .collect(); + + let insert_pubkey = |pubkey: Pubkey| { + let index = self.accounts_index.bin_calculator.bin_from_pubkey(&pubkey); + let mut candidates_bin = candidates[index].write().unwrap(); + candidates_bin.insert(pubkey, CleaningInfo::default()); + }; let dirty_ancient_stores = AtomicUsize::default(); let mut dirty_store_routine = || { let chunk_size = 1.max(dirty_stores_len.saturating_div(rayon::current_num_threads())); @@ -3029,8 +3081,8 @@ impl AccountsDb { dirty_ancient_stores.fetch_add(1, Ordering::Relaxed); } oldest_dirty_slot = oldest_dirty_slot.min(*slot); - store.accounts.scan_pubkeys(|k| { - pubkeys.insert(*k); + store.accounts.scan_pubkeys(|key| { + insert_pubkey(*key); }); }); oldest_dirty_slot @@ -3050,12 +3102,12 @@ impl AccountsDb { dirty_store_routine(); }); } + timings.dirty_pubkeys_count = Self::count_pubkeys(&candidates); trace!( "dirty_stores.len: {} pubkeys.len: {}", dirty_stores_len, - pubkeys.len() + timings.dirty_pubkeys_count, ); - timings.dirty_pubkeys_count = pubkeys.len() as u64; dirty_store_processing_time.stop(); timings.dirty_store_processing_us += dirty_store_processing_time.as_us(); timings.dirty_ancient_stores = dirty_ancient_stores.load(Ordering::Relaxed); @@ -3070,19 +3122,14 @@ impl AccountsDb { self.thread_pool_clean.install(|| { delta_keys.par_iter().for_each(|keys| { for key in keys { - pubkeys.insert(*key); + insert_pubkey(*key); } }); }); delta_insert.stop(); timings.delta_insert_us += delta_insert.as_us(); - timings.delta_key_count = pubkeys.len() as u64; - - let mut hashset_to_vec = Measure::start("flat_map"); - let mut pubkeys: Vec = pubkeys.into_iter().collect(); - hashset_to_vec.stop(); - timings.hashset_to_vec_us += hashset_to_vec.as_us(); + timings.delta_key_count = Self::count_pubkeys(&candidates); // Check if we should purge any of the zero_lamport_accounts_to_purge_later, based on the // latest_full_snapshot_slot. @@ -3097,13 +3144,13 @@ impl AccountsDb { let is_candidate_for_clean = max_slot_inclusive >= *slot && latest_full_snapshot_slot >= *slot; if is_candidate_for_clean { - pubkeys.push(*pubkey); + insert_pubkey(*pubkey); } !is_candidate_for_clean }); } - (pubkeys, min_dirty_slot) + (candidates, min_dirty_slot) } /// Call clean_accounts() with the common parameters that tests/benches use. @@ -3206,23 +3253,14 @@ impl AccountsDb { self.report_store_stats(); let mut key_timings = CleanKeyTimings::default(); - let (mut candidates, min_dirty_slot) = self.construct_candidate_clean_keys( + let (candidates, min_dirty_slot) = self.construct_candidate_clean_keys( max_clean_root_inclusive, is_startup, &mut key_timings, epoch_schedule, ); - let mut sort = Measure::start("sort"); - if is_startup { - candidates.par_sort_unstable(); - } else { - self.thread_pool_clean - .install(|| candidates.par_sort_unstable()); - } - sort.stop(); - - let num_candidates = candidates.len(); + let num_candidates = Self::count_pubkeys(&candidates); let mut accounts_scan = Measure::start("accounts_scan"); let uncleaned_roots = self.accounts_index.clone_uncleaned_roots(); let found_not_zero_accum = AtomicU64::new(0); @@ -3231,109 +3269,115 @@ impl AccountsDb { let useful_accum = AtomicU64::new(0); // parallel scan the index. - let (mut purges_zero_lamports, purges_old_accounts) = { + let purges_old_accounts = { let do_clean_scan = || { candidates - .par_chunks(4096) - .map(|candidates: &[Pubkey]| { - let mut purges_zero_lamports = HashMap::new(); + .par_iter() + .map(|candidates_bin| { let mut purges_old_accounts = Vec::new(); let mut found_not_zero = 0; let mut not_found_on_fork = 0; let mut missing = 0; let mut useful = 0; - self.accounts_index.scan( - candidates.iter(), - |candidate, slot_list_and_ref_count, _entry| { - let mut useless = true; - if let Some((slot_list, ref_count)) = slot_list_and_ref_count { - // find the highest rooted slot in the slot list - let index_in_slot_list = self.accounts_index.latest_slot( - None, - slot_list, - max_clean_root_inclusive, - ); - - match index_in_slot_list { - Some(index_in_slot_list) => { - // found info relative to max_clean_root - let (slot, account_info) = - &slot_list[index_in_slot_list]; - if account_info.is_zero_lamport() { - useless = false; - // the latest one is zero lamports. we may be able to purge it. - // so, add to purges_zero_lamports - purges_zero_lamports.insert( - *candidate, - ( - // add all the rooted entries that contain this pubkey. we know the highest rooted entry is zero lamports - self.accounts_index.get_rooted_entries( - slot_list, - max_clean_root_inclusive, - ), - ref_count, - ), + let mut candidates_bin = candidates_bin.write().unwrap(); + // Iterate over each HashMap entry to + // avoid capturing the HashMap in the + // closure passed to scan thus making + // conflicting read and write borrows. + candidates_bin + .iter_mut() + .for_each(|(candidate_pubkey, candidate_info)| { + self.accounts_index.scan( + [*candidate_pubkey].iter(), + |candidate_pubkey, slot_list_and_ref_count, _entry| { + let mut useless = true; + if let Some((slot_list, ref_count)) = + slot_list_and_ref_count + { + // find the highest rooted slot in the slot list + let index_in_slot_list = + self.accounts_index.latest_slot( + None, + slot_list, + max_clean_root_inclusive, ); - } else { - found_not_zero += 1; - } - if uncleaned_roots.contains(slot) { - // Assertion enforced by `accounts_index.get()`, the latest slot - // will not be greater than the given `max_clean_root` - if let Some(max_clean_root_inclusive) = - max_clean_root_inclusive - { - assert!(slot <= &max_clean_root_inclusive); + + match index_in_slot_list { + Some(index_in_slot_list) => { + // found info relative to max_clean_root + let (slot, account_info) = + &slot_list[index_in_slot_list]; + if account_info.is_zero_lamport() { + useless = false; + // The latest one is zero lamports. We may be able to purge it. + // Add all the rooted entries that contain this pubkey. + // We know the highest rooted entry is zero lamports. + candidate_info.slot_list = + self.accounts_index.get_rooted_entries( + slot_list, + max_clean_root_inclusive, + ); + candidate_info.ref_count = ref_count; + } else { + found_not_zero += 1; + } + if uncleaned_roots.contains(slot) { + // Assertion enforced by `accounts_index.get()`, the latest slot + // will not be greater than the given `max_clean_root` + if let Some(max_clean_root_inclusive) = + max_clean_root_inclusive + { + assert!( + slot <= &max_clean_root_inclusive + ); + } + if slot_list.len() > 1 { + // no need to purge old accounts if there is only 1 slot in the slot list + purges_old_accounts + .push(*candidate_pubkey); + useless = false; + } else { + self.clean_accounts_stats + .uncleaned_roots_slot_list_1 + .fetch_add(1, Ordering::Relaxed); + } + } } - if slot_list.len() > 1 { - // no need to purge old accounts if there is only 1 slot in the slot list - purges_old_accounts.push(*candidate); + None => { + // This pubkey is in the index but not in a root slot, so clean + // it up by adding it to the to-be-purged list. + // + // Also, this pubkey must have been touched by some slot since + // it was in the dirty list, so we assume that the slot it was + // touched in must be unrooted. + not_found_on_fork += 1; useless = false; - } else { - self.clean_accounts_stats - .uncleaned_roots_slot_list_1 - .fetch_add(1, Ordering::Relaxed); + purges_old_accounts.push(*candidate_pubkey); } } + } else { + missing += 1; } - None => { - // This pubkey is in the index but not in a root slot, so clean - // it up by adding it to the to-be-purged list. - // - // Also, this pubkey must have been touched by some slot since - // it was in the dirty list, so we assume that the slot it was - // touched in must be unrooted. - not_found_on_fork += 1; - useless = false; - purges_old_accounts.push(*candidate); + if !useless { + useful += 1; } - } - } else { - missing += 1; - } - if !useless { - useful += 1; - } - AccountsIndexScanResult::OnlyKeepInMemoryIfDirty - }, - None, - false, - ); + AccountsIndexScanResult::OnlyKeepInMemoryIfDirty + }, + None, + false, + ); + }); found_not_zero_accum.fetch_add(found_not_zero, Ordering::Relaxed); not_found_on_fork_accum.fetch_add(not_found_on_fork, Ordering::Relaxed); missing_accum.fetch_add(missing, Ordering::Relaxed); useful_accum.fetch_add(useful, Ordering::Relaxed); - (purges_zero_lamports, purges_old_accounts) + purges_old_accounts + }) + .reduce(Vec::new, |mut a, b| { + // Collapse down the vecs into one. + a.extend(b); + a }) - .reduce( - || (HashMap::new(), Vec::new()), - |mut a, b| { - // Collapse down the hashmaps/vecs into one. - a.0.extend(b.0); - a.1.extend(b.1); - a - }, - ) }; if is_startup { do_clean_scan() @@ -3360,84 +3404,105 @@ impl AccountsDb { // Calculate store counts as if everything was purged // Then purge if we can let mut store_counts: HashMap)> = HashMap::new(); - for (pubkey, (slot_list, ref_count)) in purges_zero_lamports.iter_mut() { - if purged_account_slots.contains_key(pubkey) { - *ref_count = self.accounts_index.ref_count_from_storage(pubkey); - } - slot_list.retain(|(slot, account_info)| { - let was_slot_purged = purged_account_slots - .get(pubkey) - .map(|slots_removed| slots_removed.contains(slot)) - .unwrap_or(false); - if was_slot_purged { - // No need to look up the slot storage below if the entire - // slot was purged - return false; + for candidates_bin in candidates.iter() { + for ( + pubkey, + CleaningInfo { + slot_list, + ref_count, + }, + ) in candidates_bin.write().unwrap().iter_mut() + { + if slot_list.is_empty() { + continue; // seems simpler than filtering. `candidates` contains all the pubkeys we original started with } - // Check if this update in `slot` to the account with `key` was reclaimed earlier by - // `clean_accounts_older_than_root()` - let was_reclaimed = removed_accounts - .get(slot) - .map(|store_removed| store_removed.contains(&account_info.offset())) - .unwrap_or(false); - if was_reclaimed { - return false; + if purged_account_slots.contains_key(pubkey) { + *ref_count = self.accounts_index.ref_count_from_storage(pubkey); } - if let Some(store_count) = store_counts.get_mut(slot) { - store_count.0 -= 1; - store_count.1.insert(*pubkey); - } else { - let mut key_set = HashSet::new(); - key_set.insert(*pubkey); - assert!( - !account_info.is_cached(), - "The Accounts Cache must be flushed first for this account info. pubkey: {}, slot: {}", - *pubkey, - *slot - ); - let count = self - .storage - .get_account_storage_entry(*slot, account_info.store_id()) - .map(|store| store.count()) - .unwrap() - - 1; - debug!( - "store_counts, inserting slot: {}, store id: {}, count: {}", - slot, account_info.store_id(), count - ); - store_counts.insert(*slot, (count, key_set)); - } - true - }); + slot_list.retain(|(slot, account_info)| { + let was_slot_purged = purged_account_slots + .get(pubkey) + .map(|slots_removed| slots_removed.contains(slot)) + .unwrap_or(false); + if was_slot_purged { + // No need to look up the slot storage below if the entire + // slot was purged + return false; + } + // Check if this update in `slot` to the account with `key` was reclaimed earlier by + // `clean_accounts_older_than_root()` + let was_reclaimed = removed_accounts + .get(slot) + .map(|store_removed| store_removed.contains(&account_info.offset())) + .unwrap_or(false); + if was_reclaimed { + return false; + } + if let Some(store_count) = store_counts.get_mut(slot) { + store_count.0 -= 1; + store_count.1.insert(*pubkey); + } else { + let mut key_set = HashSet::new(); + key_set.insert(*pubkey); + assert!( + !account_info.is_cached(), + "The Accounts Cache must be flushed first for this account info. pubkey: {}, slot: {}", + *pubkey, + *slot + ); + let count = self + .storage + .get_account_storage_entry(*slot, account_info.store_id()) + .map(|store| store.count()) + .unwrap() + - 1; + debug!( + "store_counts, inserting slot: {}, store id: {}, count: {}", + slot, account_info.store_id(), count + ); + store_counts.insert(*slot, (count, key_set)); + } + true + }); + } } store_counts_time.stop(); let mut calc_deps_time = Measure::start("calc_deps"); - Self::calc_delete_dependencies(&purges_zero_lamports, &mut store_counts, min_dirty_slot); + self.calc_delete_dependencies(&candidates, &mut store_counts, min_dirty_slot); calc_deps_time.stop(); let mut purge_filter = Measure::start("purge_filter"); self.filter_zero_lamport_clean_for_incremental_snapshots( max_clean_root_inclusive, &store_counts, - &mut purges_zero_lamports, + &candidates, ); purge_filter.stop(); let mut reclaims_time = Measure::start("reclaims"); // Recalculate reclaims with new purge set - let pubkey_to_slot_set: Vec<_> = purges_zero_lamports - .into_iter() - .map(|(key, (slots_list, _ref_count))| { - ( - key, - slots_list - .into_iter() - .map(|(slot, _)| slot) - .collect::>(), - ) - }) - .collect(); + let mut pubkey_to_slot_set = Vec::new(); + for candidates_bin in candidates.iter() { + let candidates_bin = candidates_bin.read().unwrap(); + let mut bin_set = candidates_bin + .iter() + .filter_map(|(pubkey, cleaning_info)| { + let CleaningInfo { + slot_list, + ref_count: _, + } = cleaning_info; + (!slot_list.is_empty()).then_some(( + *pubkey, + slot_list + .iter() + .map(|(slot, _)| *slot) + .collect::>(), + )) + }) + .collect::>(); + pubkey_to_slot_set.append(&mut bin_set); + } let (reclaims, pubkeys_removed_from_accounts_index2) = self.purge_keys_exact(pubkey_to_slot_set.iter()); @@ -3491,7 +3556,6 @@ impl AccountsDb { ("delta_insert_us", key_timings.delta_insert_us, i64), ("delta_key_count", key_timings.delta_key_count, i64), ("dirty_pubkeys_count", key_timings.dirty_pubkeys_count, i64), - ("sort_us", sort.as_us(), i64), ("useful_keys", useful_accum.load(Ordering::Relaxed), i64), ("total_keys_count", num_candidates, i64), ( @@ -3661,7 +3725,9 @@ impl AccountsDb { } /// During clean, some zero-lamport accounts that are marked for purge should *not* actually - /// get purged. Filter out those accounts here by removing them from 'purges_zero_lamports' + /// get purged. Filter out those accounts here by removing them from 'candidates'. + /// Candidates may contain entries with empty slots list in CleaningInfo. + /// The function removes such entries from 'candidates'. /// /// When using incremental snapshots, do not purge zero-lamport accounts if the slot is higher /// than the latest full snapshot slot. This is to protect against the following scenario: @@ -3685,7 +3751,7 @@ impl AccountsDb { &self, max_clean_root_inclusive: Option, store_counts: &HashMap)>, - purges_zero_lamports: &mut HashMap, RefCount)>, + candidates: &[RwLock>], ) { let latest_full_snapshot_slot = self.latest_full_snapshot_slot(); let should_filter_for_incremental_snapshots = max_clean_root_inclusive.unwrap_or(Slot::MAX) @@ -3695,31 +3761,42 @@ impl AccountsDb { "if filtering for incremental snapshots, then snapshots should be enabled", ); - purges_zero_lamports.retain(|pubkey, (slot_account_infos, _ref_count)| { - // Only keep purges_zero_lamports where the entire history of the account in the root set - // can be purged. All AppendVecs for those updates are dead. - for (slot, _account_info) in slot_account_infos.iter() { - if let Some(store_count) = store_counts.get(slot) { - if store_count.0 != 0 { - // one store this pubkey is in is not being removed, so this pubkey cannot be removed at all + for bin in candidates { + let mut bin = bin.write().unwrap(); + bin.retain(|pubkey, cleaning_info| { + let CleaningInfo { + slot_list, + ref_count: _, + } = cleaning_info; + if slot_list.is_empty() { + return false; + } + // Only keep candidates where the entire history of the account in the root set + // can be purged. All AppendVecs for those updates are dead. + for (slot, _account_info) in slot_list.iter() { + if let Some(store_count) = store_counts.get(slot) { + if store_count.0 != 0 { + // one store this pubkey is in is not being removed, so this pubkey cannot be removed at all + return false; + } + } else { + // store is not being removed, so this pubkey cannot be removed at all return false; } - } else { - // store is not being removed, so this pubkey cannot be removed at all - return false; } - } - // Exit early if not filtering more for incremental snapshots - if !should_filter_for_incremental_snapshots { - return true; - } + // Exit early if not filtering more for incremental snapshots + if !should_filter_for_incremental_snapshots { + return true; + } - let slot_account_info_at_highest_slot = slot_account_infos - .iter() - .max_by_key(|(slot, _account_info)| slot); + // Safety: We exited early if the slot list was empty, + // so we're guaranteed here that `.max_by_key()` returns Some. + let (slot, account_info) = slot_list + .iter() + .max_by_key(|(slot, _account_info)| slot) + .unwrap(); - slot_account_info_at_highest_slot.map_or(true, |(slot, account_info)| { // Do *not* purge zero-lamport accounts if the slot is greater than the last full // snapshot slot. Since we're `retain`ing the accounts-to-purge, I felt creating // the `cannot_purge` variable made this easier to understand. Accounts that do @@ -3732,8 +3809,8 @@ impl AccountsDb { .insert((*slot, *pubkey)); } !cannot_purge - }) - }); + }); + } } // Must be kept private!, does sensitive cleanup that should only be called from @@ -12454,18 +12531,40 @@ pub mod tests { accounts_index.add_root(1); accounts_index.add_root(2); accounts_index.add_root(3); - let mut purges = HashMap::new(); + let num_bins = accounts_index.bins(); + let candidates: Box<_> = + std::iter::repeat_with(|| RwLock::new(HashMap::::new())) + .take(num_bins) + .collect(); for key in [&key0, &key1, &key2] { let index_entry = accounts_index.get_cloned(key).unwrap(); let rooted_entries = accounts_index .get_rooted_entries(index_entry.slot_list.read().unwrap().as_slice(), None); let ref_count = index_entry.ref_count(); - purges.insert(*key, (rooted_entries, ref_count)); + let index = accounts_index.bin_calculator.bin_from_pubkey(key); + let mut candidates_bin = candidates[index].write().unwrap(); + candidates_bin.insert( + *key, + CleaningInfo { + slot_list: rooted_entries, + ref_count, + }, + ); } - for (key, (list, ref_count)) in &purges { - info!(" purge {} ref_count {} =>", key, ref_count); - for x in list { - info!(" {:?}", x); + for candidates_bin in candidates.iter() { + let candidates_bin = candidates_bin.read().unwrap(); + for ( + key, + CleaningInfo { + slot_list: list, + ref_count, + }, + ) in candidates_bin.iter() + { + info!(" purge {} ref_count {} =>", key, ref_count); + for x in list { + info!(" {:?}", x); + } } } @@ -12474,7 +12573,8 @@ pub mod tests { store_counts.insert(1, (0, HashSet::from_iter(vec![key0, key1]))); store_counts.insert(2, (0, HashSet::from_iter(vec![key1, key2]))); store_counts.insert(3, (1, HashSet::from_iter(vec![key2]))); - AccountsDb::calc_delete_dependencies(&purges, &mut store_counts, None); + let accounts = AccountsDb::new_single_for_tests(); + accounts.calc_delete_dependencies(&candidates, &mut store_counts, None); let mut stores: Vec<_> = store_counts.keys().cloned().collect(); stores.sort_unstable(); for store in &stores { @@ -14756,9 +14856,14 @@ pub mod tests { let store_count = 0; let mut store_counts = HashMap::default(); store_counts.insert(slot, (store_count, key_set)); - let mut purges_zero_lamports = HashMap::default(); - purges_zero_lamports.insert(pubkey, (vec![(slot, account_info)], 1)); - + let candidates = [RwLock::new(HashMap::new())]; + candidates[0].write().unwrap().insert( + pubkey, + CleaningInfo { + slot_list: vec![(slot, account_info)], + ref_count: 1, + }, + ); let accounts_db = AccountsDb::new_single_for_tests(); if let Some(latest_full_snapshot_slot) = test_params.latest_full_snapshot_slot { accounts_db.set_latest_full_snapshot_slot(latest_full_snapshot_slot); @@ -14766,11 +14871,11 @@ pub mod tests { accounts_db.filter_zero_lamport_clean_for_incremental_snapshots( test_params.max_clean_root, &store_counts, - &mut purges_zero_lamports, + &candidates, ); assert_eq!( - purges_zero_lamports.contains_key(&pubkey), + candidates[0].read().unwrap().contains_key(&pubkey), test_params.should_contain ); };