From f64d1560b56757ca876cfdec3c6bcbbba8b5e61c Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 26 Jan 2024 02:22:07 +0000 Subject: [PATCH] raft: advance commit index safely This change makes the commit index advancement in handleHeartbeat safe. Previously, a follower would attempt to update the commit index to whichever was sent in the MsgHeartbeat message. Out-of-bound indices would crash the node. It is always safe to advance a commit index if the follower's log is "in sync" with the leader, i.e. when its log is guaranteed to be a prefix of the leader's log. This is always true if the term of last entry in the log matches the leader team, otherwise this guarantee is established when the first MsgApp append message from the leader succeeds. At the moment, the leader will never send a commit index that exceeds the follower's log size. However, this may change in future. This change is a defence-in-depth. Signed-off-by: Pavel Kalinnikov --- raft.go | 19 ++++++++++++++++++- raft_test.go | 18 +++++++++++++----- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/raft.go b/raft.go index 5a150562..8e6174a5 100644 --- a/raft.go +++ b/raft.go @@ -375,6 +375,12 @@ type raft struct { // the leader id lead uint64 + // logSynced is true if this node's log is guaranteed to be a prefix of the + // leader's log at this term. Always true for the leader. Always false for a + // candidate. For a follower, this is true if the last entry term matches the + // leader term, otherwise becomes true when the first MsgApp append from the + // leader succeeds. + logSynced bool // leadTransferee is id of the leader transfer target when its value is not zero. // Follow the procedure defined in raft thesis 3.10. leadTransferee uint64 @@ -763,6 +769,7 @@ func (r *raft) reset(term uint64) { r.Vote = None } r.lead = None + r.logSynced = false r.electionElapsed = 0 r.heartbeatElapsed = 0 @@ -866,6 +873,10 @@ func (r *raft) becomeFollower(term uint64, lead uint64) { r.reset(term) r.tick = r.tickElection r.lead = lead + // If the last entry term matches the leader term, the log is guaranteed to be + // a prefix of the leader's log. Otherwise, we will establish this guarantee + // later, on the first successful MsgApp. + r.logSynced = r.raftLog.lastTerm() == term r.state = StateFollower r.logger.Infof("%x became follower at term %d", r.id, r.Term) } @@ -908,6 +919,7 @@ func (r *raft) becomeLeader() { r.reset(r.Term) r.tick = r.tickHeartbeat r.lead = r.id + r.logSynced = true // the leader's log is in sync with itself r.state = StateLeader // Followers enter replicate mode when they've been successfully probed // (perhaps after having received a snapshot as a result). The leader is @@ -1735,6 +1747,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { return } if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + r.logSynced = true // from now on, the log is a prefix of the leader's log r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1770,7 +1783,11 @@ func (r *raft) handleAppendEntries(m pb.Message) { } func (r *raft) handleHeartbeat(m pb.Message) { - r.raftLog.commitTo(m.Commit) + // It is only safe to advance the commit index if our log is a prefix of the + // leader's log. Otherwise, entries at this index may mismatch. + if r.logSynced { + r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex())) + } r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) } diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..16e40608 100644 --- a/raft_test.go +++ b/raft_test.go @@ -1332,16 +1332,24 @@ func TestHandleMsgApp(t *testing.T) { func TestHandleHeartbeat(t *testing.T) { commit := uint64(2) tests := []struct { - m pb.Message - wCommit uint64 + m pb.Message + lastTerm uint64 + wCommit uint64 }{ - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1}, - {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 2, commit + 1}, + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 2, commit}, // do not decrease commit + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, 1, commit}, + + // Do not increase the commit index if the log is not guaranteed to be a + // prefix of the leader's log. + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, 1, commit}, + // Do not increase the commit index beyond our log size. + {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, 2, commit + 1}, } for i, tt := range tests { storage := newTestMemoryStorage(withPeers(1, 2)) - storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}) + storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: tt.lastTerm}}) sm := newTestRaft(1, 5, 1, storage) sm.becomeFollower(2, 2) sm.raftLog.commitTo(commit)