Skip to content

Commit

Permalink
NRG: Drop append entries when upper layer is overloaded
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Feb 15, 2024
1 parent 7e30546 commit 19181b5
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 24 deletions.
45 changes: 38 additions & 7 deletions server/ipqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,46 @@ type ipQueue[T any] struct {
elts []T
pos int
pool *sync.Pool
mrs int
mrs int // Max recycle size
calc func(e T) uint64 // Per-entry size calculator
sz atomic.Uint64 // Calculated size (only if calc != nil)
name string
m *sync.Map
}

type ipQueueOpts struct {
type ipQueueOpts[T any] struct {
maxRecycleSize int
calc func(e T) uint64
}

type ipQueueOpt func(*ipQueueOpts)
type ipQueueOpt[T any] func(*ipQueueOpts[T])

// This option allows to set the maximum recycle size when attempting
// to put back a slice to the pool.
func ipQueue_MaxRecycleSize(max int) ipQueueOpt {
return func(o *ipQueueOpts) {
func ipQueue_MaxRecycleSize[T any](max int) ipQueueOpt[T] {
return func(o *ipQueueOpts[T]) {
o.maxRecycleSize = max
}
}

func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt) *ipQueue[T] {
qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize}
// This option enables total queue size counting by passing in a function
// that evaluates the size of each entry as it is pushed/popped. This option
// enables the size() function.
func ipQueue_SizeCalculation[T any](calc func(e T) uint64) ipQueueOpt[T] {
return func(o *ipQueueOpts[T]) {
o.calc = calc
}
}

func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt[T]) *ipQueue[T] {
qo := ipQueueOpts[T]{maxRecycleSize: ipQueueDefaultMaxRecycleSize}
for _, o := range opts {
o(&qo)
}
q := &ipQueue[T]{
ch: make(chan struct{}, 1),
mrs: qo.maxRecycleSize,
calc: qo.calc,
pool: &sync.Pool{},
name: name,
m: &s.ipQueues,
Expand Down Expand Up @@ -83,6 +96,9 @@ func (q *ipQueue[T]) push(e T) int {
}
}
q.elts = append(q.elts, e)
if q.calc != nil {
q.sz.Add(q.calc(e))
}
l++
q.Unlock()
if signal {
Expand Down Expand Up @@ -116,6 +132,11 @@ func (q *ipQueue[T]) pop() []T {
}
q.elts, q.pos = nil, 0
atomic.AddInt64(&q.inprogress, int64(len(elts)))
if q.calc != nil {
for _, e := range elts {
q.sz.Add(-q.calc(e))
}
}
q.Unlock()
return elts
}
Expand All @@ -140,6 +161,9 @@ func (q *ipQueue[T]) popOne() (T, bool) {
}
e := q.elts[q.pos]
q.pos++
if q.calc != nil {
q.sz.Add(-q.calc(e))
}
l--
if l > 0 {
// We need to re-signal
Expand Down Expand Up @@ -189,6 +213,12 @@ func (q *ipQueue[T]) len() int {
return l
}

// Returns the calculated size of the queue (if ipQueue_SizeCalculation has been
// passed in), otherwise returns zero.
func (q *ipQueue[T]) size() uint64 {
return q.sz.Load()
}

// Empty the queue and consumes the notification signal if present.
// Note that this could cause a reader go routine that has been
// notified that there is something in the queue (reading from queue's `ch`)
Expand All @@ -202,6 +232,7 @@ func (q *ipQueue[T]) drain() {
q.resetAndReturnToPool(&q.elts)
q.elts, q.pos = nil, 0
}
q.sz.Store(0)
// Consume the signal if it was present to reduce the chance of a reader
// routine to be think that there is something in the queue...
select {
Expand Down
31 changes: 29 additions & 2 deletions server/ipqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestIPQueueBasic(t *testing.T) {
}

// Try to change the max recycle size
q2 := newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
q2 := newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize[int](10))
if q2.mrs != 10 {
t.Fatalf("Expected max recycle size to be 10, got %v", q2.mrs)
}
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestIPQueueRecycle(t *testing.T) {
}
}

q = newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
q = newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize[int](10))
for i := 0; i < 100; i++ {
q.push(i)
}
Expand Down Expand Up @@ -389,3 +389,30 @@ func TestIPQueueDrain(t *testing.T) {
}
}
}

func TestIPQueueSizeCalculation(t *testing.T) {
type testType = [16]byte
var testValue testType

calc := ipQueue_SizeCalculation[testType](func(e testType) uint64 {
return uint64(len(e))
})
s := &Server{}
q := newIPQueue[testType](s, "test", calc)

for i := 0; i < 10; i++ {
q.push(testValue)
require_Equal(t, q.len(), i+1)
require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue)))
}

for i := 10; i > 5; i-- {
q.popOne()
require_Equal(t, q.len(), i-1)
require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue)))
}

q.pop()
require_Equal(t, q.len(), 0)
require_Equal(t, q.size(), 0)
}
61 changes: 50 additions & 11 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type RaftNode interface {
LeadChangeC() <-chan bool
QuitC() <-chan struct{}
Created() time.Time
Overloaded() bool
Stop()
Delete()
Wipe()
Expand Down Expand Up @@ -235,13 +236,14 @@ const (
)

var (
minElectionTimeout = minElectionTimeoutDefault
maxElectionTimeout = maxElectionTimeoutDefault
minCampaignTimeout = minCampaignTimeoutDefault
maxCampaignTimeout = maxCampaignTimeoutDefault
hbInterval = hbIntervalDefault
lostQuorumInterval = lostQuorumIntervalDefault
lostQuorumCheck = lostQuorumCheckIntervalDefault
minElectionTimeout = minElectionTimeoutDefault
maxElectionTimeout = maxElectionTimeoutDefault
minCampaignTimeout = minCampaignTimeoutDefault
maxCampaignTimeout = maxCampaignTimeoutDefault
hbInterval = hbIntervalDefault
lostQuorumInterval = lostQuorumIntervalDefault
lostQuorumCheck = lostQuorumCheckIntervalDefault
overloadThreshold uint64 = 1024 * 1024 * 8
)

type RaftConfig struct {
Expand Down Expand Up @@ -336,6 +338,19 @@ func (s *Server) bootstrapRaftNode(cfg *RaftConfig, knownPeers []string, allPeer
return writePeerState(cfg.Store, &peerState{knownPeers, expected, extUndetermined})
}

// calculateCommittedEntrySizeForIPQ is passed into the ipQueue to
// help with tracking the size of the committed entries in the queue.
func calculateCommittedEntrySizeForIPQ(e *CommittedEntry) uint64 {
if e == nil {
return 0
}
sz := uint64(8) // 64-bit index
for _, ent := range e.Entries {
sz += uint64(1 + len(ent.Data))
}
return sz
}

// startRaftNode will start the raft node.
func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabels) (RaftNode, error) {
if cfg == nil {
Expand All @@ -360,6 +375,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
return nil, errNoPeerState
}

calc := ipQueue_SizeCalculation[*CommittedEntry](calculateCommittedEntrySizeForIPQ)
qpfx := fmt.Sprintf("[ACC:%s] RAFT '%s' ", accName, cfg.Name)
n := &raft{
created: time.Now(),
Expand All @@ -386,12 +402,13 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
prop: newIPQueue[*Entry](s, qpfx+"entry"),
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry", calc),
accName: accName,
leadc: make(chan bool, 1),
observer: cfg.Observer,
extSt: ps.domainExt,
}

n.c.registerWithAccount(sacc)

if atomic.LoadInt32(&s.logging.debug) > 0 {
Expand Down Expand Up @@ -473,6 +490,9 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
}
}

// Send nil entry to signal the upper layers we are done doing replay/restore.
n.pushToApply(nil)

// Make sure to track ourselves.
n.peers[n.id] = &lps{time.Now().UnixNano(), 0, true}

Expand Down Expand Up @@ -1178,7 +1198,7 @@ func (n *raft) setupLastSnapshot() {
n.pterm = snap.lastTerm
n.commit = snap.lastIndex
n.applied = snap.lastIndex
n.apply.push(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
n.pushToApply(newCommittedEntry(n.commit, []*Entry{{EntrySnapshot, snap.data}}))
if _, err := n.wal.Compact(snap.lastIndex + 1); err != nil {
n.setWriteErrLocked(err)
}
Expand Down Expand Up @@ -1375,6 +1395,18 @@ func (n *raft) Healthy() bool {
return n.isCurrent(true)
}

func (n *raft) Overloaded() bool {
if n == nil {
return false
}
return n.apply.size() >= overloadThreshold
}

// Pushes to the apply queue and updates the overloaded state. Lock must be held.
func (n *raft) pushToApply(ce *CommittedEntry) {
n.apply.push(ce)
}

// HadPreviousLeader indicates if this group ever had a leader.
func (n *raft) HadPreviousLeader() bool {
n.RLock()
Expand Down Expand Up @@ -2803,7 +2835,7 @@ func (n *raft) applyCommit(index uint64) error {
if fpae {
delete(n.pae, index)
}
n.apply.push(newCommittedEntry(index, committed))
n.pushToApply(newCommittedEntry(index, committed))
} else {
// If we processed inline update our applied index.
n.applied = index
Expand Down Expand Up @@ -2980,6 +3012,13 @@ func (n *raft) runAsCandidate() {
// handleAppendEntry handles an append entry from the wire. This function
// is an internal callback from the "asubj" append entry subscription.
func (n *raft) handleAppendEntry(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
// If we are overwhelmed, i.e. the upper layer is not applying entries
// fast enough and our apply queue is building up, start to drop new
// append entries instead.
if n.Overloaded() {
return
}

msg = copyBytes(msg)
if ae, err := n.decodeAppendEntry(msg, sub, reply); err == nil {
// Push to the new entry channel. From here one of the worker
Expand Down Expand Up @@ -3299,7 +3338,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}

// Now send snapshot to upper levels. Only send the snapshot, not the peerstate entry.
n.apply.push(newCommittedEntry(n.commit, ae.entries[:1]))
n.pushToApply(newCommittedEntry(n.commit, ae.entries[:1]))
n.Unlock()
return

Expand Down
32 changes: 28 additions & 4 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@ func (sg smGroup) nonLeader() stateMachine {
return nil
}

// Causes the upper layer to purposefully block on receipt of
// append entries until unwedge is called, simulating the scenario
// that the upper layer is stuck on processing something.
// Note that this is different from PauseApply, which stops the
// Raft layer from sending applies to the upper layer at all.
func (sg smGroup) wedge() {
for _, n := range sg {
n.(*stateAdder).wedge.Lock()
}
}

// Unwedges the upper layer. Any append entries that have built
// up in the apply queue will start to apply.
// Note that this is different from ResumeApply, which starts the
// Raft layer sending applies to the upper layer again.
func (sg smGroup) unwedge() {
for _, n := range sg {
n.(*stateAdder).wedge.Unlock()
}
}

// Create a raft group and place on numMembers servers at random.
func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
c.t.Helper()
Expand Down Expand Up @@ -153,10 +174,11 @@ func smLoop(sm stateMachine) {
// The adder state just sums up int64 values.
type stateAdder struct {
sync.Mutex
s *Server
n RaftNode
cfg *RaftConfig
sum int64
s *Server
n RaftNode
cfg *RaftConfig
sum int64
wedge sync.Mutex
}

// Simple getters for server and the raft node.
Expand All @@ -178,6 +200,8 @@ func (a *stateAdder) propose(data []byte) {
}

func (a *stateAdder) applyEntry(ce *CommittedEntry) {
a.wedge.Lock()
defer a.wedge.Unlock()
a.Lock()
defer a.Unlock()
if ce == nil {
Expand Down
32 changes: 32 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,35 @@ func TestNRGSimpleElection(t *testing.T) {
require_Equal(t, rn.vote, vr.candidate)
}
}

func TestNRGDetectOverload(t *testing.T) {
origOverloadThreshold := overloadThreshold
defer func() {
overloadThreshold = origOverloadThreshold
}()
iterations := 32
overloadThreshold = uint64(iterations-2) * 8

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

rg := c.createRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

rg.wedge()

sa := rg.leader().(*stateAdder)
sn := sa.node()

for i := 0; i < iterations; i++ {
sa.proposeDelta(1)
time.Sleep(time.Millisecond * 5)
}

require_True(t, sn.Overloaded())

rg.unwedge()
rg.waitOnTotal(t, int64(iterations))

require_False(t, sn.Overloaded())
}

0 comments on commit 19181b5

Please sign in to comment.