Skip to content

Commit

Permalink
shrink/ancient pack purge zero lamport accounts (#2312)
Browse files Browse the repository at this point in the history
* shrink/ancient pack purge zero lamport accounts

* pr feedback

* add =

* comment

* fix comment typo
  • Loading branch information
jeffwashington authored Aug 6, 2024
1 parent beb3f58 commit c84900d
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 0 deletions.
233 changes: 233 additions & 0 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
pub(crate) slot: Slot,
pub(crate) capacity: u64,
pub(crate) unrefed_pubkeys: Vec<&'a Pubkey>,
pub(crate) zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
pub(crate) alive_accounts: T,
/// total size in storage of all alive accounts
pub(crate) alive_total_bytes: usize,
Expand Down Expand Up @@ -524,6 +525,8 @@ struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> {
alive_accounts: T,
/// pubkeys that were unref'd in the accounts index because they were dead
unrefed_pubkeys: Vec<&'a Pubkey>,
/// pubkeys that are the last remaining zero lamport instance of an account
zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
/// true if all alive accounts are zero lamport accounts
all_are_zero_lamports: bool,
/// index entries we need to hold onto to keep them from getting flushed
Expand Down Expand Up @@ -2011,6 +2014,7 @@ pub struct ShrinkStats {
dead_accounts: AtomicU64,
alive_accounts: AtomicU64,
accounts_loaded: AtomicU64,
purged_zero_lamports: AtomicU64,
}

impl ShrinkStats {
Expand Down Expand Up @@ -2109,6 +2113,11 @@ impl ShrinkStats {
self.accounts_loaded.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"purged_zero_lamports_count",
self.purged_zero_lamports.swap(0, Ordering::Relaxed),
i64
),
);
}
}
Expand Down Expand Up @@ -2309,6 +2318,13 @@ impl ShrinkAncientStats {
self.many_refs_old_alive.swap(0, Ordering::Relaxed),
i64
),
(
"purged_zero_lamports_count",
self.shrink_stats
.purged_zero_lamports
.swap(0, Ordering::Relaxed),
i64
),
);
}
}
Expand Down Expand Up @@ -3793,12 +3809,14 @@ impl AccountsDb {
let count = accounts.len();
let mut alive_accounts = T::with_capacity(count, slot_to_shrink);
let mut unrefed_pubkeys = Vec::with_capacity(count);
let mut zero_lamport_single_ref_pubkeys = Vec::with_capacity(count);

let mut alive = 0;
let mut dead = 0;
let mut index = 0;
let mut all_are_zero_lamports = true;
let mut index_entries_being_shrunk = Vec::with_capacity(accounts.len());
let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
self.accounts_index.scan(
accounts.iter().map(|account| account.pubkey()),
|pubkey, slots_refs, entry| {
Expand All @@ -3817,6 +3835,21 @@ impl AccountsDb {
unrefed_pubkeys.push(pubkey);
result = AccountsIndexScanResult::Unref;
dead += 1;
} else if stored_account.is_zero_lamport()
&& ref_count == 1
&& latest_full_snapshot_slot
.map(|latest_full_snapshot_slot| {
latest_full_snapshot_slot >= slot_to_shrink
})
.unwrap_or(true)
{
// only do this if our slot is prior to the latest full snapshot
// we found a zero lamport account that is the only instance of this account. We can delete it completely.
zero_lamport_single_ref_pubkeys.push(pubkey);
self.add_uncleaned_pubkeys_after_shrink(
slot_to_shrink,
[*pubkey].into_iter(),
);
} else {
// Hold onto the index entry arc so that it cannot be flushed.
// Since we are shrinking these entries, we need to disambiguate storage ids during this period and those only exist in the in-memory accounts index.
Expand All @@ -3839,6 +3872,7 @@ impl AccountsDb {
LoadAccountsIndexForShrink {
alive_accounts,
unrefed_pubkeys,
zero_lamport_single_ref_pubkeys,
all_are_zero_lamports,
index_entries_being_shrunk,
}
Expand Down Expand Up @@ -3939,6 +3973,7 @@ impl AccountsDb {
let len = stored_accounts.len();
let alive_accounts_collect = Mutex::new(T::with_capacity(len, slot));
let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len));
let zero_lamport_single_ref_pubkeys_collect = Mutex::new(Vec::with_capacity(len));
stats
.accounts_loaded
.fetch_add(len as u64, Ordering::Relaxed);
Expand All @@ -3955,6 +3990,7 @@ impl AccountsDb {
alive_accounts,
mut unrefed_pubkeys,
all_are_zero_lamports,
mut zero_lamport_single_ref_pubkeys,
mut index_entries_being_shrunk,
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);

Expand All @@ -3967,6 +4003,10 @@ impl AccountsDb {
.lock()
.unwrap()
.append(&mut unrefed_pubkeys);
zero_lamport_single_ref_pubkeys_collect
.lock()
.unwrap()
.append(&mut zero_lamport_single_ref_pubkeys);
index_entries_being_shrunk_outer
.lock()
.unwrap()
Expand All @@ -3979,6 +4019,9 @@ impl AccountsDb {

let alive_accounts = alive_accounts_collect.into_inner().unwrap();
let unrefed_pubkeys = unrefed_pubkeys_collect.into_inner().unwrap();
let zero_lamport_single_ref_pubkeys = zero_lamport_single_ref_pubkeys_collect
.into_inner()
.unwrap();

index_read_elapsed.stop();
stats
Expand All @@ -4002,6 +4045,7 @@ impl AccountsDb {
slot,
capacity: *capacity,
unrefed_pubkeys,
zero_lamport_single_ref_pubkeys,
alive_accounts,
alive_total_bytes,
total_starting_accounts: len,
Expand All @@ -4010,6 +4054,41 @@ impl AccountsDb {
}
}

/// These accounts were found during shrink of `slot` to be slot_list=[slot] and ref_count == 1 and lamports = 0.
/// This means this slot contained the only account data for this pubkey and it is zero lamport.
/// Thus, we did NOT treat this as an alive account, so we did NOT copy the zero lamport account to the new
/// storage. So, the account will no longer be alive or exist at `slot`.
/// So, first, remove the ref count since this newly shrunk storage will no longer access it.
/// Second, remove `slot` from the index entry's slot list. If the slot list is now empty, then the
/// pubkey can be removed completely from the index.
/// In parallel with this code (which is running in the bg), the same pubkey could be revived and written to
/// as part of tx processing. In that case, the slot list will contain a slot in the write cache and the
/// index entry will NOT be deleted.
fn remove_zero_lamport_single_ref_accounts_after_shrink(
&self,
zero_lamport_single_ref_pubkeys: &[&Pubkey],
slot: Slot,
stats: &ShrinkStats,
) {
stats.purged_zero_lamports.fetch_add(
zero_lamport_single_ref_pubkeys.len() as u64,
Ordering::Relaxed,
);

// we have to unref before we `purge_keys_exact`. Otherwise, we could race with the foreground with tx processing
// reviving this index entry and then we'd unref the revived version, which is a refcount bug.
self.accounts_index.scan(
zero_lamport_single_ref_pubkeys.iter().cloned(),
|_pubkey, _slots_refs, _entry| AccountsIndexScanResult::Unref,
Some(AccountsIndexScanResult::Unref),
false,
);

zero_lamport_single_ref_pubkeys.iter().for_each(|k| {
_ = self.purge_keys_exact([&(**k, slot)].into_iter());
});
}

/// common code from shrink and combine_ancient_slots
/// get rid of all original store_ids in the slot
pub(crate) fn remove_old_stores_shrink<'a, T: ShrinkCollectRefs<'a>>(
Expand All @@ -4020,7 +4099,19 @@ impl AccountsDb {
shrink_can_be_active: bool,
) {
let mut time = Measure::start("remove_old_stores_shrink");

// handle the zero lamport alive accounts before calling clean
// We have to update the index entries for these zero lamport pubkeys before we remove the storage in `mark_dirty_dead_stores`
// that contained the accounts.
self.remove_zero_lamport_single_ref_accounts_after_shrink(
&shrink_collect.zero_lamport_single_ref_pubkeys,
shrink_collect.slot,
stats,
);

// Purge old, overwritten storage entries
// This has the side effect of dropping `shrink_in_progress`, which removes the old storage completely. The
// index has to be correct before we drop the old storage.
let dead_storages = self.mark_dirty_dead_stores(
shrink_collect.slot,
// If all accounts are zero lamports, then we want to mark the entire OLD append vec as dirty.
Expand Down Expand Up @@ -10884,6 +10975,146 @@ pub mod tests {
assert_eq!(accounts.alive_account_count_in_slot(1), 0);
}

#[test]
fn test_remove_zero_lamport_single_ref_accounts_after_shrink() {
for pass in 0..3 {
let accounts = AccountsDb::new_single_for_tests();
let pubkey_zero = Pubkey::from([1; 32]);
let pubkey2 = Pubkey::from([2; 32]);
let account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
let zero_lamport_account =
AccountSharedData::new(0, 0, AccountSharedData::default().owner());
let slot = 1;

accounts.store_for_tests(
slot,
&[(&pubkey_zero, &zero_lamport_account), (&pubkey2, &account)],
);

// Simulate rooting the zero-lamport account, writes it to storage
accounts.calculate_accounts_delta_hash(slot);
accounts.add_root_and_flush_write_cache(slot);

if pass > 0 {
// store in write cache
accounts.store_for_tests(slot + 1, &[(&pubkey_zero, &zero_lamport_account)]);
if pass == 2 {
// move to a storage (causing ref count to increase)
accounts.calculate_accounts_delta_hash(slot + 1);
accounts.add_root_and_flush_write_cache(slot + 1);
}
}

accounts.accounts_index.get_and_then(&pubkey_zero, |entry| {
let expected_ref_count = if pass < 2 { 1 } else { 2 };
assert_eq!(entry.unwrap().ref_count(), expected_ref_count, "{pass}");
let expected_slot_list = if pass < 1 { 1 } else { 2 };
assert_eq!(
entry.unwrap().slot_list.read().unwrap().len(),
expected_slot_list
);
(false, ())
});
accounts.accounts_index.get_and_then(&pubkey2, |entry| {
assert!(entry.is_some());
(false, ())
});

let zero_lamport_single_ref_pubkeys = [&pubkey_zero];
accounts.remove_zero_lamport_single_ref_accounts_after_shrink(
&zero_lamport_single_ref_pubkeys,
slot,
&ShrinkStats::default(),
);

accounts.accounts_index.get_and_then(&pubkey_zero, |entry| {
if pass == 0 {
// should not exist in index at all
assert!(entry.is_none(), "{pass}");
} else {
// alive only in slot + 1
assert_eq!(entry.unwrap().slot_list.read().unwrap().len(), 1);
assert_eq!(
entry
.unwrap()
.slot_list
.read()
.unwrap()
.first()
.map(|(s, _)| s)
.cloned()
.unwrap(),
slot + 1
);
// refcount = 1 if we flushed the write cache for slot + 1
let expected_ref_count = if pass < 2 { 0 } else { 1 };
assert_eq!(
entry.map(|e| e.ref_count()),
Some(expected_ref_count),
"{pass}"
);
}
(false, ())
});

accounts.accounts_index.get_and_then(&pubkey2, |entry| {
assert!(entry.is_some(), "{pass}");
(false, ())
});
}
}

#[test]
fn test_shrink_zero_lamport_single_ref_account() {
solana_logger::setup();

// store a zero and non-zero lamport account
// make sure clean marks the ref_count=1, zero lamport account dead and removes pubkey from index completely
let accounts = AccountsDb::new_single_for_tests();
let pubkey_zero = Pubkey::from([1; 32]);
let pubkey2 = Pubkey::from([2; 32]);
let account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
let zero_lamport_account =
AccountSharedData::new(0, 0, AccountSharedData::default().owner());

// Store a zero-lamport account and a non-zero lamport account
accounts.store_for_tests(
1,
&[(&pubkey_zero, &zero_lamport_account), (&pubkey2, &account)],
);

// Simulate rooting the zero-lamport account, should be a
// candidate for cleaning
accounts.calculate_accounts_delta_hash(1);
accounts.add_root_and_flush_write_cache(1);

// for testing, we need to cause shrink to think this will be productive.
// The zero lamport account isn't dead, but it can become dead inside shrink.
accounts
.storage
.get_slot_storage_entry(1)
.unwrap()
.alive_bytes
.fetch_sub(aligned_stored_size(0), Ordering::Relaxed);

// Slot 1 should be cleaned, but
// zero-lamport account should not be cleaned since last full snapshot root is before slot 1
accounts.shrink_slot_forced(1);

assert!(accounts.storage.get_slot_storage_entry(1).is_some());

// the zero lamport account should be marked as dead
assert_eq!(accounts.alive_account_count_in_slot(1), 1);

// zero lamport account should be dead in the index
assert!(!accounts
.accounts_index
.contains_with(&pubkey_zero, None, None));
// other account should still be alive
assert!(accounts.accounts_index.contains_with(&pubkey2, None, None));
assert!(accounts.storage.get_slot_storage_entry(1).is_some());
}

#[test]
fn test_clean_multiple_zero_lamport_decrements_index_ref_count() {
solana_logger::setup();
Expand Down Expand Up @@ -15910,6 +16141,8 @@ pub mod tests {
debug!("space: {space}, lamports: {lamports}, alive: {alive}, account_count: {account_count}, append_opposite_alive_account: {append_opposite_alive_account}, append_opposite_zero_lamport_account: {append_opposite_zero_lamport_account}, normal_account_count: {normal_account_count}");
let db = AccountsDb::new_single_for_tests();
let slot5 = 5;
// don't do special zero lamport account handling
db.set_latest_full_snapshot_slot(0);
let mut account = AccountSharedData::new(
lamports,
space,
Expand Down
1 change: 1 addition & 0 deletions accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3836,6 +3836,7 @@ pub mod tests {
unrefed_pubkeys: unrefed_pubkeys.iter().collect(),

// irrelevant fields
zero_lamport_single_ref_pubkeys: Vec::default(),
slot: 0,
capacity: 0,
alive_accounts: ShrinkCollectAliveSeparatedByRefs {
Expand Down

0 comments on commit c84900d

Please sign in to comment.