Skip to content

Commit

Permalink
NRG: Use correct sequence when truncating to previous pterm/pindex
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen committed Nov 6, 2024
1 parent 4122ab9 commit a9c30c3
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 115 deletions.
19 changes: 8 additions & 11 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3363,17 +3363,14 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var ar *appendEntryResponse
var success bool

if n.commit > 0 && ae.pindex <= n.commit {
// Check if only our terms do not match here.
if ae.pindex == n.pindex {
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
} else {
// If we have already committed this entry, just mark success.
success = true
}
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
seq := ae.pindex
if ae.pindex < n.pindex {
seq++
}
if ae.pindex < n.commit {
// If we have already committed this entry, just mark success.
success = true
} else if eae, _ := n.loadEntry(seq); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
Expand Down
164 changes: 60 additions & 104 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,8 +1274,8 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)

// Timeline.
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries})
aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil})

Expand All @@ -1286,17 +1286,17 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 1)

// Deliver a message.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.wal.State().Msgs, 2)
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 2)

// Deliver another message.
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.wal.State().Msgs, 3)
Expand All @@ -1306,49 +1306,13 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)

// Simulate receiving an old entry as a redelivery. We should not truncate as that lowers our commit.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.commit, 2)

// Heartbeat, makes sure we commit.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 3)
}

func TestNRGNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
nats1 := "yrzKKRBu" // "nats-1"

// Timeline, first leader.
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

// Timeline, leader changed, but pterm got set to term.
aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: nil})

// Initial case is simple, just store the entry.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err := n.loadEntry(1)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.pterm, 1)

// Heartbeat from another leader, pterm got set to term, make sure to only up our pterm.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.pterm, 2)
}

func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()
Expand Down Expand Up @@ -1395,68 +1359,6 @@ func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) {
require_Equal(t, n.commit, 1)
}

func TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
nats1 := "yrzKKRBu" // "nats-1"

// Timeline, first leader.
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

// Timeline, leader changed, but pterm got set to term.
aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: entries})
aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil})

// Initial case is simple, just store the entry.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err := n.loadEntry(1)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.pterm, 1)

// Heartbeat from another leader, we missed a message so we need catchup.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item.
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 1) // n.pterm
require_Equal(t, n.catchup.pindex, 1) // n.pindex

// We get a message with an incorrect pterm, can only correct pterm and requires re-trigger of catchup.
n.processAppendEntry(aeMsg2, n.catchup.sub)
require_True(t, n.catchup == nil)
require_Equal(t, n.pterm, 2)
require_Equal(t, n.pindex, 1)

// Heartbeat re-triggers catchup.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item.
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 2) // n.pterm
require_Equal(t, n.catchup.pindex, 1) // n.pindex

// Now we get the message again and can continue to store it.
n.processAppendEntry(aeMsg2, n.catchup.sub)
require_Equal(t, n.wal.State().Msgs, 2)
require_True(t, n.catchup != nil)

// Heartbeat can now cancel catchup and move up our commit.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 2)
require_True(t, n.catchup == nil)
}

func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()
Expand Down Expand Up @@ -1667,3 +1569,57 @@ func TestNRGMultipleStopsDontPanic(t *testing.T) {
n.Stop()
}
}

func TestNRGTruncateDownToCommitted(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
nats1 := "yrzKKRBu" // "nats-1"

// Timeline, we are leader
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})

// Timeline, after leader change
aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries})
aeHeartbeat := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil})

// Simply receive first message.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.commit, 0)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err := n.loadEntry(1)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Receive second message, which commits the first message.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.wal.State().Msgs, 2)
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// We receive an entry from another leader, should truncate down to commit / remove the second message.
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.wal.State().Msgs, 1)

// Just replay the third message again so it's stored.
// (normally this would be done during a catchup, but use this as a short-cut for a shorter test)
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.wal.State().Msgs, 2)
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats1)

// Heartbeat moves commit up.
n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.commit, 2)
}

0 comments on commit a9c30c3

Please sign in to comment.