From af9209fd4054d05259f1dabcf407a49fea9aad6a Mon Sep 17 00:00:00 2001 From: Clement Date: Mon, 23 Sep 2024 23:09:54 +0800 Subject: [PATCH] etcdserver: keep raft log entries up to snapshot index Signed-off-by: Clement --- server/config/config.go | 4 +++ server/etcdserver/server.go | 33 +++++++++++++++++----- tests/framework/integration/cluster.go | 11 ++++++-- tests/integration/v3_watch_restore_test.go | 14 +++++---- 4 files changed, 47 insertions(+), 15 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 5f3a0d8e9f51..27018ae71333 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -58,6 +58,10 @@ type ServerConfig struct { // follower to catch up. SnapshotCatchUpEntries uint64 + // CompactRaftLogEveryNApplies compact raft log once every N applies. + // Minimum value is 1, which means compacting raft log every apply. + CompactRaftLogEveryNApplies uint64 + MaxSnapFiles uint MaxWALFiles uint diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index ba3a3f3ffe1d..b6af69cbca1a 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -84,6 +84,10 @@ const ( // follower to catch up. DefaultSnapshotCatchUpEntries uint64 = 5000 + // DefaultCompactRaftLogEveryNApplies compact raft log once every N applies. + // Minimum value is 1, which means compacting raft log every apply. + DefaultCompactRaftLogEveryNApplies uint64 = 10 + StoreClusterPrefix = "/0" StoreKeysPrefix = "/1" @@ -569,6 +573,14 @@ func (s *EtcdServer) start() { ) s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries } + if s.Cfg.CompactRaftLogEveryNApplies == 0 { + lg.Info( + "updating compact raft log every N applies to default", + zap.Uint64("given-compact-raft-log-every-n-applies", s.Cfg.CompactRaftLogEveryNApplies), + zap.Uint64("updated-compact-raft-log-every-n-applies", DefaultCompactRaftLogEveryNApplies), + ) + s.Cfg.CompactRaftLogEveryNApplies = DefaultCompactRaftLogEveryNApplies + } s.w = wait.New() s.applyWait = wait.NewTimeList() @@ -980,6 +992,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) { <-apply.notifyc s.triggerSnapshot(ep) + s.maybeCompactRaftLog(ep) select { // snapshot requested via send() case m := <-s.r.msgSnapC: @@ -2170,6 +2183,18 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { "saved snapshot", zap.Uint64("snapshot-index", snap.Metadata.Index), ) +} + +func (s *EtcdServer) maybeCompactRaftLog(ep *etcdProgress) { + lg := s.Logger() + + // Keep some in memory log entries for slow followers, while keeping the entries up to snapshot index. + // Only compact raft log once every N applies + if ep.appliedi <= ep.snapi+s.Cfg.SnapshotCatchUpEntries || ep.appliedi%s.Cfg.CompactRaftLogEveryNApplies != 0 { + return + } + + compacti := ep.appliedi - s.Cfg.SnapshotCatchUpEntries // When sending a snapshot, etcd will pause compaction. // After receives a snapshot, the slow follower needs to get all the entries right after @@ -2181,13 +2206,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { return } - // keep some in memory log entries for slow followers. - compacti := uint64(1) - if snapi > s.Cfg.SnapshotCatchUpEntries { - compacti = snapi - s.Cfg.SnapshotCatchUpEntries - } - - err = s.r.raftStorage.Compact(compacti) + err := s.r.raftStorage.Compact(compacti) if err != nil { // the compaction was done asynchronously with the progress of raft. // raft log might already been compact. diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index 95b5c88d9f80..82d103c88dc3 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -148,8 +148,9 @@ type ClusterConfig struct { MaxTxnOps uint MaxRequestBytes uint - SnapshotCount uint64 - SnapshotCatchUpEntries uint64 + SnapshotCount uint64 + SnapshotCatchUpEntries uint64 + CompactRaftLogEveryNApplies uint64 GRPCKeepAliveMinTime time.Duration GRPCKeepAliveInterval time.Duration @@ -276,6 +277,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member { MaxRequestBytes: c.Cfg.MaxRequestBytes, SnapshotCount: c.Cfg.SnapshotCount, SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries, + CompactRaftLogEveryNApplies: c.Cfg.CompactRaftLogEveryNApplies, GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime, GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval, GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout, @@ -601,6 +603,7 @@ type MemberConfig struct { MaxRequestBytes uint SnapshotCount uint64 SnapshotCatchUpEntries uint64 + CompactRaftLogEveryNApplies uint64 GRPCKeepAliveMinTime time.Duration GRPCKeepAliveInterval time.Duration GRPCKeepAliveTimeout time.Duration @@ -686,6 +689,10 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { if mcfg.SnapshotCatchUpEntries != 0 { m.SnapshotCatchUpEntries = mcfg.SnapshotCatchUpEntries } + m.CompactRaftLogEveryNApplies = etcdserver.DefaultCompactRaftLogEveryNApplies + if mcfg.CompactRaftLogEveryNApplies != 0 { + m.CompactRaftLogEveryNApplies = mcfg.CompactRaftLogEveryNApplies + } // for the purpose of integration testing, simple token is enough m.AuthToken = "simple" diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go index f7e2e4b4730b..d1b2a6830f7c 100644 --- a/tests/integration/v3_watch_restore_test.go +++ b/tests/integration/v3_watch_restore_test.go @@ -55,9 +55,10 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { integration.BeforeTest(t) clus := integration.NewCluster(t, &integration.ClusterConfig{ - Size: 3, - SnapshotCount: 10, - SnapshotCatchUpEntries: 5, + Size: 3, + SnapshotCount: 10, + SnapshotCatchUpEntries: 5, + CompactRaftLogEveryNApplies: 10, }) defer clus.Terminate(t) @@ -102,11 +103,12 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { // elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet // changes. So member 0 has index 8 in raft log before network // partition. We need to trigger EtcdServer.snapshot() at least twice. + // Raft log is only compacted when appliedi%CompactRaftLogEveryNApplies==0 // - // SnapshotCount: 10, SnapshotCatchUpEntries: 5 + // SnapshotCount: 10, SnapshotCatchUpEntries: 5, CompactRaftLogEveryNApplies: 10 // - // T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8) - // T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date) + // T1: L(snapshot-index: 11, compacted-index: 5), F_m0(index:8) + // T2: L(snapshot-index: 22, compacted-index: 15), F_m0(index:8, out of date) // // Since there is no way to confirm server has compacted the log, we // use log monitor to watch and expect "compacted Raft logs" content.