Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: separate "raft log compact" from snapshot #18235

Closed
wants to merge 8 commits into from
Closed
16 changes: 10 additions & 6 deletions server/etcdserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type raftNode struct {
// a chan to send out readState
readStateC chan raft.ReadState

// keep track of snapshots being created
snapshotTracker SnapshotTracker

// utility
ticker *time.Ticker
// contention detectors for raft heartbeat message
Expand Down Expand Up @@ -136,12 +139,13 @@ func newRaftNode(cfg raftNodeConfig) *raftNode {
raftNodeConfig: cfg,
// set up contention detectors for raft heartbeat message.
// expect to send a heartbeat within 2 heartbeat intervals.
td: contention.NewTimeoutDetector(2 * cfg.heartbeat),
readStateC: make(chan raft.ReadState, 1),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
applyc: make(chan toApply),
stopped: make(chan struct{}),
done: make(chan struct{}),
td: contention.NewTimeoutDetector(2 * cfg.heartbeat),
readStateC: make(chan raft.ReadState, 1),
snapshotTracker: *newSnapshotTracker(),
msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap),
applyc: make(chan toApply),
stopped: make(chan struct{}),
done: make(chan struct{}),
}
if r.heartbeat == 0 {
r.ticker = &time.Ticker{}
Expand Down
56 changes: 40 additions & 16 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package etcdserver
import (
"context"
"encoding/json"
goerrors "errors"
"expvar"
"fmt"
"math"
Expand Down Expand Up @@ -963,6 +964,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc

s.triggerSnapshot(ep)
s.compactRaftLog(ep.appliedi)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
Expand Down Expand Up @@ -2126,7 +2128,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// the go routine created below.
s.KV().Commit()

s.r.snapshotTracker.Track(snapi)

s.GoAttach(func() {
defer func() {
s.r.snapshotTracker.UnTrack(snapi)
}()

lg := s.Logger()

// For backward compatibility, generate v2 snapshot from v3 state.
Expand Down Expand Up @@ -2154,28 +2162,44 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
})
}

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
// the snapshot sent might already be compacted. It happens when the snapshot takes long time
// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
lg.Info("skip compaction since there is an inflight snapshot")
return
}
func (s *EtcdServer) compactRaftLog(appliedi uint64) {
lg := s.Logger()

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}
// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
// the snapshot sent might already be compacted. It happens when the snapshot takes long time
// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
lg.Info("skip compaction since there is an inflight snapshot")
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(0)
if appliedi > s.Cfg.SnapshotCatchUpEntries {
compacti = appliedi - s.Cfg.SnapshotCatchUpEntries
}

// if there are snapshots being created, compact the raft log up to the minimum snapshot index.
if minSpani, err := s.r.snapshotTracker.MinSnapi(); err == nil && minSpani < appliedi && minSpani > s.Cfg.SnapshotCatchUpEntries {
compacti = minSpani - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
// no need to compact if compacti == 0
if compacti == 0 {
return
}

s.GoAttach(func() {
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
if err == raft.ErrCompacted {
if goerrors.Is(err, raft.ErrCompacted) {
return
}
lg.Panic("failed to compact", zap.Error(err))
Expand Down
95 changes: 95 additions & 0 deletions server/etcdserver/snapshot_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
"cmp"
"container/heap"
"errors"
"sync"
)

// SnapshotTracker keeps track of all ongoing snapshot creation. To safeguard ongoing snapshot creation,
// only compact the raft log up to the minimum snapshot index in the track.
type SnapshotTracker struct {
h minHeap[uint64]
mu *sync.Mutex
}

func newSnapshotTracker() *SnapshotTracker {
return &SnapshotTracker{
h: minHeap[uint64]{},
mu: new(sync.Mutex),
}
}

// MinSnapi returns the minimum snapshot index in the track or an error if the tracker is empty.
func (st *SnapshotTracker) MinSnapi() (uint64, error) {
st.mu.Lock()
defer st.mu.Unlock()
if st.h.Len() == 0 {
return 0, errors.New("SnapshotTracker is empty")
}
return st.h[0], nil
}

// Track adds a snapi to the tracker. Make sure to call UnTrack once the snapshot has been created.
func (st *SnapshotTracker) Track(snapi uint64) {
st.mu.Lock()
defer st.mu.Unlock()
heap.Push(&st.h, snapi)
}

// UnTrack removes 'snapi' from the tracker. No action taken if 'snapi' is not found.
func (st *SnapshotTracker) UnTrack(snapi uint64) {
st.mu.Lock()
defer st.mu.Unlock()

for i := 0; i < len((*st).h); i++ {
if (*st).h[i] == snapi {
heap.Remove(&st.h, i)
return
}
}
}

// minHeap implements the heap.Interface for E.
type minHeap[E interface {
cmp.Ordered
}] []E

func (h minHeap[_]) Len() int {
return len(h)
}

func (h minHeap[_]) Less(i, j int) bool {
return h[i] < h[j]
}

func (h minHeap[_]) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h *minHeap[E]) Push(x any) {
*h = append(*h, x.(E))
}

func (h *minHeap[E]) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
138 changes: 138 additions & 0 deletions server/etcdserver/snapshot_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
"container/heap"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSnapTracker_MinSnapi(t *testing.T) {
st := *newSnapshotTracker()

_, err := st.MinSnapi()
assert.NotNil(t, err, "SnapshotTracker should be empty initially")

st.Track(10)
minSnapi, err := st.MinSnapi()
assert.Nil(t, err)
assert.Equal(t, uint64(10), minSnapi, "MinSnapi should return the only tracked snapshot index")

st.Track(5)
minSnapi, err = st.MinSnapi()
assert.Nil(t, err)
assert.Equal(t, uint64(5), minSnapi, "MinSnapi should return the minimum tracked snapshot index")

st.UnTrack(5)
minSnapi, err = st.MinSnapi()
assert.Nil(t, err)
assert.Equal(t, uint64(10), minSnapi, "MinSnapi should return the remaining tracked snapshot index")
}

func TestSnapTracker_Track(t *testing.T) {
st := *newSnapshotTracker()
st.Track(20)
st.Track(10)
st.Track(15)

assert.Equal(t, 3, st.h.Len(), "SnapshotTracker should have 3 snapshots tracked")

minSnapi, err := st.MinSnapi()
assert.Nil(t, err)
assert.Equal(t, uint64(10), minSnapi, "MinSnapi should return the minimum tracked snapshot index")
}

func TestSnapTracker_UnTrack(t *testing.T) {
st := *newSnapshotTracker()
st.Track(20)
st.Track(30)
st.Track(40)
// track another snapshot with the same index
st.Track(20)

st.UnTrack(30)
assert.Equal(t, 3, st.h.Len())

minSnapi, err := st.MinSnapi()
assert.Nil(t, err)
assert.Equal(t, uint64(20), minSnapi)

st.UnTrack(20)
assert.Equal(t, 2, st.h.Len())

minSnapi, err = st.MinSnapi()
assert.Nil(t, err)
assert.Equal(t, uint64(20), minSnapi)

st.UnTrack(20)
minSnapi, err = st.MinSnapi()
assert.Nil(t, err)
assert.Equal(t, uint64(40), minSnapi)

st.UnTrack(40)
_, err = st.MinSnapi()
assert.NotNil(t, err)
}

func newMinHeap(elements ...uint64) minHeap[uint64] {
h := minHeap[uint64](elements)
heap.Init(&h)
return h
}

func TestMinHeapLen(t *testing.T) {
h := newMinHeap(3, 2, 1)
assert.Equal(t, 3, h.Len())
}

func TestMinHeapLess(t *testing.T) {
h := newMinHeap(3, 2, 1)
assert.True(t, h.Less(0, 1))
}

func TestMinHeapSwap(t *testing.T) {
h := newMinHeap(3, 2, 1)
h.Swap(0, 1)
assert.Equal(t, uint64(2), h[0])
assert.Equal(t, uint64(1), h[1])
assert.Equal(t, uint64(3), h[2])
}

func TestMinHeapPushPop(t *testing.T) {
h := newMinHeap(3, 2)
heap.Push(&h, uint64(1))
assert.Equal(t, 3, h.Len())

got := heap.Pop(&h).(uint64)
assert.Equal(t, uint64(1), got)
}

func TestMinHeapEmpty(t *testing.T) {
h := minHeap[uint64]{}
assert.Equal(t, 0, h.Len())
}

func TestMinHeapSingleElement(t *testing.T) {
h := newMinHeap(uint64(1))
assert.Equal(t, 1, h.Len())

heap.Push(&h, uint64(2))
assert.Equal(t, 2, h.Len())

got := heap.Pop(&h)
assert.Equal(t, uint64(1), got)
}