diff --git a/confchange/confchange.go b/confchange/confchange.go index 55db16a8..c653ea94 100644 --- a/confchange/confchange.go +++ b/confchange/confchange.go @@ -259,8 +259,7 @@ func (c Changer) initProgress(cfg *tracker.Config, trk tracker.ProgressMap, id u // at all (and will thus likely need a snapshot), though the app may // have applied a snapshot out of band before adding the replica (thus // making the first index the better choice). - Next: c.LastIndex, - Match: 0, + Watermark: tracker.Watermark{Next: c.LastIndex}, Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes), IsLearner: isLearner, // When a node is first added, we should mark it as recently active. diff --git a/raft.go b/raft.go index 5e5ad93a..f04851cc 100644 --- a/raft.go +++ b/raft.go @@ -586,85 +586,123 @@ func (r *raft) send(m pb.Message) { } } -// sendAppend sends an append RPC with new entries (if any) and the -// current commit index to the given peer. -func (r *raft) sendAppend(to uint64) { - r.maybeSendAppend(to, true) -} - -// maybeSendAppend sends an append RPC with new entries to the given peer, -// if necessary. Returns true if a message was sent. The sendIfEmpty -// argument controls whether messages with no entries will be sent -// ("empty" messages are useful to convey updated Commit indexes, but -// are undesirable when we're sending multiple messages in a batch). -func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { - pr := r.trk.Progress[to] - if pr.IsPaused() { +// maybeSendAppend sends an append RPC with new entries and commit index to the +// given peer, if necessary. Returns true if a message was sent. +// +// May send an empty message, to convey an update Commit index. +func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { + if to == r.id { + return false + } + if st := pr.State; st != tracker.StateProbe && st != tracker.StateReplicate { + return false + } else if st == tracker.StateProbe && pr.Pause { return false } - lastIndex, nextIndex := pr.Next-1, pr.Next - lastTerm, errt := r.raftLog.term(lastIndex) - - var ents []pb.Entry - var erre error - // In a throttled StateReplicate only send empty MsgApp, to ensure progress. - // Otherwise, if we had a full Inflights and all inflight messages were in - // fact dropped, replication to that follower would stall. Instead, an empty - // MsgApp will eventually reach the follower (heartbeats responses prompt the - // leader to send an append), allowing it to be acked or rejected, both of - // which will clear out Inflights. - if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { - ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize) + prevTerm, err := r.raftLog.term(pr.Next - 1) + if err != nil { + // The log probably got truncated at >= pr.Next, so we can't catch up the + // follower log anymore. Send a snapshot instead. + return r.sendSnapshot(to, pr) + } + if pr.State == tracker.StateProbe { // !pr.Pause + pr.Pause = true + // r.logger.Info(to, "cw probe = true") + return r.sendAppend(to, pr, prevTerm) } + // StateReplicate + repl := (*tracker.ProgressReplicate)(pr) - if len(ents) == 0 && !sendIfEmpty { + lastIndex, commit := r.raftLog.lastIndex(), r.raftLog.committed + if repl.UpToDate(lastIndex, commit) { + // r.logger.Info(to, "up to date") return false } - - if errt != nil || erre != nil { // send snapshot if we failed to get term or entries - if !pr.RecentActive { - r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) - return false - } - - snapshot, err := r.raftLog.snapshot() - if err != nil { - if err == ErrSnapshotTemporarilyUnavailable { - r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) - return false - } - panic(err) // TODO(bdarnell) - } - if IsEmptySnap(snapshot) { - panic("need non-empty snapshot") - } - sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term - r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", - r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) - pr.BecomeSnapshot(sindex) - r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) - - r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) - return true + if !repl.IsThrottled(lastIndex) { + // r.logger.Info(to, "regular append") + return r.sendAppend(to, pr, prevTerm) } + // In a throttled StateReplicate only send empty MsgApp, to ensure progress. + // Otherwise, if all the inflight messages are dropped, replication to that + // follower stalls. We send an empty MsgApp periodically, so that eventually + // it reaches the follower, and the latter acks or rejects it. The Pause flag + // is reset by a HeartbeatResp message, which is guaranteed if the follower is + // connected. + if !repl.Pause { + repl.Pause = true + return r.sendMsgAppPing(to, pr, prevTerm) + } + // Ensure sending an empty MsgApp also if the follower's commit index can be + // moved forward. + if repl.ShouldSendCommit(commit) && !pr.Commit.Pause { + pr.Commit.Pause = true + return r.sendMsgAppPing(to, pr, prevTerm) + } + return false +} + +func (r *raft) sendMsgAppPing(to uint64, pr *tracker.Progress, prevTerm uint64) bool { + commit := r.raftLog.committed + // r.logger.Info("send probe for", to) + r.send(pb.Message{ + To: to, + Type: pb.MsgApp, + Index: pr.Next - 1, + LogTerm: prevTerm, + Commit: commit, + }) + pr.Commit.Sent(commit) + return true +} - // Send the actual MsgApp otherwise, and update the progress accordingly. - if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil { - r.logger.Panicf("%x: %v", r.id, err) +func (r *raft) sendAppend(to uint64, pr *tracker.Progress, prevTerm uint64) bool { + entries, err := r.raftLog.entries(pr.Next, r.maxMsgSize) + if err != nil { // send snapshot if we failed to get entries + return r.sendSnapshot(to, pr) } - // NB: pr has been updated, but we make sure to only use its old values below. + prevIndex := pr.Next - 1 + pr.UpdateOnEntriesSend(len(entries), uint64(payloadsSize(entries))) + commit := r.raftLog.committed + pr.Commit.Sent(commit) r.send(pb.Message{ To: to, Type: pb.MsgApp, - Index: lastIndex, - LogTerm: lastTerm, - Entries: ents, - Commit: r.raftLog.committed, + Index: prevIndex, + LogTerm: prevTerm, + Entries: entries, + Commit: commit, }) return true } +func (r *raft) sendSnapshot(to uint64, pr *tracker.Progress) bool { + if !pr.RecentActive { + r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) + return false + } + + snapshot, err := r.raftLog.snapshot() + if err != nil { + if err == ErrSnapshotTemporarilyUnavailable { + r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) + return false + } + panic(err) // TODO(bdarnell) + } + if IsEmptySnap(snapshot) { + panic("need non-empty snapshot") + } + sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term + r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", + r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) + pr.BecomeSnapshot(sindex) + r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) + + r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) + return true +} + // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // Attach the commit as min(to.matched, r.committed). @@ -687,11 +725,8 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.trk. func (r *raft) bcastAppend() { - r.trk.Visit(func(id uint64, _ *tracker.Progress) { - if id == r.id { - return - } - r.sendAppend(id) + r.trk.Visit(func(id uint64, pr *tracker.Progress) { + r.maybeSendAppend(id, pr) }) } @@ -773,8 +808,7 @@ func (r *raft) reset(term uint64) { r.trk.ResetVotes() r.trk.Visit(func(id uint64, pr *tracker.Progress) { *pr = tracker.Progress{ - Match: 0, - Next: r.raftLog.lastIndex() + 1, + Watermark: tracker.Watermark{Next: r.raftLog.lastIndex() + 1}, Inflights: tracker.NewInflights(r.trk.MaxInflight, r.trk.MaxInflightBytes), IsLearner: pr.IsLearner, } @@ -1462,11 +1496,10 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State == tracker.StateReplicate { pr.BecomeProbe() } - r.sendAppend(m.From) + r.maybeSendAppend(m.From, pr) } } else { - oldPaused := pr.IsPaused() - pr.UpdateCommit(m.Commit) + pr.Commit.Update(m.Commit) // We want to update our tracking if the response updates our // matched index or if the response can move a probing peer back // into StateReplicate (see heartbeat_rep_recovers_from_probing.txt @@ -1478,6 +1511,7 @@ func stepLeader(r *raft, m pb.Message) error { if pr.MaybeUpdate(m.Index) || (pr.Match == m.Index && pr.State == tracker.StateProbe) { switch { case pr.State == tracker.StateProbe: + // r.logger.Info("become replicate", m.From) pr.BecomeReplicate() case pr.State == tracker.StateSnapshot && pr.Match+1 >= r.raftLog.firstIndex(): // Note that we don't take into account PendingSnapshot to @@ -1499,13 +1533,11 @@ func stepLeader(r *raft, m pb.Message) error { } if r.maybeCommit() { + // r.logger.Info("commit", m.From) // committed index has progressed for the term, so it is safe // to respond to pending read index requests releasePendingReadIndexMessages(r) r.bcastAppend() - } else if oldPaused && r.id != m.From && pr.Commit < r.raftLog.committed { - // The node is potentially missing the latest commit index. Send it. - r.sendAppend(m.From) } // We've updated flow control information above, which may // allow us to send multiple (size-limited) in-flight messages @@ -1514,7 +1546,8 @@ func stepLeader(r *raft, m pb.Message) error { // we have more entries to send, send as many messages as we // can (without sending empty messages for the commit index) if r.id != m.From { - for r.maybeSendAppend(m.From, false /* sendIfEmpty */) { + // r.logger.Info("iterate", m.From) + for r.maybeSendAppend(m.From, pr) { } } // Transfer leadership is in progress. @@ -1526,7 +1559,8 @@ func stepLeader(r *raft, m pb.Message) error { } case pb.MsgHeartbeatResp: pr.RecentActive = true - pr.MsgAppFlowPaused = false + pr.Pause = false + pr.Commit.Pause = false // NB: if the follower is paused (full Inflights), this will still send an // empty append, allowing it to recover from situations in which all the @@ -1541,9 +1575,7 @@ func stepLeader(r *raft, m pb.Message) error { // Note that StateSnapshot typically satisfies pr.Match < lastIndex, but // `pr.Paused()` is always true for StateSnapshot, so sendAppend is a // no-op. - if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe { - r.sendAppend(m.From) - } + r.maybeSendAppend(m.From, pr) if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { return nil @@ -1576,7 +1608,7 @@ func stepLeader(r *raft, m pb.Message) error { // If snapshot finish, wait for the MsgAppResp from the remote node before sending // out the next MsgApp. // If snapshot failure, wait for a heartbeat interval before next try - pr.MsgAppFlowPaused = true + pr.Pause = true case pb.MsgUnreachable: // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgApp is lost. @@ -1613,7 +1645,7 @@ func stepLeader(r *raft, m pb.Message) error { r.sendTimeoutNow(leadTransferee) r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { - r.sendAppend(leadTransferee) + r.maybeSendAppend(leadTransferee, pr) } } return nil @@ -1947,21 +1979,15 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co return cs } - if r.maybeCommit() { - // If the configuration change means that more entries are committed now, - // broadcast/append to everyone in the updated config. - r.bcastAppend() - } else { - // Otherwise, still probe the newly added replicas; there's no reason to - // let them wait out a heartbeat interval (or the next incoming - // proposal). - r.trk.Visit(func(id uint64, pr *tracker.Progress) { - if id == r.id { - return - } - r.maybeSendAppend(id, false /* sendIfEmpty */) - }) - } + r.maybeCommit() + // If the configuration change means that more entries are committed now, + // broadcast/append to everyone in the updated config. + // + // Otherwise, still probe the newly added replicas; there's no reason to + // let them wait out a heartbeat interval (or the next incoming + // proposal). + r.bcastAppend() + // If the leadTransferee was removed or demoted, abort the leadership transfer. if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 { r.abortLeaderTransfer() diff --git a/raft_snap_test.go b/raft_snap_test.go index e6058c68..6787d510 100644 --- a/raft_snap_test.go +++ b/raft_snap_test.go @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) { if sm.trk.Progress[2].Next != 1 { t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].Pause { + t.Errorf("Pause = %v, want true", sm.trk.Progress[2].Pause) } } @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) { if sm.trk.Progress[2].Next != 12 { t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].Pause { + t.Errorf("Pause = %v, want true", sm.trk.Progress[2].Pause) } } diff --git a/raft_test.go b/raft_test.go index 5bc6d52e..961ec0d5 100644 --- a/raft_test.go +++ b/raft_test.go @@ -128,21 +128,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].MsgAppFlowPaused = true + r.trk.Progress[2].Pause = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].Pause { + t.Errorf("paused = %v, want true", r.trk.Progress[2].Pause) } r.trk.Progress[2].BecomeReplicate() - if r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused) + if r.trk.Progress[2].Pause { + t.Errorf("paused = %v, want false", r.trk.Progress[2].Pause) } - r.trk.Progress[2].MsgAppFlowPaused = true + r.trk.Progress[2].Pause = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused) + if r.trk.Progress[2].Pause { + t.Errorf("paused = %v, want false", r.trk.Progress[2].Pause) } } @@ -2794,7 +2794,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeProbe() + pr2 := r.trk.Progress[2] + pr2.BecomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2803,7 +2804,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { // loop. After that, the follower is paused until a heartbeat response is // received. mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2, pr2) msg := r.readMessages() if len(msg) != 1 { t.Errorf("len(msg) = %d, want %d", len(msg), 1) @@ -2813,12 +2814,12 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].Pause { + t.Errorf("paused = %v, want true", r.trk.Progress[2].Pause) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2, pr2) if l := len(r.readMessages()); l != 0 { t.Errorf("len(msg) = %d, want %d", l, 0) } @@ -2828,8 +2829,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].Pause { + t.Errorf("paused = %v, want true", r.trk.Progress[2].Pause) } // consume the heartbeat @@ -2851,8 +2852,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].Pause { + t.Errorf("paused = %v, want true", r.trk.Progress[2].Pause) } } @@ -2861,11 +2862,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeReplicate() + pr2 := r.trk.Progress[2] + pr2.BecomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2, pr2) msgs := r.readMessages() if len(msgs) != 1 { t.Errorf("len(msg) = %d, want %d", len(msgs), 1) @@ -2878,11 +2880,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeSnapshot(10) + pr2 := r.trk.Progress[2] + pr2.BecomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2, pr2) msgs := r.readMessages() if len(msgs) != 0 { t.Errorf("len(msg) = %d, want %d", len(msgs), 0) @@ -2901,7 +2904,7 @@ func TestRecvMsgUnreachable(t *testing.T) { // set node 2 to state replicate r.trk.Progress[2].Match = 3 r.trk.Progress[2].BecomeReplicate() - r.trk.Progress[2].OptimisticUpdate(5) + r.trk.Progress[2].Update(5) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable}) diff --git a/testdata/async_storage_writes.txt b/testdata/async_storage_writes.txt index 13fb015f..c216a715 100644 --- a/testdata/async_storage_writes.txt +++ b/testdata/async_storage_writes.txt @@ -327,6 +327,7 @@ process-ready 1 2 3 Messages: 1->2 MsgApp Term:1 Log:1/13 Commit:12 1->3 MsgApp Term:1 Log:1/13 Commit:12 + 1->3 MsgApp Term:1 Log:1/13 Commit:12 1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] 1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses:[1->1 MsgAppResp Term:1 Log:0/14 Commit:12, AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14] @@ -341,6 +342,7 @@ deliver-msgs 1 2 3 1->2 MsgApp Term:1 Log:1/13 Commit:12 1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] 1->3 MsgApp Term:1 Log:1/13 Commit:12 +1->3 MsgApp Term:1 Log:1/13 Commit:12 1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] process-ready 1 2 3 @@ -365,7 +367,7 @@ process-ready 1 2 3 CommittedEntries: 1/12 EntryNormal "prop_1" Messages: - 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses:[3->1 MsgAppResp Term:1 Log:0/13 Commit:12, 3->1 MsgAppResp Term:1 Log:0/14 Commit:12, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14] + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses:[3->1 MsgAppResp Term:1 Log:0/13 Commit:12, 3->1 MsgAppResp Term:1 Log:0/13 Commit:12, 3->1 MsgAppResp Term:1 Log:0/14 Commit:12, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14] 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"]] process-append-thread 1 2 3 @@ -414,6 +416,7 @@ process-ready 1 2 3 Messages: 1->2 MsgApp Term:1 Log:1/14 Commit:13 1->3 MsgApp Term:1 Log:1/14 Commit:13 + 1->3 MsgApp Term:1 Log:1/14 Commit:13 1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] 1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses:[1->1 MsgAppResp Term:1 Log:0/15 Commit:13, AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15] @@ -428,6 +431,7 @@ deliver-msgs 1 2 3 1->2 MsgApp Term:1 Log:1/14 Commit:13 1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] 1->3 MsgApp Term:1 Log:1/14 Commit:13 +1->3 MsgApp Term:1 Log:1/14 Commit:13 1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] process-ready 1 2 3 @@ -452,7 +456,7 @@ process-ready 1 2 3 CommittedEntries: 1/13 EntryNormal "prop_2" Messages: - 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses:[3->1 MsgAppResp Term:1 Log:0/14 Commit:13, 3->1 MsgAppResp Term:1 Log:0/15 Commit:13, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15] + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses:[3->1 MsgAppResp Term:1 Log:0/14 Commit:13, 3->1 MsgAppResp Term:1 Log:0/14 Commit:13, 3->1 MsgAppResp Term:1 Log:0/15 Commit:13, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15] 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"]] process-append-thread 1 2 3 @@ -475,6 +479,7 @@ process-append-thread 1 2 3 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses: 3->1 MsgAppResp Term:1 Log:0/13 Commit:12 + 3->1 MsgAppResp Term:1 Log:0/13 Commit:12 3->1 MsgAppResp Term:1 Log:0/14 Commit:12 AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14 @@ -503,6 +508,7 @@ AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14 2->1 MsgAppResp Term:1 Log:0/13 Commit:12 2->1 MsgAppResp Term:1 Log:0/14 Commit:12 3->1 MsgAppResp Term:1 Log:0/13 Commit:12 +3->1 MsgAppResp Term:1 Log:0/13 Commit:12 3->1 MsgAppResp Term:1 Log:0/14 Commit:12 ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14 @@ -518,8 +524,12 @@ process-ready 1 2 3 CommittedEntries: 1/14 EntryNormal "prop_3" Messages: + 1->2 MsgApp Term:1 Log:1/15 Commit:13 1->2 MsgApp Term:1 Log:1/15 Commit:14 1->3 MsgApp Term:1 Log:1/15 Commit:14 + 1->3 MsgApp Term:1 Log:1/15 Commit:14 + 1->3 MsgApp Term:1 Log:1/15 Commit:14 + 1->3 MsgApp Term:1 Log:1/15 Commit:14 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 Responses:[AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15] 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"]] > 2 handling Ready @@ -529,8 +539,12 @@ process-ready 1 2 3 deliver-msgs 1 2 3 ---- +1->2 MsgApp Term:1 Log:1/15 Commit:13 1->2 MsgApp Term:1 Log:1/15 Commit:14 1->3 MsgApp Term:1 Log:1/15 Commit:14 +1->3 MsgApp Term:1 Log:1/15 Commit:14 +1->3 MsgApp Term:1 Log:1/15 Commit:14 +1->3 MsgApp Term:1 Log:1/15 Commit:14 process-ready 1 2 3 ---- @@ -542,7 +556,7 @@ process-ready 1 2 3 CommittedEntries: 1/14 EntryNormal "prop_3" Messages: - 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 Responses:[2->1 MsgAppResp Term:1 Log:0/15 Commit:14, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15] + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 Responses:[2->1 MsgAppResp Term:1 Log:0/15 Commit:13, 2->1 MsgAppResp Term:1 Log:0/15 Commit:14, AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15] 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"]] > 3 handling Ready Ready MustSync=false: @@ -550,7 +564,7 @@ process-ready 1 2 3 CommittedEntries: 1/14 EntryNormal "prop_3" Messages: - 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 Responses:[3->1 MsgAppResp Term:1 Log:0/15 Commit:14, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15] + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 Responses:[3->1 MsgAppResp Term:1 Log:0/15 Commit:14, 3->1 MsgAppResp Term:1 Log:0/15 Commit:14, 3->1 MsgAppResp Term:1 Log:0/15 Commit:14, 3->1 MsgAppResp Term:1 Log:0/15 Commit:14, AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15] 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"]] process-append-thread 1 2 3 @@ -573,6 +587,7 @@ process-append-thread 1 2 3 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses: 3->1 MsgAppResp Term:1 Log:0/14 Commit:13 + 3->1 MsgAppResp Term:1 Log:0/14 Commit:13 3->1 MsgAppResp Term:1 Log:0/15 Commit:13 AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 @@ -601,6 +616,7 @@ AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 2->1 MsgAppResp Term:1 Log:0/14 Commit:13 2->1 MsgAppResp Term:1 Log:0/15 Commit:13 3->1 MsgAppResp Term:1 Log:0/14 Commit:13 +3->1 MsgAppResp Term:1 Log:0/14 Commit:13 3->1 MsgAppResp Term:1 Log:0/15 Commit:13 ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 @@ -616,8 +632,12 @@ process-ready 1 2 3 CommittedEntries: 1/15 EntryNormal "prop_4" Messages: + 1->2 MsgApp Term:1 Log:1/15 Commit:14 1->2 MsgApp Term:1 Log:1/15 Commit:15 1->3 MsgApp Term:1 Log:1/15 Commit:15 + 1->3 MsgApp Term:1 Log:1/15 Commit:15 + 1->3 MsgApp Term:1 Log:1/15 Commit:15 + 1->3 MsgApp Term:1 Log:1/15 Commit:15 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"]] > 2 handling Ready @@ -627,8 +647,12 @@ process-ready 1 2 3 deliver-msgs 1 2 3 ---- +1->2 MsgApp Term:1 Log:1/15 Commit:14 1->2 MsgApp Term:1 Log:1/15 Commit:15 1->3 MsgApp Term:1 Log:1/15 Commit:15 +1->3 MsgApp Term:1 Log:1/15 Commit:15 +1->3 MsgApp Term:1 Log:1/15 Commit:15 +1->3 MsgApp Term:1 Log:1/15 Commit:15 process-ready 1 2 3 ---- @@ -640,7 +664,7 @@ process-ready 1 2 3 CommittedEntries: 1/15 EntryNormal "prop_4" Messages: - 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1 Responses:[2->1 MsgAppResp Term:1 Log:0/15 Commit:15] + 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1 Responses:[2->1 MsgAppResp Term:1 Log:0/15 Commit:14, 2->1 MsgAppResp Term:1 Log:0/15 Commit:15] 2->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"]] > 3 handling Ready Ready MustSync=false: @@ -648,7 +672,7 @@ process-ready 1 2 3 CommittedEntries: 1/15 EntryNormal "prop_4" Messages: - 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1 Responses:[3->1 MsgAppResp Term:1 Log:0/15 Commit:15] + 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1 Responses:[3->1 MsgAppResp Term:1 Log:0/15 Commit:15, 3->1 MsgAppResp Term:1 Log:0/15 Commit:15, 3->1 MsgAppResp Term:1 Log:0/15 Commit:15, 3->1 MsgAppResp Term:1 Log:0/15 Commit:15] 3->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses:[ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"]] process-append-thread 2 3 @@ -657,6 +681,7 @@ process-append-thread 2 3 Processing: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 Responses: + 2->1 MsgAppResp Term:1 Log:0/15 Commit:13 2->1 MsgAppResp Term:1 Log:0/15 Commit:14 AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 > 3 processing append thread @@ -664,6 +689,9 @@ process-append-thread 2 3 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 Responses: 3->1 MsgAppResp Term:1 Log:0/15 Commit:14 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:14 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:14 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:14 AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 process-apply-thread 1 2 3 @@ -686,8 +714,12 @@ process-apply-thread 1 2 3 deliver-msgs 1 2 3 ---- +2->1 MsgAppResp Term:1 Log:0/15 Commit:13 2->1 MsgAppResp Term:1 Log:0/15 Commit:14 3->1 MsgAppResp Term:1 Log:0/15 Commit:14 +3->1 MsgAppResp Term:1 Log:0/15 Commit:14 +3->1 MsgAppResp Term:1 Log:0/15 Commit:14 +3->1 MsgAppResp Term:1 Log:0/15 Commit:14 ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 INFO entry at index 15 missing from unstable log; ignoring @@ -699,7 +731,14 @@ ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "pro process-ready 1 2 3 ---- > 1 handling Ready - + Ready MustSync=false: + Messages: + 1->2 MsgApp Term:1 Log:1/15 Commit:15 + 1->2 MsgApp Term:1 Log:1/15 Commit:15 + 1->3 MsgApp Term:1 Log:1/15 Commit:15 + 1->3 MsgApp Term:1 Log:1/15 Commit:15 + 1->3 MsgApp Term:1 Log:1/15 Commit:15 + 1->3 MsgApp Term:1 Log:1/15 Commit:15 > 2 handling Ready > 3 handling Ready @@ -711,12 +750,16 @@ process-append-thread 2 3 Processing: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1 Responses: + 2->1 MsgAppResp Term:1 Log:0/15 Commit:14 2->1 MsgAppResp Term:1 Log:0/15 Commit:15 > 3 processing append thread Processing: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1 Responses: 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 process-apply-thread 1 2 3 ---- @@ -738,23 +781,41 @@ process-apply-thread 1 2 3 deliver-msgs 1 2 3 ---- +2->1 MsgAppResp Term:1 Log:0/15 Commit:14 2->1 MsgAppResp Term:1 Log:0/15 Commit:15 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 +3->1 MsgAppResp Term:1 Log:0/15 Commit:15 +3->1 MsgAppResp Term:1 Log:0/15 Commit:15 +3->1 MsgAppResp Term:1 Log:0/15 Commit:15 ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] +1->2 MsgApp Term:1 Log:1/15 Commit:15 +1->2 MsgApp Term:1 Log:1/15 Commit:15 ApplyThread->2 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] +1->3 MsgApp Term:1 Log:1/15 Commit:15 +1->3 MsgApp Term:1 Log:1/15 Commit:15 +1->3 MsgApp Term:1 Log:1/15 Commit:15 +1->3 MsgApp Term:1 Log:1/15 Commit:15 ApplyThread->3 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] process-ready 1 2 3 ---- > 1 handling Ready - + Ready MustSync=false: + Messages: + 1->2 MsgApp Term:1 Log:1/15 Commit:15 > 2 handling Ready - + Ready MustSync=false: + Messages: + 2->AppendThread MsgStorageAppend Term:0 Log:0/0 Responses:[2->1 MsgAppResp Term:1 Log:0/15 Commit:15, 2->1 MsgAppResp Term:1 Log:0/15 Commit:15] > 3 handling Ready - + Ready MustSync=false: + Messages: + 3->AppendThread MsgStorageAppend Term:0 Log:0/0 Responses:[3->1 MsgAppResp Term:1 Log:0/15 Commit:15, 3->1 MsgAppResp Term:1 Log:0/15 Commit:15, 3->1 MsgAppResp Term:1 Log:0/15 Commit:15, 3->1 MsgAppResp Term:1 Log:0/15 Commit:15] stabilize ---- +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/15 Commit:15 > 1 processing append thread Processing: 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 @@ -763,6 +824,37 @@ stabilize Processing: 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1 Responses: +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:0 Log:0/0 + Responses: + 2->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 2->1 MsgAppResp Term:1 Log:0/15 Commit:15 +> 3 processing append thread + Processing: + 3->AppendThread MsgStorageAppend Term:0 Log:0/0 + Responses: + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->AppendThread MsgStorageAppend Term:0 Log:0/0 Responses:[2->1 MsgAppResp Term:1 Log:0/15 Commit:15] > 1 receiving messages AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 INFO entry at index 15 missing from unstable log; ignoring + 2->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 2->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 + 3->1 MsgAppResp Term:1 Log:0/15 Commit:15 +> 2 processing append thread + Processing: + 2->AppendThread MsgStorageAppend Term:0 Log:0/0 + Responses: + 2->1 MsgAppResp Term:1 Log:0/15 Commit:15 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/15 Commit:15 diff --git a/testdata/confchange_v1_remove_leader.txt b/testdata/confchange_v1_remove_leader.txt index fde99c82..27fe62ac 100644 --- a/testdata/confchange_v1_remove_leader.txt +++ b/testdata/confchange_v1_remove_leader.txt @@ -1,5 +1,5 @@ # We'll turn this back on after the boilerplate. -log-level none +log-level debug ---- ok @@ -7,15 +7,118 @@ ok # Bootstrap n1, n2, n3. add-nodes 3 voters=(1,2,3) index=2 ---- -ok +INFO 1 switched to configuration voters=(1 2 3) +INFO 1 became follower at term 0 +INFO newRaft 1 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] +INFO 2 switched to configuration voters=(1 2 3) +INFO 2 became follower at term 0 +INFO newRaft 2 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] +INFO 3 switched to configuration voters=(1 2 3) +INFO 3 became follower at term 0 +INFO newRaft 3 [peers: [1,2,3], term: 0, commit: 2, applied: 2, lastindex: 2, lastterm: 1] campaign 1 ---- -ok +INFO 1 is starting a new election at term 0 +INFO 1 became candidate at term 1 +INFO 1 [logterm: 1, index: 2] sent MsgVote request to 2 at term 1 +INFO 1 [logterm: 1, index: 2] sent MsgVote request to 3 at term 1 stabilize ---- -ok +> 1 handling Ready + Ready MustSync=true: + Lead:0 State:StateCandidate + HardState Term:1 Vote:1 Commit:2 + Messages: + 1->2 MsgVote Term:1 Log:1/2 + 1->3 MsgVote Term:1 Log:1/2 + INFO 1 received MsgVoteResp from 1 at term 1 + INFO 1 has received 1 MsgVoteResp votes and 0 vote rejections +> 2 receiving messages + 1->2 MsgVote Term:1 Log:1/2 + INFO 2 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 2 became follower at term 1 + INFO 2 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1 +> 3 receiving messages + 1->3 MsgVote Term:1 Log:1/2 + INFO 3 [term: 0] received a MsgVote message with higher term from 1 [term: 1] + INFO 3 became follower at term 1 + INFO 3 [logterm: 1, index: 2, vote: 0] cast MsgVote for 1 [logterm: 1, index: 2] at term 1 +> 2 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:2 + Messages: + 2->1 MsgVoteResp Term:1 Log:0/0 +> 3 handling Ready + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:2 + Messages: + 3->1 MsgVoteResp Term:1 Log:0/0 +> 1 receiving messages + 2->1 MsgVoteResp Term:1 Log:0/0 + INFO 1 received MsgVoteResp from 2 at term 1 + INFO 1 has received 2 MsgVoteResp votes and 0 vote rejections + INFO 1 became leader at term 1 + 3->1 MsgVoteResp Term:1 Log:0/0 +> 1 handling Ready + Ready MustSync=true: + Lead:1 State:StateLeader + Entries: + 1/3 EntryNormal "" + Messages: + 1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] + 1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/2 Commit:2 Entries:[1/3 EntryNormal ""] +> 2 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + Entries: + 1/3 EntryNormal "" + Messages: + 2->1 MsgAppResp Term:1 Log:0/3 Commit:2 +> 3 handling Ready + Ready MustSync=true: + Lead:1 State:StateFollower + Entries: + 1/3 EntryNormal "" + Messages: + 3->1 MsgAppResp Term:1 Log:0/3 Commit:2 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/3 Commit:2 + 3->1 MsgAppResp Term:1 Log:0/3 Commit:2 +> 1 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:3 + CommittedEntries: + 1/3 EntryNormal "" + Messages: + 1->2 MsgApp Term:1 Log:1/3 Commit:3 + 1->3 MsgApp Term:1 Log:1/3 Commit:3 +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/3 Commit:3 +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/3 Commit:3 +> 2 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:3 + CommittedEntries: + 1/3 EntryNormal "" + Messages: + 2->1 MsgAppResp Term:1 Log:0/3 Commit:3 +> 3 handling Ready + Ready MustSync=false: + HardState Term:1 Vote:1 Commit:3 + CommittedEntries: + 1/3 EntryNormal "" + Messages: + 3->1 MsgAppResp Term:1 Log:0/3 Commit:3 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/3 Commit:3 + 3->1 MsgAppResp Term:1 Log:0/3 Commit:3 log-level debug ---- @@ -27,6 +130,12 @@ raft-state 2: StateFollower (Voter) Term:1 Lead:1 3: StateFollower (Voter) Term:1 Lead:1 +status 1 +---- +1: StateReplicate match=3 commit=2 next=4 +2: StateReplicate match=3 commit=3 next=4 +3: StateReplicate match=3 commit=3 next=4 + # Start removing n1. propose-conf-change 1 v1=true r1 @@ -107,6 +216,11 @@ stabilize 1 1->3 MsgApp Term:1 Log:1/6 Commit:5 INFO 1 switched to configuration voters=(2 3) +status 1 +---- +2: StateReplicate match=5 commit=3 next=7 inflight=1 +3: StateReplicate match=3 commit=3 next=7 paused inflight=3 + raft-state ---- 1: StateLeader (Non-Voter) Term:1 Lead:1 @@ -141,16 +255,29 @@ stabilize 1 2->1 MsgAppResp Term:1 Log:0/6 Commit:3 2->1 MsgAppResp Term:1 Log:0/6 Commit:4 2->1 MsgAppResp Term:1 Log:0/6 Commit:5 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgApp Term:1 Log:1/6 Commit:5 + 1->2 MsgApp Term:1 Log:1/6 Commit:5 # When n3 responds, quorum is reached and everything falls into place. stabilize ---- +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/6 Commit:5 + 1->2 MsgApp Term:1 Log:1/6 Commit:5 > 3 receiving messages 1->3 MsgApp Term:1 Log:1/3 Commit:3 Entries:[1/4 EntryConfChange r1] 1->3 MsgApp Term:1 Log:1/4 Commit:3 Entries:[1/5 EntryNormal "foo"] 1->3 MsgApp Term:1 Log:1/5 Commit:3 Entries:[1/6 EntryNormal "bar"] 1->3 MsgApp Term:1 Log:1/6 Commit:4 1->3 MsgApp Term:1 Log:1/6 Commit:5 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgAppResp Term:1 Log:0/6 Commit:5 + 2->1 MsgAppResp Term:1 Log:0/6 Commit:5 > 3 handling Ready Ready MustSync=true: HardState Term:1 Vote:1 Commit:5 @@ -169,6 +296,8 @@ stabilize 3->1 MsgAppResp Term:1 Log:0/6 Commit:5 INFO 3 switched to configuration voters=(2 3) > 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/6 Commit:5 + 2->1 MsgAppResp Term:1 Log:0/6 Commit:5 3->1 MsgAppResp Term:1 Log:0/4 Commit:3 3->1 MsgAppResp Term:1 Log:0/5 Commit:3 3->1 MsgAppResp Term:1 Log:0/6 Commit:3 @@ -180,11 +309,19 @@ stabilize CommittedEntries: 1/6 EntryNormal "bar" Messages: + 1->3 MsgApp Term:1 Log:1/6 Commit:5 + 1->3 MsgApp Term:1 Log:1/6 Commit:5 1->2 MsgApp Term:1 Log:1/6 Commit:6 1->3 MsgApp Term:1 Log:1/6 Commit:6 + 1->3 MsgApp Term:1 Log:1/6 Commit:6 + 1->3 MsgApp Term:1 Log:1/6 Commit:6 > 2 receiving messages 1->2 MsgApp Term:1 Log:1/6 Commit:6 > 3 receiving messages + 1->3 MsgApp Term:1 Log:1/6 Commit:5 + 1->3 MsgApp Term:1 Log:1/6 Commit:5 + 1->3 MsgApp Term:1 Log:1/6 Commit:6 + 1->3 MsgApp Term:1 Log:1/6 Commit:6 1->3 MsgApp Term:1 Log:1/6 Commit:6 > 2 handling Ready Ready MustSync=false: @@ -199,9 +336,33 @@ stabilize CommittedEntries: 1/6 EntryNormal "bar" Messages: + 3->1 MsgAppResp Term:1 Log:0/6 Commit:5 + 3->1 MsgAppResp Term:1 Log:0/6 Commit:5 + 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 + 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/6 Commit:6 + 3->1 MsgAppResp Term:1 Log:0/6 Commit:5 + 3->1 MsgAppResp Term:1 Log:0/6 Commit:5 + 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 + 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 + 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/6 Commit:6 + 1->3 MsgApp Term:1 Log:1/6 Commit:6 +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/6 Commit:6 + 1->3 MsgApp Term:1 Log:1/6 Commit:6 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 + 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 3->1 MsgAppResp Term:1 Log:0/6 Commit:6 # However not all is well. n1 is still leader but unconditionally drops all diff --git a/testdata/confchange_v2_add_double_auto.txt b/testdata/confchange_v2_add_double_auto.txt index d89b9095..4875576b 100644 --- a/testdata/confchange_v2_add_double_auto.txt +++ b/testdata/confchange_v2_add_double_auto.txt @@ -119,16 +119,20 @@ stabilize 1 2 Ready MustSync=false: Messages: 1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[1/5 EntryConfChangeV2] + 1->2 MsgApp Term:1 Log:1/5 Commit:4 > 2 receiving messages 1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[1/5 EntryConfChangeV2] + 1->2 MsgApp Term:1 Log:1/5 Commit:4 > 2 handling Ready Ready MustSync=true: Entries: 1/5 EntryConfChangeV2 Messages: 2->1 MsgAppResp Term:1 Log:0/5 Commit:4 + 2->1 MsgAppResp Term:1 Log:0/5 Commit:4 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/5 Commit:4 + 2->1 MsgAppResp Term:1 Log:0/5 Commit:4 > 1 handling Ready Ready MustSync=false: HardState Term:1 Vote:1 Commit:5 @@ -136,9 +140,11 @@ stabilize 1 2 1/5 EntryConfChangeV2 Messages: 1->2 MsgApp Term:1 Log:1/5 Commit:5 + 1->2 MsgApp Term:1 Log:1/5 Commit:5 INFO 1 switched to configuration voters=(1 2 3) > 2 receiving messages 1->2 MsgApp Term:1 Log:1/5 Commit:5 + 1->2 MsgApp Term:1 Log:1/5 Commit:5 > 2 handling Ready Ready MustSync=false: HardState Term:1 Commit:5 @@ -146,9 +152,11 @@ stabilize 1 2 1/5 EntryConfChangeV2 Messages: 2->1 MsgAppResp Term:1 Log:0/5 Commit:5 + 2->1 MsgAppResp Term:1 Log:0/5 Commit:5 INFO 2 switched to configuration voters=(1 2 3) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/5 Commit:5 + 2->1 MsgAppResp Term:1 Log:0/5 Commit:5 # n3 immediately receives a snapshot in the final configuration. stabilize 1 3 @@ -282,6 +290,7 @@ stabilize 1 Messages: 1->2 MsgApp Term:1 Log:1/8 Commit:6 1->3 MsgApp Term:1 Log:1/8 Commit:6 + 1->3 MsgApp Term:1 Log:1/8 Commit:6 INFO 1 switched to configuration voters=(1)&&(1 2 3) autoleave INFO initiating automatic transition out of joint configuration voters=(1)&&(1 2 3) autoleave > 1 handling Ready @@ -304,6 +313,7 @@ stabilize 2 3 1->3 MsgApp Term:1 Log:1/6 Commit:5 Entries:[1/7 EntryNormal "foo"] 1->3 MsgApp Term:1 Log:1/7 Commit:5 Entries:[1/8 EntryNormal "bar"] 1->3 MsgApp Term:1 Log:1/8 Commit:6 + 1->3 MsgApp Term:1 Log:1/8 Commit:6 1->3 MsgApp Term:1 Log:1/8 Commit:6 Entries:[1/9 EntryConfChangeV2] > 2 handling Ready Ready MustSync=true: @@ -333,6 +343,7 @@ stabilize 2 3 3->1 MsgAppResp Term:1 Log:0/7 Commit:5 3->1 MsgAppResp Term:1 Log:0/8 Commit:5 3->1 MsgAppResp Term:1 Log:0/8 Commit:6 + 3->1 MsgAppResp Term:1 Log:0/8 Commit:6 3->1 MsgAppResp Term:1 Log:0/9 Commit:6 INFO 3 switched to configuration voters=(1)&&(1 2 3) autoleave @@ -349,6 +360,7 @@ stabilize 3->1 MsgAppResp Term:1 Log:0/7 Commit:5 3->1 MsgAppResp Term:1 Log:0/8 Commit:5 3->1 MsgAppResp Term:1 Log:0/8 Commit:6 + 3->1 MsgAppResp Term:1 Log:0/8 Commit:6 3->1 MsgAppResp Term:1 Log:0/9 Commit:6 > 1 handling Ready Ready MustSync=false: @@ -361,17 +373,25 @@ stabilize 1->2 MsgApp Term:1 Log:1/9 Commit:7 1->3 MsgApp Term:1 Log:1/9 Commit:7 1->2 MsgApp Term:1 Log:1/9 Commit:8 - 1->3 MsgApp Term:1 Log:1/9 Commit:8 + 1->2 MsgApp Term:1 Log:1/9 Commit:8 1->2 MsgApp Term:1 Log:1/9 Commit:9 1->3 MsgApp Term:1 Log:1/9 Commit:9 + 1->3 MsgApp Term:1 Log:1/9 Commit:9 + 1->3 MsgApp Term:1 Log:1/9 Commit:9 + 1->3 MsgApp Term:1 Log:1/9 Commit:9 + 1->3 MsgApp Term:1 Log:1/9 Commit:9 INFO 1 switched to configuration voters=(1) > 2 receiving messages 1->2 MsgApp Term:1 Log:1/9 Commit:7 1->2 MsgApp Term:1 Log:1/9 Commit:8 + 1->2 MsgApp Term:1 Log:1/9 Commit:8 1->2 MsgApp Term:1 Log:1/9 Commit:9 > 3 receiving messages 1->3 MsgApp Term:1 Log:1/9 Commit:7 - 1->3 MsgApp Term:1 Log:1/9 Commit:8 + 1->3 MsgApp Term:1 Log:1/9 Commit:9 + 1->3 MsgApp Term:1 Log:1/9 Commit:9 + 1->3 MsgApp Term:1 Log:1/9 Commit:9 + 1->3 MsgApp Term:1 Log:1/9 Commit:9 1->3 MsgApp Term:1 Log:1/9 Commit:9 > 2 handling Ready Ready MustSync=false: @@ -383,6 +403,7 @@ stabilize Messages: 2->1 MsgAppResp Term:1 Log:0/9 Commit:7 2->1 MsgAppResp Term:1 Log:0/9 Commit:8 + 2->1 MsgAppResp Term:1 Log:0/9 Commit:8 2->1 MsgAppResp Term:1 Log:0/9 Commit:9 INFO 2 switched to configuration voters=(1) > 3 handling Ready @@ -394,7 +415,10 @@ stabilize 1/9 EntryConfChangeV2 Messages: 3->1 MsgAppResp Term:1 Log:0/9 Commit:7 - 3->1 MsgAppResp Term:1 Log:0/9 Commit:8 + 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 + 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 + 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 + 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 INFO 3 switched to configuration voters=(1) > 1 receiving messages @@ -402,11 +426,19 @@ stabilize raft: cannot step as peer not found 2->1 MsgAppResp Term:1 Log:0/9 Commit:8 raft: cannot step as peer not found + 2->1 MsgAppResp Term:1 Log:0/9 Commit:8 + raft: cannot step as peer not found 2->1 MsgAppResp Term:1 Log:0/9 Commit:9 raft: cannot step as peer not found 3->1 MsgAppResp Term:1 Log:0/9 Commit:7 raft: cannot step as peer not found - 3->1 MsgAppResp Term:1 Log:0/9 Commit:8 + 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 + raft: cannot step as peer not found + 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 + raft: cannot step as peer not found + 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 + raft: cannot step as peer not found + 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 raft: cannot step as peer not found 3->1 MsgAppResp Term:1 Log:0/9 Commit:9 raft: cannot step as peer not found diff --git a/testdata/confchange_v2_add_double_implicit.txt b/testdata/confchange_v2_add_double_implicit.txt index cf6a77fe..505df582 100644 --- a/testdata/confchange_v2_add_double_implicit.txt +++ b/testdata/confchange_v2_add_double_implicit.txt @@ -102,16 +102,20 @@ stabilize 1 2 Ready MustSync=false: Messages: 1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[1/5 EntryConfChangeV2] + 1->2 MsgApp Term:1 Log:1/5 Commit:4 > 2 receiving messages 1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[1/5 EntryConfChangeV2] + 1->2 MsgApp Term:1 Log:1/5 Commit:4 > 2 handling Ready Ready MustSync=true: Entries: 1/5 EntryConfChangeV2 Messages: 2->1 MsgAppResp Term:1 Log:0/5 Commit:4 + 2->1 MsgAppResp Term:1 Log:0/5 Commit:4 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/5 Commit:4 + 2->1 MsgAppResp Term:1 Log:0/5 Commit:4 > 1 handling Ready Ready MustSync=false: HardState Term:1 Vote:1 Commit:5 @@ -119,9 +123,11 @@ stabilize 1 2 1/5 EntryConfChangeV2 Messages: 1->2 MsgApp Term:1 Log:1/5 Commit:5 + 1->2 MsgApp Term:1 Log:1/5 Commit:5 INFO 1 switched to configuration voters=(1 2) > 2 receiving messages 1->2 MsgApp Term:1 Log:1/5 Commit:5 + 1->2 MsgApp Term:1 Log:1/5 Commit:5 > 2 handling Ready Ready MustSync=false: HardState Term:1 Commit:5 @@ -129,6 +135,8 @@ stabilize 1 2 1/5 EntryConfChangeV2 Messages: 2->1 MsgAppResp Term:1 Log:0/5 Commit:5 + 2->1 MsgAppResp Term:1 Log:0/5 Commit:5 INFO 2 switched to configuration voters=(1 2) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/5 Commit:5 + 2->1 MsgAppResp Term:1 Log:0/5 Commit:5 diff --git a/testdata/confchange_v2_add_single_explicit.txt b/testdata/confchange_v2_add_single_explicit.txt index 11ff48ed..d196e80a 100644 --- a/testdata/confchange_v2_add_single_explicit.txt +++ b/testdata/confchange_v2_add_single_explicit.txt @@ -170,6 +170,18 @@ stabilize > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/6 Commit:5 2->1 MsgAppResp Term:1 Log:0/6 Commit:6 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->2 MsgApp Term:1 Log:1/6 Commit:6 +> 2 receiving messages + 1->2 MsgApp Term:1 Log:1/6 Commit:6 +> 2 handling Ready + Ready MustSync=false: + Messages: + 2->1 MsgAppResp Term:1 Log:0/6 Commit:6 +> 1 receiving messages + 2->1 MsgAppResp Term:1 Log:0/6 Commit:6 # Check that trying to transition out again won't do anything. propose-conf-change 1 diff --git a/testdata/confchange_v2_replace_leader.txt b/testdata/confchange_v2_replace_leader.txt index 4ddde33c..94a665eb 100644 --- a/testdata/confchange_v2_replace_leader.txt +++ b/testdata/confchange_v2_replace_leader.txt @@ -83,11 +83,13 @@ stabilize Messages: 1->2 MsgApp Term:1 Log:1/4 Commit:4 1->3 MsgApp Term:1 Log:1/4 Commit:4 + 1->3 MsgApp Term:1 Log:1/4 Commit:4 INFO 1 switched to configuration voters=(2 3 4)&&(1 2 3) > 2 receiving messages 1->2 MsgApp Term:1 Log:1/4 Commit:4 > 3 receiving messages 1->3 MsgApp Term:1 Log:1/4 Commit:4 + 1->3 MsgApp Term:1 Log:1/4 Commit:4 > 1 handling Ready Ready MustSync=false: Messages: @@ -107,10 +109,12 @@ stabilize 1/4 EntryConfChangeV2 r1 v4 Messages: 3->1 MsgAppResp Term:1 Log:0/4 Commit:4 + 3->1 MsgAppResp Term:1 Log:0/4 Commit:4 INFO 3 switched to configuration voters=(2 3 4)&&(1 2 3) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/4 Commit:4 3->1 MsgAppResp Term:1 Log:0/4 Commit:4 + 3->1 MsgAppResp Term:1 Log:0/4 Commit:4 > 4 receiving messages 1->4 MsgApp Term:1 Log:1/3 Commit:4 Entries:[1/4 EntryConfChangeV2 r1 v4] INFO 4 [term: 0] received a MsgApp message with higher term from 1 [term: 1] @@ -378,6 +382,7 @@ stabilize 4->1 MsgApp Term:2 Log:2/6 Commit:6 4->2 MsgApp Term:2 Log:2/6 Commit:6 4->3 MsgApp Term:2 Log:2/6 Commit:6 + 4->3 MsgApp Term:2 Log:2/6 Commit:6 INFO 4 switched to configuration voters=(2 3 4) > 1 receiving messages 4->1 MsgApp Term:2 Log:2/6 Commit:6 @@ -385,6 +390,7 @@ stabilize 4->2 MsgApp Term:2 Log:2/6 Commit:6 > 3 receiving messages 4->3 MsgApp Term:2 Log:2/6 Commit:6 + 4->3 MsgApp Term:2 Log:2/6 Commit:6 > 1 handling Ready Ready MustSync=false: HardState Term:2 Vote:4 Commit:6 @@ -408,12 +414,14 @@ stabilize 2/6 EntryConfChangeV2 Messages: 3->4 MsgAppResp Term:2 Log:0/6 Commit:6 + 3->4 MsgAppResp Term:2 Log:0/6 Commit:6 INFO 3 switched to configuration voters=(2 3 4) > 4 receiving messages 1->4 MsgAppResp Term:2 Log:0/6 Commit:6 raft: cannot step as peer not found 2->4 MsgAppResp Term:2 Log:0/6 Commit:6 3->4 MsgAppResp Term:2 Log:0/6 Commit:6 + 3->4 MsgAppResp Term:2 Log:0/6 Commit:6 # n1 is out of the configuration. raft-state diff --git a/testdata/confchange_v2_replace_leader_stepdown.txt b/testdata/confchange_v2_replace_leader_stepdown.txt index ecad1d7c..6d785872 100644 --- a/testdata/confchange_v2_replace_leader_stepdown.txt +++ b/testdata/confchange_v2_replace_leader_stepdown.txt @@ -119,6 +119,7 @@ stabilize 1->2 MsgApp Term:1 Log:1/5 Commit:5 1->3 MsgApp Term:1 Log:1/5 Commit:5 1->4 MsgApp Term:1 Log:1/5 Commit:5 + 1->4 MsgApp Term:1 Log:1/5 Commit:5 INFO 1 switched to configuration voters=(2 3 4) INFO 1 became follower at term 1 > 2 receiving messages @@ -127,6 +128,7 @@ stabilize 1->3 MsgApp Term:1 Log:1/5 Commit:5 > 4 receiving messages 1->4 MsgApp Term:1 Log:1/5 Commit:5 + 1->4 MsgApp Term:1 Log:1/5 Commit:5 > 1 handling Ready Ready MustSync=false: Lead:0 State:StateFollower @@ -153,11 +155,13 @@ stabilize 1/5 EntryConfChangeV2 Messages: 4->1 MsgAppResp Term:1 Log:0/5 Commit:5 + 4->1 MsgAppResp Term:1 Log:0/5 Commit:5 INFO 4 switched to configuration voters=(2 3 4) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/5 Commit:5 3->1 MsgAppResp Term:1 Log:0/5 Commit:5 4->1 MsgAppResp Term:1 Log:0/5 Commit:5 + 4->1 MsgAppResp Term:1 Log:0/5 Commit:5 # n1 is out of the configuration. raft-state diff --git a/testdata/prevote.txt b/testdata/prevote.txt index f4926d43..1da48b1d 100644 --- a/testdata/prevote.txt +++ b/testdata/prevote.txt @@ -127,6 +127,18 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/12 Commit:12 3->1 MsgAppResp Term:1 Log:0/12 Commit:11 3->1 MsgAppResp Term:1 Log:0/12 Commit:12 +> 1 handling Ready + Ready MustSync=false: + Messages: + 1->3 MsgApp Term:1 Log:1/12 Commit:12 +> 3 receiving messages + 1->3 MsgApp Term:1 Log:1/12 Commit:12 +> 3 handling Ready + Ready MustSync=false: + Messages: + 3->1 MsgAppResp Term:1 Log:0/12 Commit:12 +> 1 receiving messages + 3->1 MsgAppResp Term:1 Log:0/12 Commit:12 # Let 2 campaign. It should succeed, since it's up-to-date on the log. campaign 2 diff --git a/testdata/replicate_pause.txt b/testdata/replicate_pause.txt index b9a359bf..f661a32f 100644 --- a/testdata/replicate_pause.txt +++ b/testdata/replicate_pause.txt @@ -47,8 +47,8 @@ ok status 1 ---- 1: StateReplicate match=14 commit=11 next=15 -2: StateReplicate match=11 commit=11 next=15 paused inflight=3[full] -3: StateReplicate match=11 commit=11 next=15 paused inflight=3[full] +2: StateReplicate match=11 commit=11 next=15 inflight=3[full] +3: StateReplicate match=11 commit=11 next=15 inflight=3[full] log-level none ---- @@ -76,6 +76,8 @@ deliver-msgs drop=3 dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"] dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"] dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"] +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13 # Repeat committing 3 entries. @@ -95,7 +97,7 @@ ok status 1 ---- 1: StateReplicate match=14 commit=11 next=15 -2: StateReplicate match=14 commit=14 next=18 paused inflight=3[full] +2: StateReplicate match=14 commit=14 next=18 inflight=3[full] 3: StateReplicate match=11 commit=11 next=15 paused inflight=3[full] log-level none diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt index d23820ae..ebbf3f5c 100644 --- a/testdata/slow_follower_after_compaction.txt +++ b/testdata/slow_follower_after_compaction.txt @@ -88,6 +88,8 @@ deliver-msgs drop=3 ---- dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"] dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"] +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:14 +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15 # Truncate the leader's log beyond node 3 log size. compact 1 17 diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt index ccf5ee5a..5d2ace58 100644 --- a/testdata/snapshot_succeed_via_app_resp_behind.txt +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -153,6 +153,7 @@ stabilize 1 Ready MustSync=false: Messages: 1->3 MsgApp Term:1 Log:1/11 Commit:12 Entries:[1/12 EntryNormal "\"foo\""] + 1->3 MsgApp Term:1 Log:1/12 Commit:12 # 3 is in StateReplicate thanks to receiving the snapshot at index 11. # This is despite its PendingSnapshot having been 12. diff --git a/tracker/progress.go b/tracker/progress.go index 38985c80..1308316f 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -28,15 +28,9 @@ import ( // strewn around `*raft.raft`. Additionally, some fields are only used when in a // certain State. All of this isn't ideal. type Progress struct { - // Match is the log index up to which the follower's log matches the leader's. - Match uint64 - // Commit is the commit index of the follower's log. - // INVARIANT: Commit <= Match. - Commit uint64 - // Next is the index of the next log entry to be sent to the follower. Entries - // in the [Match+1, Next-1] range, if any, are on the fly to the follower and - // have not been rejected yet. - Next uint64 + Watermark // the follower's last log index + + Commit Watermark // the follower's commit index // State defines how the leader should interact with the follower. // @@ -80,14 +74,6 @@ type Progress struct { // This is always true on the leader. RecentActive bool - // MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This - // happens in StateProbe, or StateReplicate with saturated Inflights. In both - // cases, we need to continue sending MsgApp once in a while to guarantee - // progress, but we only do so when MsgAppFlowPaused is false (it is reset on - // receiving a heartbeat response), to not overflow the receiver. See - // IsPaused(). - MsgAppFlowPaused bool - // Inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries. // The max number of entries per message is defined in raft config as MaxSizePerMsg. @@ -106,10 +92,11 @@ type Progress struct { IsLearner bool } -// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, +// ResetState moves the Progress into the specified State, resetting Pause, // PendingSnapshot, and Inflights. func (pr *Progress) ResetState(state StateType) { - pr.MsgAppFlowPaused = false + pr.Pause = false + pr.Commit.Pause = false pr.PendingSnapshot = 0 pr.State = state pr.Inflights.reset() @@ -161,55 +148,21 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { // UpdateOnEntriesSend updates the progress on the given number of consecutive // entries being sent in a MsgApp, with the given total bytes size, appended at // and after the given log index. -func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error { - switch pr.State { - case StateReplicate: - if entries > 0 { - last := nextIndex + uint64(entries) - 1 - pr.OptimisticUpdate(last) - pr.Inflights.Add(last, bytes) - } - // If this message overflows the in-flights tracker, or it was already full, - // consider this message being a probe, so that the flow is paused. - pr.MsgAppFlowPaused = pr.Inflights.Full() - case StateProbe: - // TODO(pavelkalinnikov): this condition captures the previous behaviour, - // but we should set MsgAppFlowPaused unconditionally for simplicity, because any - // MsgApp in StateProbe is a probe, not only non-empty ones. - if entries > 0 { - pr.MsgAppFlowPaused = true - } - default: - return fmt.Errorf("sending append in unhandled state %s", pr.State) - } - return nil -} - -// UpdateCommit moves the known commit index for this follower forward. -func (pr *Progress) UpdateCommit(index uint64) { - if index > pr.Commit { - pr.Commit = index +func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { + if pr.State != StateReplicate { + return } + pr.Next += uint64(entries) + pr.Inflights.Add(pr.Next-1, bytes) } // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true. func (pr *Progress) MaybeUpdate(n uint64) bool { - var updated bool - if pr.Match < n { - pr.Match = n - updated = true - pr.MsgAppFlowPaused = false - } - pr.Next = max(pr.Next, n+1) - return updated + return pr.Watermark.Update(n) } -// OptimisticUpdate signals that appends all the way up to and including index n -// are in-flight. As a result, Next is increased to n+1. -func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 } - // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The // arguments are the index of the append message rejected by the follower, and // the hint that we want to decrease to. @@ -242,7 +195,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { } pr.Next = max(min(rejected, matchHint+1), 1) - pr.MsgAppFlowPaused = false + pr.Pause = false return true } @@ -255,9 +208,9 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { func (pr *Progress) IsPaused() bool { switch pr.State { case StateProbe: - return pr.MsgAppFlowPaused + return pr.Pause case StateReplicate: - return pr.MsgAppFlowPaused + return pr.Pause && pr.Commit.Pause case StateSnapshot: return true default: @@ -265,9 +218,23 @@ func (pr *Progress) IsPaused() bool { } } +type ProgressReplicate Progress + +func (pr *ProgressReplicate) IsThrottled(lastIndex uint64) bool { + return pr.Next > lastIndex || pr.Inflights.Full() +} + +func (pr *ProgressReplicate) ShouldSendCommit(committed uint64) bool { + return committed >= pr.Commit.Next +} + +func (pr *ProgressReplicate) UpToDate(lastIndex, committed uint64) bool { + return pr.Match >= lastIndex && pr.Commit.Match >= committed +} + func (pr *Progress) String() string { var buf strings.Builder - fmt.Fprintf(&buf, "%s match=%d commit=%d next=%d", pr.State, pr.Match, pr.Commit, pr.Next) + fmt.Fprintf(&buf, "%s match=%d commit=%d next=%d", pr.State, pr.Match, pr.Commit.Match, pr.Next) if pr.IsLearner { fmt.Fprint(&buf, " learner") } diff --git a/tracker/progress_test.go b/tracker/progress_test.go index 702aef0d..c5bb0fec 100644 --- a/tracker/progress_test.go +++ b/tracker/progress_test.go @@ -24,15 +24,15 @@ func TestProgressString(t *testing.T) { ins := NewInflights(1, 0) ins.Add(123, 1) pr := &Progress{ - Match: 2, - Commit: 1, - Next: 3, - State: StateSnapshot, - PendingSnapshot: 123, - RecentActive: false, - MsgAppFlowPaused: true, - IsLearner: true, - Inflights: ins, + Match: 2, + Commit: 1, + Next: 3, + State: StateSnapshot, + PendingSnapshot: 123, + RecentActive: false, + Pause: true, + IsLearner: true, + Inflights: ins, } const exp = `StateSnapshot match=2 commit=1 next=3 learner paused pendingSnap=123 inactive inflight=1[full]` assert.Equal(t, exp, pr.String()) @@ -54,26 +54,26 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, - MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256, 0), + State: tt.state, + Pause: tt.paused, + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } } // TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset -// MsgAppFlowPaused. +// Pause. func TestProgressResume(t *testing.T) { p := &Progress{ - Next: 2, - MsgAppFlowPaused: true, + Next: 2, + Pause: true, } p.MaybeDecrTo(1, 1) - assert.False(t, p.MsgAppFlowPaused) - p.MsgAppFlowPaused = true + assert.False(t, p.Pause) + p.Pause = true p.MaybeUpdate(2) - assert.False(t, p.MsgAppFlowPaused) + assert.False(t, p.Pause) } func TestProgressBecomeProbe(t *testing.T) { diff --git a/tracker/watermark.go b/tracker/watermark.go new file mode 100644 index 00000000..fecb2478 --- /dev/null +++ b/tracker/watermark.go @@ -0,0 +1,38 @@ +package tracker + +// Watermark is blah. Algorithm: +// +// - Send a bunch of stuff, Next will be updated. +// - Receive a bunch of stuff, Match will be updated. +// - Send < Next, Pause will be enabled. +// - Receive an update that bumps Match, Pause is disabled. +// +// Keep writing. +type Watermark struct { + // Match is the watermark up to which the follower matches the leader. + Match uint64 + // Next is the next leader watermark to send. All marks < Next are already + // in-flight to the follower. + Next uint64 + // Pause is true if there was a recent attempt to bump the follower's + // watermark to Next-1. + Pause bool +} + +func (w *Watermark) Update(match uint64) bool { + if match < w.Match { + return false + } + w.Match = match + w.Next = max(w.Next, match+1) + w.Pause = false + return true +} + +func (w *Watermark) Sent(watermark uint64) { + if watermark >= w.Next { + w.Next = watermark + 1 + } else { + // w.Pause = true + } +}