Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shrink/ancient pack purge zero lamport accounts #2312

Merged
merged 5 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 233 additions & 0 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
pub(crate) slot: Slot,
pub(crate) capacity: u64,
pub(crate) unrefed_pubkeys: Vec<&'a Pubkey>,
pub(crate) zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
pub(crate) alive_accounts: T,
/// total size in storage of all alive accounts
pub(crate) alive_total_bytes: usize,
Expand Down Expand Up @@ -524,6 +525,8 @@ struct LoadAccountsIndexForShrink<'a, T: ShrinkCollectRefs<'a>> {
alive_accounts: T,
/// pubkeys that were unref'd in the accounts index because they were dead
unrefed_pubkeys: Vec<&'a Pubkey>,
/// pubkeys that are the last remaining zero lamport instance of an account
zero_lamport_single_ref_pubkeys: Vec<&'a Pubkey>,
/// true if all alive accounts are zero lamport accounts
all_are_zero_lamports: bool,
/// index entries we need to hold onto to keep them from getting flushed
Expand Down Expand Up @@ -2011,6 +2014,7 @@ pub struct ShrinkStats {
dead_accounts: AtomicU64,
alive_accounts: AtomicU64,
accounts_loaded: AtomicU64,
purged_zero_lamports: AtomicU64,
}

impl ShrinkStats {
Expand Down Expand Up @@ -2109,6 +2113,11 @@ impl ShrinkStats {
self.accounts_loaded.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"purged_zero_lamports_count",
self.purged_zero_lamports.swap(0, Ordering::Relaxed),
i64
),
);
}
}
Expand Down Expand Up @@ -2309,6 +2318,13 @@ impl ShrinkAncientStats {
self.many_refs_old_alive.swap(0, Ordering::Relaxed),
i64
),
(
"purged_zero_lamports_count",
self.shrink_stats
.purged_zero_lamports
.swap(0, Ordering::Relaxed),
i64
),
);
}
}
Expand Down Expand Up @@ -3793,12 +3809,14 @@ impl AccountsDb {
let count = accounts.len();
let mut alive_accounts = T::with_capacity(count, slot_to_shrink);
let mut unrefed_pubkeys = Vec::with_capacity(count);
let mut zero_lamport_single_ref_pubkeys = Vec::with_capacity(count);

let mut alive = 0;
let mut dead = 0;
let mut index = 0;
let mut all_are_zero_lamports = true;
let mut index_entries_being_shrunk = Vec::with_capacity(accounts.len());
let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
self.accounts_index.scan(
accounts.iter().map(|account| account.pubkey()),
|pubkey, slots_refs, entry| {
Expand All @@ -3817,6 +3835,21 @@ impl AccountsDb {
unrefed_pubkeys.push(pubkey);
result = AccountsIndexScanResult::Unref;
dead += 1;
} else if stored_account.is_zero_lamport()
&& ref_count == 1
&& latest_full_snapshot_slot
.map(|latest_full_snapshot_slot| {
latest_full_snapshot_slot >= slot_to_shrink
})
.unwrap_or(true)
{
// only do this if our slot is prior to the latest full snapshot
// we found a zero lamport account that is the only instance of this account. We can delete it completely.
zero_lamport_single_ref_pubkeys.push(pubkey);
self.add_uncleaned_pubkeys_after_shrink(
slot_to_shrink,
[*pubkey].into_iter(),
);
} else {
// Hold onto the index entry arc so that it cannot be flushed.
// Since we are shrinking these entries, we need to disambiguate storage ids during this period and those only exist in the in-memory accounts index.
Expand All @@ -3839,6 +3872,7 @@ impl AccountsDb {
LoadAccountsIndexForShrink {
alive_accounts,
unrefed_pubkeys,
zero_lamport_single_ref_pubkeys,
all_are_zero_lamports,
index_entries_being_shrunk,
}
Expand Down Expand Up @@ -3939,6 +3973,7 @@ impl AccountsDb {
let len = stored_accounts.len();
let alive_accounts_collect = Mutex::new(T::with_capacity(len, slot));
let unrefed_pubkeys_collect = Mutex::new(Vec::with_capacity(len));
let zero_lamport_single_ref_pubkeys_collect = Mutex::new(Vec::with_capacity(len));
stats
.accounts_loaded
.fetch_add(len as u64, Ordering::Relaxed);
Expand All @@ -3955,6 +3990,7 @@ impl AccountsDb {
alive_accounts,
mut unrefed_pubkeys,
all_are_zero_lamports,
mut zero_lamport_single_ref_pubkeys,
mut index_entries_being_shrunk,
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);

Expand All @@ -3967,6 +4003,10 @@ impl AccountsDb {
.lock()
.unwrap()
.append(&mut unrefed_pubkeys);
zero_lamport_single_ref_pubkeys_collect
.lock()
.unwrap()
.append(&mut zero_lamport_single_ref_pubkeys);
index_entries_being_shrunk_outer
.lock()
.unwrap()
Expand All @@ -3979,6 +4019,9 @@ impl AccountsDb {

let alive_accounts = alive_accounts_collect.into_inner().unwrap();
let unrefed_pubkeys = unrefed_pubkeys_collect.into_inner().unwrap();
let zero_lamport_single_ref_pubkeys = zero_lamport_single_ref_pubkeys_collect
.into_inner()
.unwrap();

index_read_elapsed.stop();
stats
Expand All @@ -4002,6 +4045,7 @@ impl AccountsDb {
slot,
capacity: *capacity,
unrefed_pubkeys,
zero_lamport_single_ref_pubkeys,
alive_accounts,
alive_total_bytes,
total_starting_accounts: len,
Expand All @@ -4010,6 +4054,41 @@ impl AccountsDb {
}
}

/// These accounts were found during shrink of `slot` to be slot_list=[slot] and ref_count == 1 and lamports = 0.
/// This means this slot contained the only account data for this pubkey and it is zero lamport.
/// Thus, we did NOT treat this as an alive account, so we did NOT copy the zero lamport account to the new
/// storage. So, the account will no longer be alive or exist at `slot`.
/// So, first, remove the ref count since this newly shrunk storage will no longer access it.
/// Second, remove `slot` from the index entry's slot list. If the slot list is now empty, then the
/// pubkey can be removed completely from the index.
/// In parallel with this code (which is running in the bg), the same pubkey could be revived and written to
/// as part of tx processing. In that case, the slot list will contain a slot in the write cache and the
/// index entry will NOT be deleted.
fn remove_zero_lamport_single_ref_accounts_after_shrink(
    &self,
    zero_lamport_single_ref_pubkeys: &[&Pubkey],
    slot: Slot,
    stats: &ShrinkStats,
) {
    // Record how many zero-lamport single-ref accounts this shrink purged.
    stats.purged_zero_lamports.fetch_add(
        zero_lamport_single_ref_pubkeys.len() as u64,
        Ordering::Relaxed,
    );

    // we have to unref before we `purge_keys_exact`. Otherwise, we could race with the foreground with tx processing
    // reviving this index entry and then we'd unref the revived version, which is a refcount bug.
    self.accounts_index.scan(
        zero_lamport_single_ref_pubkeys.iter().cloned(),
        |_pubkey, _slots_refs, _entry| AccountsIndexScanResult::Unref,
        Some(AccountsIndexScanResult::Unref),
        false,
    );

    // Remove `slot` from each index entry's slot list; if the slot list becomes
    // empty, `purge_keys_exact` removes the pubkey from the index entirely.
    zero_lamport_single_ref_pubkeys.iter().for_each(|k| {
        _ = self.purge_keys_exact([&(**k, slot)].into_iter());
    });
}

/// common code from shrink and combine_ancient_slots
/// get rid of all original store_ids in the slot
pub(crate) fn remove_old_stores_shrink<'a, T: ShrinkCollectRefs<'a>>(
Expand All @@ -4020,7 +4099,19 @@ impl AccountsDb {
shrink_can_be_active: bool,
) {
let mut time = Measure::start("remove_old_stores_shrink");

// handle the zero lamport alive accounts before calling clean
// We have to update the index entries for these zero lamport pubkeys before we remove the storage in `mark_dirty_dead_stores`
// that contained the accounts.
self.remove_zero_lamport_single_ref_accounts_after_shrink(
&shrink_collect.zero_lamport_single_ref_pubkeys,
shrink_collect.slot,
stats,
);

// Purge old, overwritten storage entries
// This has the side effect of dropping `shrink_in_progress`, which removes the old storage completely. The
// index has to be correct before we drop the old storage.
let dead_storages = self.mark_dirty_dead_stores(
shrink_collect.slot,
// If all accounts are zero lamports, then we want to mark the entire OLD append vec as dirty.
Expand Down Expand Up @@ -10882,6 +10973,146 @@ pub mod tests {
assert_eq!(accounts.alive_account_count_in_slot(1), 0);
}

#[test]
fn test_remove_zero_lamport_single_ref_accounts_after_shrink() {
    // Exercise `remove_zero_lamport_single_ref_accounts_after_shrink` under three scenarios:
    //   pass 0: the zero-lamport account exists ONLY at `slot` (slot_list=[slot], ref_count=1)
    //   pass 1: it is also stored at `slot + 1`, but only in the write cache (ref_count still 1)
    //   pass 2: the `slot + 1` copy is flushed to storage too (ref_count becomes 2)
    for pass in 0..3 {
        let accounts = AccountsDb::new_single_for_tests();
        let pubkey_zero = Pubkey::from([1; 32]);
        let pubkey2 = Pubkey::from([2; 32]);
        let account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
        let zero_lamport_account =
            AccountSharedData::new(0, 0, AccountSharedData::default().owner());
        let slot = 1;

        // Store one zero-lamport and one non-zero-lamport account in `slot`.
        accounts.store_for_tests(
            slot,
            &[(&pubkey_zero, &zero_lamport_account), (&pubkey2, &account)],
        );

        // Simulate rooting the zero-lamport account, writes it to storage
        accounts.calculate_accounts_delta_hash(slot);
        accounts.add_root_and_flush_write_cache(slot);

        if pass > 0 {
            // store in write cache
            accounts.store_for_tests(slot + 1, &[(&pubkey_zero, &zero_lamport_account)]);
            if pass == 2 {
                // move to a storage (causing ref count to increase)
                accounts.calculate_accounts_delta_hash(slot + 1);
                accounts.add_root_and_flush_write_cache(slot + 1);
            }
        }

        // Pre-condition check: ref_count is 1 until the slot+1 copy is flushed to
        // storage (write-cache entries don't bump ref_count); the slot list gains a
        // second entry as soon as slot+1 is stored at all.
        accounts.accounts_index.get_and_then(&pubkey_zero, |entry| {
            let expected_ref_count = if pass < 2 { 1 } else { 2 };
            assert_eq!(entry.unwrap().ref_count(), expected_ref_count, "{pass}");
            let expected_slot_list = if pass < 1 { 1 } else { 2 };
            assert_eq!(
                entry.unwrap().slot_list.read().unwrap().len(),
                expected_slot_list
            );
            (false, ())
        });
        accounts.accounts_index.get_and_then(&pubkey2, |entry| {
            assert!(entry.is_some());
            (false, ())
        });

        // Invoke the function under test: purge the zero-lamport single-ref pubkey
        // from `slot` (unref + remove `slot` from its slot list).
        let zero_lamport_single_ref_pubkeys = [&pubkey_zero];
        accounts.remove_zero_lamport_single_ref_accounts_after_shrink(
            &zero_lamport_single_ref_pubkeys,
            slot,
            &ShrinkStats::default(),
        );

        accounts.accounts_index.get_and_then(&pubkey_zero, |entry| {
            if pass == 0 {
                // should not exist in index at all
                assert!(entry.is_none(), "{pass}");
            } else {
                // alive only in slot + 1
                assert_eq!(entry.unwrap().slot_list.read().unwrap().len(), 1);
                assert_eq!(
                    entry
                        .unwrap()
                        .slot_list
                        .read()
                        .unwrap()
                        .first()
                        .map(|(s, _)| s)
                        .cloned()
                        .unwrap(),
                    slot + 1
                );
                // refcount = 1 if we flushed the write cache for slot + 1
                let expected_ref_count = if pass < 2 { 0 } else { 1 };
                assert_eq!(
                    entry.map(|e| e.ref_count()),
                    Some(expected_ref_count),
                    "{pass}"
                );
            }
            (false, ())
        });

        // The unrelated non-zero-lamport account must be untouched.
        accounts.accounts_index.get_and_then(&pubkey2, |entry| {
            assert!(entry.is_some(), "{pass}");
            (false, ())
        });
    }
}

#[test]
fn test_shrink_zero_lamport_single_ref_account() {
    solana_logger::setup();

    // store a zero and non-zero lamport account
    // make sure shrink marks the ref_count=1, zero lamport account dead and removes pubkey from index completely
    let accounts = AccountsDb::new_single_for_tests();
    let pubkey_zero = Pubkey::from([1; 32]);
    let pubkey2 = Pubkey::from([2; 32]);
    let account = AccountSharedData::new(1, 0, AccountSharedData::default().owner());
    let zero_lamport_account =
        AccountSharedData::new(0, 0, AccountSharedData::default().owner());

    // Store a zero-lamport account and a non-zero lamport account
    accounts.store_for_tests(
        1,
        &[(&pubkey_zero, &zero_lamport_account), (&pubkey2, &account)],
    );

    // Simulate rooting the zero-lamport account, should be a
    // candidate for cleaning
    accounts.calculate_accounts_delta_hash(1);
    accounts.add_root_and_flush_write_cache(1);

    // for testing, we need to cause shrink to think this will be productive.
    // The zero lamport account isn't dead, but it can become dead inside shrink.
    accounts
        .storage
        .get_slot_storage_entry(1)
        .unwrap()
        .alive_bytes
        .fetch_sub(aligned_stored_size(0), Ordering::Relaxed);

    // Shrink slot 1. With no full snapshot slot set, the zero-lamport,
    // single-ref account is eligible to be purged completely during shrink.
    accounts.shrink_slot_forced(1);

    // slot 1's storage still exists; it holds the remaining alive account
    assert!(accounts.storage.get_slot_storage_entry(1).is_some());

    // the zero lamport account should be marked as dead
    assert_eq!(accounts.alive_account_count_in_slot(1), 1);

    // zero lamport account should be dead in the index
    assert!(!accounts
        .accounts_index
        .contains_with(&pubkey_zero, None, None));
    // other account should still be alive
    assert!(accounts.accounts_index.contains_with(&pubkey2, None, None));
    assert!(accounts.storage.get_slot_storage_entry(1).is_some());
}

#[test]
fn test_clean_multiple_zero_lamport_decrements_index_ref_count() {
solana_logger::setup();
Expand Down Expand Up @@ -15908,6 +16139,8 @@ pub mod tests {
debug!("space: {space}, lamports: {lamports}, alive: {alive}, account_count: {account_count}, append_opposite_alive_account: {append_opposite_alive_account}, append_opposite_zero_lamport_account: {append_opposite_zero_lamport_account}, normal_account_count: {normal_account_count}");
let db = AccountsDb::new_single_for_tests();
let slot5 = 5;
// don't do special zero lamport account handling
db.set_latest_full_snapshot_slot(0);
brooksprumo marked this conversation as resolved.
Show resolved Hide resolved
let mut account = AccountSharedData::new(
lamports,
space,
Expand Down
1 change: 1 addition & 0 deletions accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3836,6 +3836,7 @@ pub mod tests {
unrefed_pubkeys: unrefed_pubkeys.iter().collect(),

// irrelevant fields
zero_lamport_single_ref_pubkeys: Vec::default(),
slot: 0,
capacity: 0,
alive_accounts: ShrinkCollectAliveSeparatedByRefs {
Expand Down
Loading