Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG (2.11): Use correct sequence when truncating to previous pterm/pindex #6073

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3363,17 +3363,14 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var ar *appendEntryResponse
var success bool

if n.commit > 0 && ae.pindex <= n.commit {
// Check if only our terms do not match here.
if ae.pindex == n.pindex {
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
} else {
// If we have already committed this entry, just mark success.
success = true
}
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
seq := ae.pindex
if ae.pindex < n.pindex {
seq++
}
if ae.pindex < n.commit {
// If we have already committed this entry, just mark success.
success = true
} else if eae, _ := n.loadEntry(seq); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
Expand Down
164 changes: 60 additions & 104 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,8 +1274,8 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)

// Timeline.
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries})
aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil})

Expand All @@ -1286,17 +1286,17 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 1)

// Deliver a message.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.wal.State().Msgs, 2)
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 2)

// Deliver another message.
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.wal.State().Msgs, 3)
Expand All @@ -1306,49 +1306,13 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T)

// Simulate receiving an old entry as a redelivery. We should not truncate as that lowers our commit.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.commit, 2)

// Heartbeat, makes sure we commit.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 3)
}

func TestNRGNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two tests that have been removed asserted that n.pterm = n.term could still be executed, resulting in a situation where we should recover from the possibility of having an incorrect pterm.

Since its removal in #6060, these tests should be removed. We should never have this situation occur anymore.

n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
nats1 := "yrzKKRBu" // "nats-1"

// Timeline, first leader.
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

// Timeline, leader changed, but pterm got set to term.
aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: nil})

// Initial case is simple, just store the entry.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err := n.loadEntry(1)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.pterm, 1)

// Heartbeat from another leader, pterm got set to term, make sure to only up our pterm.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.pterm, 2)
}

func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()
Expand Down Expand Up @@ -1395,68 +1359,6 @@ func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) {
require_Equal(t, n.commit, 1)
}

func TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
nats1 := "yrzKKRBu" // "nats-1"

// Timeline, first leader.
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil})

// Timeline, leader changed, but pterm got set to term.
aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: entries})
aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil})

// Initial case is simple, just store the entry.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err := n.loadEntry(1)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Heartbeat, makes sure commit moves up.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.pterm, 1)

// Heartbeat from another leader, we missed a message so we need catchup.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item.
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 1) // n.pterm
require_Equal(t, n.catchup.pindex, 1) // n.pindex

// We get a message with an incorrect pterm, can only correct pterm and requires re-trigger of catchup.
n.processAppendEntry(aeMsg2, n.catchup.sub)
require_True(t, n.catchup == nil)
require_Equal(t, n.pterm, 2)
require_Equal(t, n.pindex, 1)

// Heartbeat re-triggers catchup.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item.
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 2) // n.pterm
require_Equal(t, n.catchup.pindex, 1) // n.pindex

// Now we get the message again and can continue to store it.
n.processAppendEntry(aeMsg2, n.catchup.sub)
require_Equal(t, n.wal.State().Msgs, 2)
require_True(t, n.catchup != nil)

// Heartbeat can now cancel catchup and move up our commit.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.commit, 2)
require_True(t, n.catchup == nil)
}

func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()
Expand Down Expand Up @@ -1667,3 +1569,57 @@ func TestNRGMultipleStopsDontPanic(t *testing.T) {
n.Stop()
}
}

func TestNRGTruncateDownToCommitted(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
nats1 := "yrzKKRBu" // "nats-1"

// Timeline, we are leader
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})

// Timeline, after leader change
aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries})
aeHeartbeat := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil})

// Simply receive first message.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.commit, 0)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err := n.loadEntry(1)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Receive second message, which commits the first message.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.wal.State().Msgs, 2)
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// We receive an entry from another leader, should truncate down to commit / remove the second message.
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.wal.State().Msgs, 1)

// Just replay the third message again so it's stored.
// (normally this would be done during a catchup, but use this as a short-cut for a shorter test)
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.commit, 1)
require_Equal(t, n.wal.State().Msgs, 2)
entry, err = n.loadEntry(2)
require_NoError(t, err)
require_Equal(t, entry.leader, nats1)

// Heartbeat moves commit up.
n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.commit, 2)
}