Skip to content

Commit

Permalink
fix(consensus): garbage collection with not enough epochs
Browse files Browse the repository at this point in the history
  • Loading branch information
snormore committed May 16, 2024
1 parent 1a2d61c commit 7e2a51a
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6080,6 +6080,7 @@ dependencies = [
"resolved-pathbuf",
"serde",
"sui-protocol-config",
"tempdir",
"tokio",
"tonic 0.8.3",
"tracing",
Expand Down
3 changes: 3 additions & 0 deletions core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ prometheus = "0.13.3"
multiaddr = "0.17.1"
fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "c961a01596a87e76f590c7e43aca9d57106dbbb1" }
workspace-hack = { version = "0.1", path = "../../etc/workspace-hack" }

[dev-dependencies]
tempdir.workspace = true

This comment has been minimized.

Copy link
@ozwaldorf

ozwaldorf May 16, 2024

Member

Nice utility! Haven't seen this before

124 changes: 94 additions & 30 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl<Q: SyncQueryRunnerInterface, P: PubSub<PubSubMsg> + 'static, NE: Emitter>
let mut store_path = self.store_path.to_path_buf();

// Delete any directories that are from more than 2 epochs back
garbage_collect_old_stores(&current_epoch, &store_path);
garbage_collect_old_stores(&current_epoch, &store_path, 2);

store_path.push(format!("{current_epoch}"));
// TODO(dalton): This store takes an optional cache metrics struct that can give us metrics
Expand Down Expand Up @@ -438,9 +438,12 @@ impl<C: Collection> Consensus<C> {
}
}

/// Delete any epoch directories that are more than 2 epochs old
/// We dont wnat to panic if this fail but we should print an error
fn garbage_collect_old_stores(current_epoch: &u64, store_location: &PathBuf) {
/// Delete any epoch directories that are more than `retention` epochs old
/// We dont want to panic if this fails but we should print an error
fn garbage_collect_old_stores(current_epoch: &u64, store_location: &PathBuf, retention: u64) {
if current_epoch < &retention {
return;
}
if let Ok(files) = fs::read_dir(store_location) {
for file in files.flatten() {
// Every narwhal db is store in this directory with the number of the epoch as its
Expand All @@ -451,7 +454,7 @@ fn garbage_collect_old_stores(current_epoch: &u64, store_location: &PathBuf) {
.ok()
.and_then(|s| s.parse::<u64>().ok())
{
if epoch_num < current_epoch - 2 {
if epoch_num < current_epoch - retention {
if let Err(e) = fs::remove_dir_all(file.path()) {
error!("Unable to remove garbage collected Narwhal epoch: {e}");
}
Expand All @@ -472,16 +475,17 @@ pub enum PubSubMsg {

impl AutoImplSerde for PubSubMsg {}

#[test]
// Test to make sure we are deleting and keeping the write directories when running the
// garbage_collect_old_stores function
fn test_garbage_collecting_epochs() {
// Create a fake store directory and fill it with 10 directories to simulate 10 epoch
// directories
let store_path = std::path::Path::new("./TEST_GARBAGE_COLLECTION");
for i in 0..=10 {
let mut path = store_path.to_path_buf();
path.push(format!("{i}"));
// Tests to make sure the right directories are being deleted when running the
// garbage_collect_old_stores function.
#[cfg(test)]
mod test_garbage_collect {
use tempdir::TempDir;

use super::*;

fn create_epoch_directory(path: &PathBuf, epoch: u64) {
let mut path = path.to_path_buf();
path.push(format!("{epoch}"));
path.push("directory1");
fs::create_dir_all(path.clone()).unwrap();
path.set_file_name("file1.txt");
Expand All @@ -492,21 +496,81 @@ fn test_garbage_collecting_epochs() {
fs::File::create(path).unwrap();
}

// garbage collect the directory
garbage_collect_old_stores(&10, &store_path.to_path_buf());

// ensure that that there is only 3 directories left. One for current epoch and for the previous
// 2 epochs
for i in 0..=10 {
let mut dir_path = store_path.to_path_buf();
dir_path.push(format!("{i}"));
if i < 8 {
assert!(!dir_path.exists());
} else {
assert!(dir_path.exists());
}
#[test]
fn with_0_epochs_2_retention() {
let temp_dir = TempDir::new("test").unwrap();
let store_path = temp_dir.into_path().to_path_buf();

garbage_collect_old_stores(&0, &store_path.to_path_buf(), 2);

assert_eq!(fs::read_dir(store_path).unwrap().count(), 0);
}

// cleanup
fs::remove_dir_all(store_path).expect("Failed to cleanup test_garbage_collection_epochs");
#[test]
fn with_1_epoch_2_retention() {
let temp_dir = TempDir::new("test").unwrap();
let store_path = temp_dir.into_path();
create_epoch_directory(&store_path, 0);

garbage_collect_old_stores(&1, &store_path.to_path_buf(), 2);

assert_eq!(fs::read_dir(&store_path).unwrap().count(), 1);
assert!(store_path.join("0").exists());
}

#[test]
fn with_2_epochs_2_retention() {
let temp_dir = TempDir::new("test").unwrap();
let store_path = temp_dir.into_path();
create_epoch_directory(&store_path, 0);
create_epoch_directory(&store_path, 1);

garbage_collect_old_stores(&2, &store_path.to_path_buf(), 2);

assert_eq!(fs::read_dir(&store_path).unwrap().count(), 2);
assert!(store_path.join("0").exists());
assert!(store_path.join("1").exists());
}

#[test]
fn with_3_epochs_2_retention() {
let temp_dir = TempDir::new("test").unwrap();
let store_path = temp_dir.into_path();
create_epoch_directory(&store_path, 0);
create_epoch_directory(&store_path, 1);
create_epoch_directory(&store_path, 2);

garbage_collect_old_stores(&3, &store_path.to_path_buf(), 2);

assert_eq!(fs::read_dir(&store_path).unwrap().count(), 2);
assert!(!store_path.join("0").exists());
assert!(store_path.join("1").exists());
assert!(store_path.join("2").exists());
}

#[test]
fn with_10_epochs_2_retention() {
// Create a fake store directory and fill it with 10 directories to simulate 10 epoch
// directories
let temp_dir = TempDir::new("test").unwrap();
let store_path = temp_dir.into_path();
for i in 0..=10 {
create_epoch_directory(&store_path, i);
}

// Garbage collect the directory
garbage_collect_old_stores(&10, &store_path.to_path_buf(), 2);

// Ensure that that there is only 3 directories left. One for current epoch and for the
// previous 2 epochs
for i in 0..=10 {
let mut dir_path = store_path.to_path_buf();
dir_path.push(format!("{i}"));
if i < 8 {
assert!(!dir_path.exists());
} else {
assert!(dir_path.exists());
}
}
}
}

0 comments on commit 7e2a51a

Please sign in to comment.