Skip to content

Commit

Permalink
etcdserver: keep raft log entries up to snapshot index
Browse files Browse the repository at this point in the history
Signed-off-by: Clement <[email protected]>
  • Loading branch information
clement2026 committed Sep 23, 2024
1 parent bd93a00 commit af9209f
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 15 deletions.
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type ServerConfig struct {
// follower to catch up.
SnapshotCatchUpEntries uint64

// CompactRaftLogEveryNApplies compact raft log once every N applies.
// Minimum value is 1, which means compacting raft log every apply.
CompactRaftLogEveryNApplies uint64

MaxSnapFiles uint
MaxWALFiles uint

Expand Down
33 changes: 26 additions & 7 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ const (
// follower to catch up.
DefaultSnapshotCatchUpEntries uint64 = 5000

// DefaultCompactRaftLogEveryNApplies compact raft log once every N applies.
// Minimum value is 1, which means compacting raft log every apply.
DefaultCompactRaftLogEveryNApplies uint64 = 10

StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1"

Expand Down Expand Up @@ -569,6 +573,14 @@ func (s *EtcdServer) start() {
)
s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
}
if s.Cfg.CompactRaftLogEveryNApplies == 0 {
lg.Info(
"updating compact raft log every N applies to default",
zap.Uint64("given-compact-raft-log-every-n-applies", s.Cfg.CompactRaftLogEveryNApplies),
zap.Uint64("updated-compact-raft-log-every-n-applies", DefaultCompactRaftLogEveryNApplies),
)
s.Cfg.CompactRaftLogEveryNApplies = DefaultCompactRaftLogEveryNApplies
}

s.w = wait.New()
s.applyWait = wait.NewTimeList()
Expand Down Expand Up @@ -980,6 +992,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc

s.triggerSnapshot(ep)
s.maybeCompactRaftLog(ep)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
Expand Down Expand Up @@ -2170,6 +2183,18 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}

func (s *EtcdServer) maybeCompactRaftLog(ep *etcdProgress) {
lg := s.Logger()

// Keep some in memory log entries for slow followers, while keeping the entries up to snapshot index.
// Only compact raft log once every N applies
if ep.appliedi <= ep.snapi+s.Cfg.SnapshotCatchUpEntries || ep.appliedi%s.Cfg.CompactRaftLogEveryNApplies != 0 {
return
}

compacti := ep.appliedi - s.Cfg.SnapshotCatchUpEntries

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
Expand All @@ -2181,13 +2206,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
Expand Down
11 changes: 9 additions & 2 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ type ClusterConfig struct {
MaxTxnOps uint
MaxRequestBytes uint

SnapshotCount uint64
SnapshotCatchUpEntries uint64
SnapshotCount uint64
SnapshotCatchUpEntries uint64
CompactRaftLogEveryNApplies uint64

GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
Expand Down Expand Up @@ -276,6 +277,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
MaxRequestBytes: c.Cfg.MaxRequestBytes,
SnapshotCount: c.Cfg.SnapshotCount,
SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries,
CompactRaftLogEveryNApplies: c.Cfg.CompactRaftLogEveryNApplies,
GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime,
GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval,
GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout,
Expand Down Expand Up @@ -601,6 +603,7 @@ type MemberConfig struct {
MaxRequestBytes uint
SnapshotCount uint64
SnapshotCatchUpEntries uint64
CompactRaftLogEveryNApplies uint64
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
Expand Down Expand Up @@ -686,6 +689,10 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
if mcfg.SnapshotCatchUpEntries != 0 {
m.SnapshotCatchUpEntries = mcfg.SnapshotCatchUpEntries
}
m.CompactRaftLogEveryNApplies = etcdserver.DefaultCompactRaftLogEveryNApplies
if mcfg.CompactRaftLogEveryNApplies != 0 {
m.CompactRaftLogEveryNApplies = mcfg.CompactRaftLogEveryNApplies
}

// for the purpose of integration testing, simple token is enough
m.AuthToken = "simple"
Expand Down
14 changes: 8 additions & 6 deletions tests/integration/v3_watch_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
CompactRaftLogEveryNApplies: 10,
})
defer clus.Terminate(t)

Expand Down Expand Up @@ -102,11 +103,12 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
// elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet
// changes. So member 0 has index 8 in raft log before network
// partition. We need to trigger EtcdServer.snapshot() at least twice.
// Raft log is only compacted when appliedi%CompactRaftLogEveryNApplies==0
//
// SnapshotCount: 10, SnapshotCatchUpEntries: 5
// SnapshotCount: 10, SnapshotCatchUpEntries: 5, CompactRaftLogEveryNApplies: 10
//
// T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8)
// T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date)
// T1: L(snapshot-index: 11, compacted-index: 5), F_m0(index:8)
// T2: L(snapshot-index: 22, compacted-index: 15), F_m0(index:8, out of date)
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
Expand Down

0 comments on commit af9209f

Please sign in to comment.