Skip to content

Commit

Permalink
refactor: optimize delta log deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Jan 14, 2025
1 parent 4064587 commit 3a28281
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 80 deletions.
7 changes: 0 additions & 7 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,6 @@ pub struct MetaDeveloperConfig {
#[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")]
pub hummock_time_travel_sst_info_insert_batch_size: usize,

#[serde(default = "default::developer::hummock_delta_log_delete_batch_size")]
pub hummock_delta_log_delete_batch_size: usize,

#[serde(default = "default::developer::time_travel_vacuum_interval_sec")]
pub time_travel_vacuum_interval_sec: u64,

Expand Down Expand Up @@ -2070,10 +2067,6 @@ pub mod default {
100
}

pub fn hummock_delta_log_delete_batch_size() -> usize {
512
}

pub fn time_travel_vacuum_interval_sec() -> u64 {
30
}
Expand Down
1 change: 0 additions & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ meta_actor_cnt_per_worker_parallelism_soft_limit = 100
meta_actor_cnt_per_worker_parallelism_hard_limit = 400
meta_hummock_time_travel_sst_info_fetch_batch_size = 10000
meta_hummock_time_travel_sst_info_insert_batch_size = 100
meta_hummock_delta_log_delete_batch_size = 512
meta_time_travel_vacuum_interval_sec = 30
meta_hummock_time_travel_epoch_version_insert_batch_size = 1000
meta_hummock_gc_history_insert_batch_size = 1000
Expand Down
4 changes: 0 additions & 4 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,6 @@ pub fn start(
.meta
.developer
.hummock_time_travel_sst_info_insert_batch_size,
hummock_delta_log_delete_batch_size: config
.meta
.developer
.hummock_delta_log_delete_batch_size,
hummock_time_travel_epoch_version_insert_batch_size: config
.meta
.developer
Expand Down
76 changes: 23 additions & 53 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::cmp;
use std::collections::HashSet;
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};

Expand All @@ -28,7 +27,7 @@ use risingwave_hummock_sdk::{
get_object_id_from_path, get_sst_data_path, HummockSstableObjectId, OBJECT_SUFFIX,
};
use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW;
use risingwave_meta_model::{hummock_gc_history, hummock_sequence};
use risingwave_meta_model::{hummock_gc_history, hummock_sequence, hummock_version_delta};
use risingwave_meta_model_migration::OnConflict;
use risingwave_object_store::object::{ObjectMetadataIter, ObjectStoreRef};
use risingwave_pb::stream_service::GetMinUncommittedSstIdRequest;
Expand All @@ -37,10 +36,8 @@ use sea_orm::{ActiveValue, ColumnTrait, EntityTrait, QueryFilter, Set};

use crate::backup_restore::BackupManagerRef;
use crate::hummock::error::{Error, Result};
use crate::hummock::manager::commit_multi_var;
use crate::hummock::HummockManager;
use crate::manager::MetadataManager;
use crate::model::BTreeMapTransaction;
use crate::MetaResult;

pub(crate) struct GcManager {
Expand Down Expand Up @@ -175,44 +172,37 @@ impl GcManager {
}

impl HummockManager {
/// Deletes at most `batch_size` deltas.
/// Deletes version deltas.
///
/// Returns (number of deleted deltas, number of remain `deltas_to_delete`).
pub async fn delete_version_deltas(&self, batch_size: usize) -> Result<(usize, usize)> {
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let context_info = self.context_info.read().await;
let deltas_to_delete_count = versioning
.hummock_version_deltas
.range(..=versioning.checkpoint.version.id)
.count();
/// Returns number of deleted deltas
pub async fn delete_version_deltas(&self) -> Result<usize> {
// If there is any safe point, skip this to ensure meta backup has required delta logs to
// replay version.
if !context_info.version_safe_points.is_empty() {
return Ok((0, deltas_to_delete_count));
if !self
.context_info
.read()
.await
.version_safe_points
.is_empty()
{
return Ok(0);
}
let batch = versioning
let version_id = self.versioning.read().await.checkpoint.version.id;
let res = hummock_version_delta::Entity::delete_many()
.filter(hummock_version_delta::Column::Id.lte(version_id.to_u64()))
.exec(&self.env.meta_store_ref().conn)
.await?;
tracing::debug!(rows_affected = res.rows_affected, "Deleted version deltas");
self.versioning
.write()
.await
.hummock_version_deltas
.range(..=versioning.checkpoint.version.id)
.map(|(k, _)| *k)
.take(batch_size)
.collect_vec();
let mut hummock_version_deltas =
BTreeMapTransaction::new(&mut versioning.hummock_version_deltas);
if batch.is_empty() {
return Ok((0, 0));
}
for delta_id in &batch {
hummock_version_deltas.remove(*delta_id);
}
commit_multi_var!(self.meta_store_ref(), hummock_version_deltas)?;
.retain(|id, _| *id > version_id);
#[cfg(test)]
{
drop(context_info);
drop(versioning_guard);
self.check_state_consistency().await;
}
Ok((batch.len(), deltas_to_delete_count - batch.len()))
Ok(res.rows_affected as usize)
}

/// Filters by Hummock version and Writes GC history.
Expand Down Expand Up @@ -464,26 +454,6 @@ impl HummockManager {
Ok(())
}

/// Deletes stale Hummock metadata.
///
/// Returns number of deleted deltas
pub async fn delete_metadata(&self) -> MetaResult<usize> {
let batch_size = self.env.opts.hummock_delta_log_delete_batch_size;
let mut total_deleted = 0;
loop {
if total_deleted != 0 && self.env.opts.vacuum_spin_interval_ms != 0 {
tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms))
.await;
}
let (deleted, remain) = self.delete_version_deltas(batch_size).await?;
total_deleted += deleted;
if total_deleted == 0 || remain < batch_size {
break;
}
}
Ok(total_deleted)
}

pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> {
let current_epoch_time = Epoch::now().physical_time();
let epoch_watermark = Epoch::from_physical_time(
Expand Down
15 changes: 3 additions & 12 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,23 +566,14 @@ async fn test_hummock_manager_basic() {
init_version_id + commit_log_count + register_log_count,
);
}
assert_eq!(
hummock_manager
.delete_version_deltas(usize::MAX)
.await
.unwrap(),
(0, 0)
);
assert_eq!(hummock_manager.delete_version_deltas().await.unwrap(), 0);
assert_eq!(
hummock_manager.create_version_checkpoint(1).await.unwrap(),
commit_log_count + register_log_count
);
assert_eq!(
hummock_manager
.delete_version_deltas(usize::MAX)
.await
.unwrap(),
((commit_log_count + register_log_count) as usize, 0)
hummock_manager.delete_version_deltas().await.unwrap(),
(commit_log_count + register_log_count) as usize
);
hummock_manager
.unpin_version_before(context_id_1, HummockVersionId::MAX)
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub fn start_vacuum_metadata_loop(
return;
}
}
if let Err(err) = hummock_manager.delete_metadata().await {
if let Err(err) = hummock_manager.delete_version_deltas().await {
tracing::warn!(error = %err.as_report(), "Vacuum metadata error");
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ pub struct MetaOpts {
pub hummock_time_travel_snapshot_interval: u64,
pub hummock_time_travel_sst_info_fetch_batch_size: usize,
pub hummock_time_travel_sst_info_insert_batch_size: usize,
pub hummock_delta_log_delete_batch_size: usize,
pub hummock_time_travel_epoch_version_insert_batch_size: usize,
pub hummock_gc_history_insert_batch_size: usize,
pub hummock_time_travel_filter_out_objects_batch_size: usize,
Expand Down Expand Up @@ -280,7 +279,6 @@ impl MetaOpts {
hummock_time_travel_snapshot_interval: 0,
hummock_time_travel_sst_info_fetch_batch_size: 10_000,
hummock_time_travel_sst_info_insert_batch_size: 10,
hummock_delta_log_delete_batch_size: 1000,
hummock_time_travel_epoch_version_insert_batch_size: 1000,
hummock_gc_history_insert_batch_size: 1000,
hummock_time_travel_filter_out_objects_batch_size: 1000,
Expand Down

0 comments on commit 3a28281

Please sign in to comment.