Skip to content

Commit

Permalink
raft: advance commit index safely
Browse files Browse the repository at this point in the history
This change makes the commit index advancement in handleHeartbeat safe.
Previously, a follower would attempt to update the commit index to
whatever value was sent in the MsgHeartbeat message. An out-of-bounds
index would crash the node.

It is always safe to advance a commit index if the follower's log is "in
sync" with the leader, i.e. when its log is guaranteed to be a prefix of
the leader's log. This becomes true when the first MsgApp append message
succeeds.

At the moment, the leader will never send a commit index that exceeds
the follower's log size. However, this may change in the future, so this
change is a defence-in-depth measure.

Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Jan 26, 2024
1 parent 026484c commit eb6bfc6
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
14 changes: 13 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,11 @@ type raft struct {

// the leader id
lead uint64
// logSynced is true if this node's log is guaranteed to be a prefix of the
// leader's log at this term. Always true for the leader. Always false for a
// candidate. For a follower, becomes true the first time a MsgApp append to
// the log succeeds.
logSynced bool
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
Expand Down Expand Up @@ -763,6 +768,7 @@ func (r *raft) reset(term uint64) {
r.Vote = None
}
r.lead = None
r.logSynced = false

r.electionElapsed = 0
r.heartbeatElapsed = 0
Expand Down Expand Up @@ -908,6 +914,7 @@ func (r *raft) becomeLeader() {
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.logSynced = true // the leader's log is in sync with itself
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
Expand Down Expand Up @@ -1735,6 +1742,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.logSynced = true // from now on, the log is a prefix of the leader's log
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
return
}
Expand Down Expand Up @@ -1770,7 +1778,11 @@ func (r *raft) handleAppendEntries(m pb.Message) {
}

func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
// If our log is not a prefix of the leader's log, it is unsafe to advance the
// commit index, because the entries at this index may mismatch.
if r.logSynced {
r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex()))
}
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

Expand Down
17 changes: 13 additions & 4 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,11 +1332,18 @@ func TestHandleMsgApp(t *testing.T) {
func TestHandleHeartbeat(t *testing.T) {
commit := uint64(2)
tests := []struct {
m pb.Message
wCommit uint64
m pb.Message
logSynced bool
wCommit uint64
}{
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1},
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, true, commit + 1},
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, true, commit}, // do not decrease commit
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, false, commit},

// Increase the commit index only if the log is in sync with the leader.
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, false, commit},
// Do not increase the commit index beyond our log size.
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, true, commit + 1},
}

for i, tt := range tests {
Expand All @@ -1345,6 +1352,8 @@ func TestHandleHeartbeat(t *testing.T) {
sm := newTestRaft(1, 5, 1, storage)
sm.becomeFollower(2, 2)
sm.raftLog.commitTo(commit)
sm.logSynced = tt.logSynced

sm.handleHeartbeat(tt.m)
if sm.raftLog.committed != tt.wCommit {
t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
Expand Down
1 change: 1 addition & 0 deletions rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rawNode.raft.logSynced = true // needed to be able to advance the commit index

for highestApplied := uint64(0); highestApplied != 11; {
rd := rawNode.Ready()
Expand Down

0 comments on commit eb6bfc6

Please sign in to comment.