Skip to content

Commit

Permalink
Raft: Drop append entries when upper layer is overloaded
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Nov 3, 2023
1 parent 091aa85 commit d361851
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 8 deletions.
37 changes: 33 additions & 4 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type RaftNode interface {
LeadChangeC() <-chan bool
QuitC() <-chan struct{}
Created() time.Time
Overloaded() bool
Stop()
Delete()
Wipe()
Expand Down Expand Up @@ -131,6 +132,7 @@ type raft struct {
track bool
werr error
state atomic.Int32 // RaftState
overload atomic.Bool
hh hash.Hash64
snapfile string
csz int
Expand Down Expand Up @@ -245,6 +247,7 @@ var (
hbInterval = hbIntervalDefault
lostQuorumInterval = lostQuorumIntervalDefault
lostQuorumCheck = lostQuorumCheckIntervalDefault
overloadThreshold = 8192
)

type RaftConfig struct {
Expand Down Expand Up @@ -462,7 +465,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
}

// Send nil entry to signal the upper layers we are done doing replay/restore.
n.apply.push(nil)
n.pushToApply(nil)

// Make sure to track ourselves.
n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true}
Expand Down Expand Up @@ -874,6 +877,7 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
// Ignore if already applied.
if index > n.applied {
n.applied = index
n.updateOverloadState()
}
var state StreamState
n.wal.FastState(&state)
Expand Down Expand Up @@ -1090,7 +1094,7 @@ func (n *raft) setupLastSnapshot() {
n.pterm = snap.lastTerm
n.commit = snap.lastIndex
n.applied = snap.lastIndex
n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
n.pushToApply(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
}
Expand Down Expand Up @@ -1252,6 +1256,24 @@ func (n *raft) Healthy() bool {
return n.isCurrent(true)
}

func (n *raft) Overloaded() bool {
if n == nil {
return false
}
return n.overload.Load()
}

// Pushes to the apply queue and updates the overloaded state. Lock must be held.
func (n *raft) pushToApply(ce *CommittedEntry) {
n.apply.push(ce)
n.updateOverloadState()
}

// Updates the overloaded state. Lock must be held.
func (n *raft) updateOverloadState() {
n.overload.Store(n.apply.len() >= overloadThreshold || n.commit-n.applied >= uint64(overloadThreshold))
}

// HadPreviousLeader indicates if this group ever had a leader.
func (n *raft) HadPreviousLeader() bool {
n.RLock()
Expand Down Expand Up @@ -2655,7 +2677,7 @@ func (n *raft) applyCommit(index uint64) error {
if fpae {
delete(n.pae, index)
}
n.apply.push(newCommittedEntry(index, committed))
n.pushToApply(newCommittedEntry(index, committed))
} else {
// If we processed inline update our applied index.
n.applied = index
Expand Down Expand Up @@ -2836,6 +2858,13 @@ func (n *raft) runAsCandidate() {

// handleAppendEntry handles an append entry from the wire.
func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
// If we are overwhelmed, i.e. the upper layer is not applying entries
// fast enough and our apply queue is building up, start to drop new
// append entries instead.
if n.Overloaded() {
return
}

msg = copyBytes(msg)
if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil {
n.entry.push(ae)
Expand Down Expand Up @@ -3136,7 +3165,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}

// Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
n.apply.push(newCommittedEntry(n.commit, ae.entries[:1]))
n.pushToApply(newCommittedEntry(n.commit, ae.entries[:1]))
n.Unlock()
return

Expand Down
32 changes: 28 additions & 4 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@ func (sg smGroup) nonLeader() stateMachine {
return nil
}

// Causes the upper layer to purposefully block on receipt of
// append entries until unwedge is called, simulating the scenario
// that the upper layer is stuck on processing something.
// Note that this is different from PauseApply, which stops the
// Raft layer from sending applies to the upper layer at all.
func (sg smGroup) wedge() {
for _, n := range sg {
n.(*stateAdder).wedge.Lock()
}
}

// Unwedges the upper layer. Any append entries that have built
// up in the apply queue will start to apply.
// Note that this is different from ResumeApply, which starts the
// Raft layer sending applies to the upper layer again.
func (sg smGroup) unwedge() {
for _, n := range sg {
n.(*stateAdder).wedge.Unlock()
}
}

// Create a raft group and place on numMembers servers at random.
func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
c.t.Helper()
Expand Down Expand Up @@ -153,10 +174,11 @@ func smLoop(sm stateMachine) {
// The adder state just sums up int64 values.
type stateAdder struct {
sync.Mutex
s *Server
n RaftNode
cfg *RaftConfig
sum int64
s *Server
n RaftNode
cfg *RaftConfig
sum int64
wedge sync.Mutex
}

// Simple getters for server and the raft node.
Expand All @@ -178,6 +200,8 @@ func (a *stateAdder) propose(data []byte) {
}

func (a *stateAdder) applyEntry(ce *CommittedEntry) {
a.wedge.Lock()
defer a.wedge.Unlock()
a.Lock()
defer a.Unlock()
if ce == nil {
Expand Down
32 changes: 32 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,35 @@ func TestNRGObserverMode(t *testing.T) {
require_True(t, n.node().IsObserver())
}
}

func TestNRGDetectOverload(t *testing.T) {
origOverloadThreshold := overloadThreshold
defer func() {
overloadThreshold = origOverloadThreshold
}()
overloadThreshold = 8
iterations := 32

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

rg := c.createRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

rg.wedge()

sa := rg.leader().(*stateAdder)
sn := sa.node()

for i := 0; i < iterations; i++ {
sa.proposeDelta(1)
time.Sleep(time.Millisecond * 5)
}

require_True(t, sn.Overloaded())

rg.unwedge()
rg.waitOnTotal(t, int64(iterations))

require_False(t, sn.Overloaded())
}

0 comments on commit d361851

Please sign in to comment.