From d361851720990a986a09bc490c6c4d0332a2e1b0 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 23 Oct 2023 13:03:29 +0100 Subject: [PATCH] Raft: Drop append entries when upper layer is overloaded Signed-off-by: Neil Twigg --- server/raft.go | 37 +++++++++++++++++++++++++++++++++---- server/raft_helpers_test.go | 32 ++++++++++++++++++++++++++++---- server/raft_test.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 8 deletions(-) diff --git a/server/raft.go b/server/raft.go index a169011623b..78bb65d2a63 100644 --- a/server/raft.go +++ b/server/raft.go @@ -70,6 +70,7 @@ type RaftNode interface { LeadChangeC() <-chan bool QuitC() <-chan struct{} Created() time.Time + Overloaded() bool Stop() Delete() Wipe() @@ -131,6 +132,7 @@ type raft struct { track bool werr error state atomic.Int32 // RaftState + overload atomic.Bool hh hash.Hash64 snapfile string csz int @@ -245,6 +247,7 @@ var ( hbInterval = hbIntervalDefault lostQuorumInterval = lostQuorumIntervalDefault lostQuorumCheck = lostQuorumCheckIntervalDefault + overloadThreshold = 8192 ) type RaftConfig struct { @@ -462,7 +465,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } // Send nil entry to signal the upper layers we are done doing replay/restore. - n.apply.push(nil) + n.pushToApply(nil) // Make sure to track ourselves. n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true} @@ -874,6 +877,7 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) { // Ignore if already applied. if index > n.applied { n.applied = index + n.updateOverloadState() } var state StreamState n.wal.FastState(&state) @@ -1090,7 +1094,7 @@ func (n *raft) setupLastSnapshot() { n.pterm = snap.lastTerm n.commit = snap.lastIndex n.applied = snap.lastIndex - n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}})) + n.pushToApply(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}})) if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil { n.setWriteErrLocked(err) } @@ -1252,6 +1256,24 @@ func (n *raft) Healthy() bool { return n.isCurrent(true) } +func (n *raft) Overloaded() bool { + if n == nil { + return false + } + return n.overload.Load() +} + +// Pushes to the apply queue and updates the overloaded state. Lock must be held. +func (n *raft) pushToApply(ce *CommittedEntry) { + n.apply.push(ce) + n.updateOverloadState() +} + +// Updates the overloaded state. Lock must be held. +func (n *raft) updateOverloadState() { + n.overload.Store(n.apply.len() >= overloadThreshold || n.commit-n.applied >= uint64(overloadThreshold)) +} + // HadPreviousLeader indicates if this group ever had a leader. func (n *raft) HadPreviousLeader() bool { n.RLock() @@ -2655,7 +2677,7 @@ func (n *raft) applyCommit(index uint64) error { if fpae { delete(n.pae, index) } - n.apply.push(newCommittedEntry(index, committed)) + n.pushToApply(newCommittedEntry(index, committed)) } else { // If we processed inline update our applied index. n.applied = index @@ -2836,6 +2858,13 @@ func (n *raft) runAsCandidate() { // handleAppendEntry handles an append entry from the wire. func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) { + // If we are overwhelmed, i.e. the upper layer is not applying entries + // fast enough and our apply queue is building up, start to drop new + // append entries instead. + if n.Overloaded() { + return + } + msg = copyBytes(msg) if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil { n.entry.push(ae) @@ -3136,7 +3165,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry. - n.apply.push(newCommittedEntry(n.commit, ae.entries[:1])) + n.pushToApply(newCommittedEntry(n.commit, ae.entries[:1])) n.Unlock() return diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index e7a7ef881cd..24e828f7b1a 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -83,6 +83,27 @@ func (sg smGroup) nonLeader() stateMachine { return nil } +// Causes the upper layer to purposefully block on receipt of +// append entries until unwedge is called, simulating the scenario +// that the upper layer is stuck on processing something. +// Note that this is different from PauseApply, which stops the +// Raft layer from sending applies to the upper layer at all. +func (sg smGroup) wedge() { + for _, n := range sg { + n.(*stateAdder).wedge.Lock() + } +} + +// Unwedges the upper layer. Any append entries that have built +// up in the apply queue will start to apply. +// Note that this is different from ResumeApply, which starts the +// Raft layer sending applies to the upper layer again. +func (sg smGroup) unwedge() { + for _, n := range sg { + n.(*stateAdder).wedge.Unlock() + } +} + // Create a raft group and place on numMembers servers at random. func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup { c.t.Helper() @@ -153,10 +174,11 @@ func smLoop(sm stateMachine) { // The adder state just sums up int64 values. type stateAdder struct { sync.Mutex - s *Server - n RaftNode - cfg *RaftConfig - sum int64 + s *Server + n RaftNode + cfg *RaftConfig + sum int64 + wedge sync.Mutex } // Simple getters for server and the raft node. @@ -178,6 +200,8 @@ func (a *stateAdder) propose(data []byte) { } func (a *stateAdder) applyEntry(ce *CommittedEntry) { + a.wedge.Lock() + defer a.wedge.Unlock() a.Lock() defer a.Unlock() if ce == nil { diff --git a/server/raft_test.go b/server/raft_test.go index 193951484f8..05617c3fdd0 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -214,3 +214,35 @@ func TestNRGObserverMode(t *testing.T) { require_True(t, n.node().IsObserver()) } } + +func TestNRGDetectOverload(t *testing.T) { + origOverloadThreshold := overloadThreshold + defer func() { + overloadThreshold = origOverloadThreshold + }() + overloadThreshold = 8 + iterations := 32 + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + rg.wedge() + + sa := rg.leader().(*stateAdder) + sn := sa.node() + + for i := 0; i < iterations; i++ { + sa.proposeDelta(1) + time.Sleep(time.Millisecond * 5) + } + + require_True(t, sn.Overloaded()) + + rg.unwedge() + rg.waitOnTotal(t, int64(iterations)) + + require_False(t, sn.Overloaded()) +}