Skip to content

Commit

Permalink
NRG: Vote request cancels catchup, new leader could have rejected sub…
Browse files Browse the repository at this point in the history
…sequent catchup messages

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen committed Nov 4, 2024
1 parent f981ac3 commit 0f7cf3d
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
1 change: 1 addition & 0 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -4063,6 +4063,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
strings.ToLower(n.State().String()), vr.term, n.term)
n.stepdownLocked(noLeader)
}
n.cancelCatchup()
n.term = vr.term
n.vote = noVote
n.writeTermVote()
Expand Down
33 changes: 33 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,3 +1617,36 @@ func TestNRGRecoverPindexPtermOnlyIfLogNotEmpty(t *testing.T) {
require_Equal(t, rn.pterm, 0)
require_Equal(t, rn.pindex, 0)
}

func TestNRGCancelCatchupWhenDetectingHigherTermDuringVoteRequest(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline.
aeCatchupTrigger := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})

// Truncate to simulate we missed one message and need to catchup.
n.processAppendEntry(aeCatchupTrigger, n.aesub)
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 0) // n.pterm
require_Equal(t, n.catchup.pindex, 0) // n.pindex

// Process first message as part of the catchup.
catchupSub := n.catchup.sub
n.processAppendEntry(aeMsg1, catchupSub)
require_True(t, n.catchup != nil)

// Receiving a vote request should cancel our catchup.
// Otherwise, we could receive catchup messages after this that provides the previous leader with quorum.
// If the new leader doesn't have these entries, the previous leader would desync since it would commit them.
err := n.processVoteRequest(&voteRequest{2, 1, 1, nats0, "reply"})
require_NoError(t, err)
require_True(t, n.catchup == nil)
}

0 comments on commit 0f7cf3d

Please sign in to comment.