From eb6bfc6966a2e90589986c8f6fd808d6d97ab1e9 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 26 Jan 2024 02:22:07 +0000 Subject: [PATCH] raft: advance commit index safely This change makes the commit index advancement in handleHeartbeat safe. Previously, a follower would attempt to update the commit index to whichever value was sent in the MsgHeartbeat message. Out-of-bound indices would crash the node. It is always safe to advance a commit index if the follower's log is "in sync" with the leader, i.e. when its log is guaranteed to be a prefix of the leader's log. This becomes true when the first MsgApp append message succeeds. At the moment, the leader will never send a commit index that exceeds the follower's log size. However, this may change in the future. This change is a defence-in-depth measure. Signed-off-by: Pavel Kalinnikov --- raft.go | 14 +++++++++++++- raft_test.go | 17 +++++++++++++---- rawnode_test.go | 1 + 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/raft.go b/raft.go index 5a150562..de58f014 100644 --- a/raft.go +++ b/raft.go @@ -375,6 +375,11 @@ type raft struct { // the leader id lead uint64 + // logSynced is true if this node's log is guaranteed to be a prefix of the + // leader's log at this term. Always true for the leader. Always false for a + // candidate. For a follower, becomes true the first time a MsgApp append to + // the log succeeds. + logSynced bool // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. 
leadTransferee uint64 @@ -763,6 +768,7 @@ func (r *raft) reset(term uint64) { r.Vote = None } r.lead = None + r.logSynced = false r.electionElapsed = 0 r.heartbeatElapsed = 0 @@ -908,6 +914,7 @@ func (r *raft) becomeLeader() { r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id + r.logSynced = true // the leader's log is in sync with itself r.state = StateLeader // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is @@ -1735,6 +1742,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { return } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + r.logSynced = true // from now on, the log is a prefix of the leader's log r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1770,7 +1778,11 @@ func (r *raft) handleAppendEntries(m pb.Message) { } func (r *raft) handleHeartbeat(m pb.Message) { - r.raftLog.commitTo(m.Commit) + // If our log is not a prefix of the leader's log, it is unsafe to advance the + // commit index, because the entries at this index may mismatch. 
+ if r.logSynced { + r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex())) + } r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) } diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..fd3739ae 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1332,11 +1332,18 @@ func TestHandleMsgApp(t *testing.T) { func TestHandleHeartbeat(t *testing.T) { commit := uint64(2) tests := []struct { - m pb.Message - wCommit uint64 + m pb.Message + logSynced bool + wCommit uint64 }{ - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1}, - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, true, commit + 1}, + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, true, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, false, commit}, + + // Increase the commit index only if the log is in sync with the leader. + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, false, commit}, + // Do not increase the commit index beyond our log size. 
+ {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, true, commit + 1}, } for i, tt := range tests { @@ -1345,6 +1352,8 @@ func TestHandleHeartbeat(t *testing.T) { sm := newTestRaft(1, 5, 1, storage) sm.becomeFollower(2, 2) sm.raftLog.commitTo(commit) + sm.logSynced = tt.logSynced + sm.handleHeartbeat(tt.m) if sm.raftLog.committed != tt.wCommit { t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit) diff --git a/rawnode_test.go b/rawnode_test.go index bca5e64c..eef09764 100644 --- a/rawnode_test.go +++ b/rawnode_test.go @@ -952,6 +952,7 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { if err != nil { t.Fatal(err) } + rawNode.raft.logSynced = true // needed to be able to advance the commit index for highestApplied := uint64(0); highestApplied != 11; { rd := rawNode.Ready()