diff --git a/log.go b/log.go index 84826c3a..9097491c 100644 --- a/log.go +++ b/log.go @@ -106,23 +106,25 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). -func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { - if !l.matchTerm(index, logTerm) { +func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) { + if !l.matchTerm(a.prev) { return 0, false } + // TODO(pav-kv): propagate logSlice down the stack. It will be used all the + // way down in unstable, for safety checks, and for useful bookkeeping. - lastnewi = index + uint64(len(ents)) - ci := l.findConflict(ents) + lastnewi = a.prev.index + uint64(len(a.entries)) + ci := l.findConflict(a.entries) switch { case ci == 0: case ci <= l.committed: l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) default: - offset := index + 1 - if ci-offset > uint64(len(ents)) { - l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) + offset := a.prev.index + 1 + if ci-offset > uint64(len(a.entries)) { + l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries)) } - l.append(ents[ci-offset:]...) + l.append(a.entries[ci-offset:]...) } l.commitTo(min(committed, lastnewi)) return lastnewi, true @@ -150,13 +152,15 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 { // a different term. // The index of the given entries MUST be continuously increasing. func (l *raftLog) findConflict(ents []pb.Entry) uint64 { - for _, ne := range ents { - if !l.matchTerm(ne.Index, ne.Term) { - if ne.Index <= l.lastIndex() { + for i := range ents { + if id := pbEntryID(&ents[i]); !l.matchTerm(id) { + if id.index <= l.lastIndex() { + // TODO(pav-kv): can simply print %+v of the id. This will change the + // log format though. 
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", - ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term) + id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term) } - return ne.Index + return id.index } } return 0 @@ -360,7 +364,7 @@ func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable i < l.maxAppliableIndex(allowUnstable) } -func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } +func (l *raftLog) stableTo(id entryID) { l.unstable.stableTo(id) } func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } @@ -370,12 +374,14 @@ func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } // to Ready(). func (l *raftLog) acceptUnstable() { l.unstable.acceptInProgress() } -func (l *raftLog) lastTerm() uint64 { - t, err := l.term(l.lastIndex()) +// lastEntryID returns the ID of the last entry in the log. +func (l *raftLog) lastEntryID() entryID { + index := l.lastIndex() + t, err := l.term(index) if err != nil { - l.logger.Panicf("unexpected error when getting the last term (%v)", err) + l.logger.Panicf("unexpected error when getting the last term at %d: %v", index, err) } - return t + return entryID{term: t, index: index} } func (l *raftLog) term(i uint64) (uint64, error) { @@ -426,30 +432,32 @@ func (l *raftLog) allEntries() []pb.Entry { panic(err) } -// isUpToDate determines if the given (lastIndex,term) log is more up-to-date +// isUpToDate determines if a log with the given last entry is more up-to-date // by comparing the index and term of the last entries in the existing logs. +// // If the logs have last entries with different terms, then the log with the // later term is more up-to-date. If the logs end with the same term, then // whichever log has the larger lastIndex is more up-to-date. If the logs are // the same, the given log is up-to-date. 
-func (l *raftLog) isUpToDate(lasti, term uint64) bool { - return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex()) +func (l *raftLog) isUpToDate(their entryID) bool { + our := l.lastEntryID() + return their.term > our.term || their.term == our.term && their.index >= our.index } -func (l *raftLog) matchTerm(i, term uint64) bool { - t, err := l.term(i) +func (l *raftLog) matchTerm(id entryID) bool { + t, err := l.term(id.index) if err != nil { return false } - return t == term + return t == id.term } -func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { - // NB: term should never be 0 on a commit because the leader campaigns at - // least at term 1. But if it is 0 for some reason, we don't want to consider - // this a term match in case zeroTermOnOutOfBounds returns 0. - if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { - l.commitTo(maxIndex) +func (l *raftLog) maybeCommit(at entryID) bool { + // NB: term should never be 0 on a commit because the leader campaigned at + // least at term 1. But if it is 0 for some reason, we don't consider this a + // term match. + if at.term != 0 && at.index > l.committed && l.matchTerm(at) { + l.commitTo(at.index) return true } return false diff --git a/log_test.go b/log_test.go index 2711ff9c..c53f4bf3 100644 --- a/log_test.go +++ b/log_test.go @@ -137,7 +137,7 @@ func TestIsUpToDate(t *testing.T) { for i, tt := range tests { t.Run(fmt.Sprint(i), func(t *testing.T) { - require.Equal(t, tt.wUpToDate, raftLog.isUpToDate(tt.lastIndex, tt.term)) + require.Equal(t, tt.wUpToDate, raftLog.isUpToDate(entryID{term: tt.term, index: tt.lastIndex})) }) } } @@ -208,9 +208,9 @@ func TestLogMaybeAppend(t *testing.T) { lastterm := uint64(3) commit := uint64(1) + // TODO(pav-kv): clean-up this test. 
tests := []struct { - logTerm uint64 - index uint64 + prev entryID committed uint64 ents []pb.Entry @@ -221,71 +221,91 @@ func TestLogMaybeAppend(t *testing.T) { }{ // not match: term is different { - lastterm - 1, lastindex, lastindex, []pb.Entry{{Index: lastindex + 1, Term: 4}}, + entryID{term: lastterm - 1, index: lastindex}, lastindex, + []pb.Entry{{Index: lastindex + 1, Term: 4}}, 0, false, commit, false, }, // not match: index out of bound { - lastterm, lastindex + 1, lastindex, []pb.Entry{{Index: lastindex + 2, Term: 4}}, + entryID{term: lastterm, index: lastindex + 1}, lastindex, + []pb.Entry{{Index: lastindex + 2, Term: 4}}, 0, false, commit, false, }, // match with the last existing entry { - lastterm, lastindex, lastindex, nil, + entryID{term: lastterm, index: lastindex}, lastindex, nil, lastindex, true, lastindex, false, }, { - lastterm, lastindex, lastindex + 1, nil, + entryID{term: lastterm, index: lastindex}, lastindex + 1, nil, lastindex, true, lastindex, false, // do not increase commit higher than lastnewi }, { - lastterm, lastindex, lastindex - 1, nil, + entryID{term: lastterm, index: lastindex}, lastindex - 1, nil, lastindex, true, lastindex - 1, false, // commit up to the commit in the message }, { - lastterm, lastindex, 0, nil, + entryID{term: lastterm, index: lastindex}, 0, nil, lastindex, true, commit, false, // commit do not decrease }, { - 0, 0, lastindex, nil, + entryID{}, lastindex, nil, 0, true, commit, false, // commit do not decrease }, { - lastterm, lastindex, lastindex, []pb.Entry{{Index: lastindex + 1, Term: 4}}, + entryID{term: lastterm, index: lastindex}, lastindex, + []pb.Entry{{Index: lastindex + 1, Term: 4}}, lastindex + 1, true, lastindex, false, }, { - lastterm, lastindex, lastindex + 1, []pb.Entry{{Index: lastindex + 1, Term: 4}}, + entryID{term: lastterm, index: lastindex}, lastindex + 1, + []pb.Entry{{Index: lastindex + 1, Term: 4}}, lastindex + 1, true, lastindex + 1, false, }, { - lastterm, lastindex, lastindex + 2, 
[]pb.Entry{{Index: lastindex + 1, Term: 4}}, + entryID{term: lastterm, index: lastindex}, lastindex + 2, + []pb.Entry{{Index: lastindex + 1, Term: 4}}, lastindex + 1, true, lastindex + 1, false, // do not increase commit higher than lastnewi }, { - lastterm, lastindex, lastindex + 2, []pb.Entry{{Index: lastindex + 1, Term: 4}, {Index: lastindex + 2, Term: 4}}, + entryID{term: lastterm, index: lastindex}, lastindex + 2, + []pb.Entry{{Index: lastindex + 1, Term: 4}, {Index: lastindex + 2, Term: 4}}, lastindex + 2, true, lastindex + 2, false, }, // match with the entry in the middle { - lastterm - 1, lastindex - 1, lastindex, []pb.Entry{{Index: lastindex, Term: 4}}, + entryID{term: lastterm - 1, index: lastindex - 1}, lastindex, + []pb.Entry{{Index: lastindex, Term: 4}}, lastindex, true, lastindex, false, }, { - lastterm - 2, lastindex - 2, lastindex, []pb.Entry{{Index: lastindex - 1, Term: 4}}, + entryID{term: lastterm - 2, index: lastindex - 2}, lastindex, + []pb.Entry{{Index: lastindex - 1, Term: 4}}, lastindex - 1, true, lastindex - 1, false, }, { - lastterm - 3, lastindex - 3, lastindex, []pb.Entry{{Index: lastindex - 2, Term: 4}}, + entryID{term: lastterm - 3, index: lastindex - 3}, lastindex, + []pb.Entry{{Index: lastindex - 2, Term: 4}}, lastindex - 2, true, lastindex - 2, true, // conflict with existing committed entry }, { - lastterm - 2, lastindex - 2, lastindex, []pb.Entry{{Index: lastindex - 1, Term: 4}, {Index: lastindex, Term: 4}}, + entryID{term: lastterm - 2, index: lastindex - 2}, lastindex, + []pb.Entry{{Index: lastindex - 1, Term: 4}, {Index: lastindex, Term: 4}}, lastindex, true, lastindex, false, }, } for i, tt := range tests { + // TODO(pav-kv): for now, we pick a high enough app.term so that it + // represents a valid append message. The maybeAppend currently ignores it, + // but it must check that the append does not regress the term. 
+ app := logSlice{ + term: 100, + prev: tt.prev, + entries: tt.ents, + } + require.NoError(t, app.valid()) + raftLog := newLog(NewMemoryStorage(), raftLogger) raftLog.append(previousEnts...) raftLog.committed = commit @@ -296,7 +316,7 @@ func TestLogMaybeAppend(t *testing.T) { require.True(t, tt.wpanic) } }() - glasti, gappend := raftLog.maybeAppend(tt.index, tt.logTerm, tt.committed, tt.ents...) + glasti, gappend := raftLog.maybeAppend(app, tt.committed) require.Equal(t, tt.wlasti, glasti) require.Equal(t, tt.wappend, gappend) require.Equal(t, tt.wcommit, raftLog.committed) @@ -316,7 +336,6 @@ func TestCompactionSideEffects(t *testing.T) { // Populate the log with 1000 entries; 750 in stable storage and 250 in unstable. lastIndex := uint64(1000) unstableIndex := uint64(750) - lastTerm := lastIndex storage := NewMemoryStorage() for i = 1; i <= unstableIndex; i++ { storage.Append([]pb.Entry{{Term: i, Index: i}}) @@ -326,7 +345,7 @@ func TestCompactionSideEffects(t *testing.T) { raftLog.append(pb.Entry{Term: i + 1, Index: i + 1}) } - require.True(t, raftLog.maybeCommit(lastIndex, lastTerm)) + require.True(t, raftLog.maybeCommit(raftLog.lastEntryID())) raftLog.appliedTo(raftLog.committed, 0 /* size */) offset := uint64(500) @@ -338,7 +357,7 @@ func TestCompactionSideEffects(t *testing.T) { } for j := offset; j <= raftLog.lastIndex(); j++ { - require.True(t, raftLog.matchTerm(j, j)) + require.True(t, raftLog.matchTerm(entryID{term: j, index: j})) } unstableEnts := raftLog.nextUnstableEnts() @@ -397,8 +416,8 @@ func TestHasNextCommittedEnts(t *testing.T) { raftLog := newLog(storage, raftLogger) raftLog.append(ents...) 
- raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.stableTo(entryID{term: 1, index: 4}) + raftLog.maybeCommit(entryID{term: 1, index: 5}) raftLog.appliedTo(tt.applied, 0 /* size */) raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable) raftLog.applyingEntsPaused = tt.paused @@ -455,8 +474,8 @@ func TestNextCommittedEnts(t *testing.T) { raftLog := newLog(storage, raftLogger) raftLog.append(ents...) - raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.stableTo(entryID{term: 1, index: 4}) + raftLog.maybeCommit(entryID{term: 1, index: 5}) raftLog.appliedTo(tt.applied, 0 /* size */) raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable) raftLog.applyingEntsPaused = tt.paused @@ -514,8 +533,8 @@ func TestAcceptApplying(t *testing.T) { raftLog := newLogWithSize(storage, raftLogger, maxSize) raftLog.append(ents...) - raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.stableTo(entryID{term: 1, index: 4}) + raftLog.maybeCommit(entryID{term: 1, index: 5}) raftLog.appliedTo(3, 0 /* size */) raftLog.acceptApplying(tt.index, tt.size, tt.allowUnstable) @@ -563,8 +582,8 @@ func TestAppliedTo(t *testing.T) { raftLog := newLogWithSize(storage, raftLogger, maxSize) raftLog.append(ents...) 
- raftLog.stableTo(4, 1) - raftLog.maybeCommit(5, 1) + raftLog.stableTo(entryID{term: 1, index: 4}) + raftLog.maybeCommit(entryID{term: 1, index: 5}) raftLog.appliedTo(3, 0 /* size */) raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */) @@ -601,7 +620,7 @@ func TestNextUnstableEnts(t *testing.T) { ents := raftLog.nextUnstableEnts() if l := len(ents); l > 0 { - raftLog.stableTo(ents[l-1].Index, ents[l-1].Term) + raftLog.stableTo(pbEntryID(&ents[l-1])) } require.Equal(t, tt.wents, ents) require.Equal(t, previousEnts[len(previousEnts)-1].Index+1, raftLog.unstable.offset) @@ -652,7 +671,7 @@ func TestStableTo(t *testing.T) { t.Run(fmt.Sprint(i), func(t *testing.T) { raftLog := newLog(NewMemoryStorage(), raftLogger) raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) - raftLog.stableTo(tt.stablei, tt.stablet) + raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei}) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) } @@ -689,7 +708,7 @@ func TestStableToWithSnap(t *testing.T) { require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})) raftLog := newLog(s, raftLogger) raftLog.append(tt.newEnts...) 
- raftLog.stableTo(tt.stablei, tt.stablet) + raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei}) require.Equal(t, tt.wunstable, raftLog.unstable.offset) }) @@ -723,7 +742,7 @@ func TestCompaction(t *testing.T) { storage.Append([]pb.Entry{{Index: i}}) } raftLog := newLog(storage, raftLogger) - raftLog.maybeCommit(tt.lastIndex, 0) + raftLog.maybeCommit(entryID{term: 0, index: tt.lastIndex}) // TODO(pav-kv): this is a no-op raftLog.appliedTo(raftLog.committed, 0 /* size */) for j := 0; j < len(tt.compact); j++ { diff --git a/log_unstable.go b/log_unstable.go index 16cbdeff..2629aae8 100644 --- a/log_unstable.go +++ b/log_unstable.go @@ -131,30 +131,30 @@ func (u *unstable) acceptInProgress() { // The method should only be called when the caller can attest that the entries // can not be overwritten by an in-progress log append. See the related comment // in newStorageAppendRespMsg. -func (u *unstable) stableTo(i, t uint64) { - gt, ok := u.maybeTerm(i) +func (u *unstable) stableTo(id entryID) { + gt, ok := u.maybeTerm(id.index) if !ok { // Unstable entry missing. Ignore. - u.logger.Infof("entry at index %d missing from unstable log; ignoring", i) + u.logger.Infof("entry at index %d missing from unstable log; ignoring", id.index) return } - if i < u.offset { + if id.index < u.offset { // Index matched unstable snapshot, not unstable entry. Ignore. - u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", i) + u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", id.index) return } - if gt != t { + if gt != id.term { // Term mismatch between unstable entry and specified entry. Ignore. // This is possible if part or all of the unstable log was replaced // between that time that a set of entries started to be written to // stable storage and when they finished. 
u.logger.Infof("entry at (index,term)=(%d,%d) mismatched with "+ - "entry at (%d,%d) in unstable log; ignoring", i, t, i, gt) + "entry at (%d,%d) in unstable log; ignoring", id.index, id.term, id.index, gt) return } - num := int(i + 1 - u.offset) + num := int(id.index + 1 - u.offset) u.entries = u.entries[num:] - u.offset = i + 1 + u.offset = id.index + 1 u.offsetInProgress = max(u.offsetInProgress, u.offset) u.shrinkEntriesArray() } diff --git a/log_unstable_test.go b/log_unstable_test.go index a0b4f8e3..39c3f1ac 100644 --- a/log_unstable_test.go +++ b/log_unstable_test.go @@ -493,7 +493,7 @@ func TestUnstableStableTo(t *testing.T) { snapshot: tt.snap, logger: raftLogger, } - u.stableTo(tt.index, tt.term) + u.stableTo(entryID{term: tt.term, index: tt.index}) require.Equal(t, tt.woffset, u.offset) require.Equal(t, tt.woffsetInProgress, u.offsetInProgress) require.Equal(t, tt.wlen, len(u.entries)) diff --git a/raft.go b/raft.go index 85b066ec..3357ae45 100644 --- a/raft.go +++ b/raft.go @@ -458,9 +458,10 @@ func newRaft(c *Config) *raft { stepDownOnRemoval: c.StepDownOnRemoval, } + lastID := r.raftLog.lastEntryID() cfg, trk, err := confchange.Restore(confchange.Changer{ Tracker: r.trk, - LastIndex: raftlog.lastIndex(), + LastIndex: lastID.index, }, cs) if err != nil { panic(err) @@ -480,8 +481,9 @@ func newRaft(c *Config) *raft { nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) } + // TODO(pav-kv): it should be ok to simply print %+v for lastID. r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", - r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm()) + r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, lastID.index, lastID.term) return r } @@ -755,12 +757,11 @@ func (r *raft) appliedSnap(snap *pb.Snapshot) { r.appliedTo(index, 0 /* size */) } -// maybeCommit attempts to advance the commit index. 
Returns true if -// the commit index changed (in which case the caller should call -// r.bcastAppend). +// maybeCommit attempts to advance the commit index. Returns true if the commit +// index changed (in which case the caller should call r.bcastAppend). This can +// only be called in StateLeader. func (r *raft) maybeCommit() bool { - mci := r.trk.Committed() - return r.raftLog.maybeCommit(mci, r.Term) + return r.raftLog.maybeCommit(entryID{term: r.Term, index: r.trk.Committed()}) } func (r *raft) reset(term uint64) { @@ -1033,14 +1034,16 @@ func (r *raft) campaign(t CampaignType) { r.send(pb.Message{To: id, Term: term, Type: voteRespMsgType(voteMsg)}) continue } + // TODO(pav-kv): it should be ok to simply print %+v for the lastEntryID. + last := r.raftLog.lastEntryID() r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d", - r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term) + r.id, last.term, last.index, voteMsg, id, r.Term) var ctx []byte if t == campaignTransfer { ctx = []byte(t) } - r.send(pb.Message{To: id, Term: term, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) + r.send(pb.Message{To: id, Term: term, Type: voteMsg, Index: last.index, LogTerm: last.term, Context: ctx}) } } @@ -1066,8 +1069,10 @@ func (r *raft) Step(m pb.Message) error { if !force && inLease { // If a server receives a RequestVote request within the minimum election timeout // of hearing from a current leader, it does not update its term or grant its vote + last := r.raftLog.lastEntryID() + // TODO(pav-kv): it should be ok to simply print the %+v of the lastEntryID. 
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)", - r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed) + r.id, last.term, last.index, r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed) return nil } } @@ -1118,8 +1123,10 @@ func (r *raft) Step(m pb.Message) error { // Before Pre-Vote enable, there may have candidate with higher term, // but less log. After update to Pre-Vote, the cluster may deadlock if // we drop messages with a lower term. + last := r.raftLog.lastEntryID() + // TODO(pav-kv): it should be ok to simply print %+v of the lastEntryID. r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", - r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) + r.id, last.term, last.index, r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true}) } else if m.Type == pb.MsgStorageAppendResp { if m.Index != 0 { @@ -1154,7 +1161,7 @@ func (r *raft) Step(m pb.Message) error { case pb.MsgStorageAppendResp: if m.Index != 0 { - r.raftLog.stableTo(m.Index, m.LogTerm) + r.raftLog.stableTo(entryID{term: m.LogTerm, index: m.Index}) } if m.Snapshot != nil { r.appliedSnap(m.Snapshot) @@ -1175,7 +1182,9 @@ func (r *raft) Step(m pb.Message) error { // ...or this is a PreVote for a future term... (m.Type == pb.MsgPreVote && m.Term > r.Term) // ...and we believe the candidate is up to date. - if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) { + lastID := r.raftLog.lastEntryID() + candLastID := entryID{term: m.LogTerm, index: m.Index} + if canVote && r.raftLog.isUpToDate(candLastID) { // Note: it turns out that that learners must be allowed to cast votes. 
// This seems counter- intuitive but is necessary in the situation in which // a learner has been promoted (i.e. is now a voter) but has not learned @@ -1195,7 +1204,7 @@ func (r *raft) Step(m pb.Message) error { // in: // https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263. r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d", - r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) + r.id, lastID.term, lastID.index, r.Vote, m.Type, m.From, candLastID.term, candLastID.index, r.Term) // When responding to Msg{Pre,}Vote messages we include the term // from the message, not the local term. To see why, consider the // case where a single node was previously partitioned away and @@ -1213,7 +1222,7 @@ func (r *raft) Step(m pb.Message) error { } } else { r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", - r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) + r.id, lastID.term, lastID.index, r.Vote, m.Type, m.From, candLastID.term, candLastID.index, r.Term) r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) } @@ -1735,12 +1744,26 @@ func stepFollower(r *raft, m pb.Message) error { return nil } +// logSliceFromMsgApp extracts the appended logSlice from a MsgApp message. +func logSliceFromMsgApp(m *pb.Message) logSlice { + // TODO(pav-kv): consider also validating the logSlice here. + return logSlice{ + term: m.Term, + prev: entryID{term: m.LogTerm, index: m.Index}, + entries: m.Entries, + } +} + func (r *raft) handleAppendEntries(m pb.Message) { - if m.Index < r.raftLog.committed { + // TODO(pav-kv): construct logSlice up the stack next to receiving the + // message, and validate it before taking any action (e.g. bumping term). 
+ a := logSliceFromMsgApp(&m) + + if a.prev.index < r.raftLog.committed { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } - if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + if mlastIndex, ok := r.raftLog.maybeAppend(a, m.Commit); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } @@ -1852,9 +1875,12 @@ func (r *raft) restore(s pb.Snapshot) bool { // Now go ahead and actually restore. - if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) { + id := entryID{term: s.Metadata.Term, index: s.Metadata.Index} + if r.raftLog.matchTerm(id) { + // TODO(pav-kv): can print %+v of the id, but it will change the format. + last := r.raftLog.lastEntryID() r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", - r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) + r.id, r.raftLog.committed, last.index, last.term, id.index, id.term) r.raftLog.commitTo(s.Metadata.Index) return false } @@ -1879,8 +1905,9 @@ func (r *raft) restore(s pb.Snapshot) bool { pr := r.trk.Progress[r.id] pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded + last := r.raftLog.lastEntryID() r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]", - r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) + r.id, r.raftLog.committed, last.index, last.term, id.index, id.term) return true } diff --git a/raft_paper_test.go b/raft_paper_test.go index fcdd49f2..a49ea02c 100644 --- a/raft_paper_test.go +++ b/raft_paper_test.go @@ -923,7 +923,7 @@ func commitNoopEntry(r *raft, s *MemoryStorage) { r.readMessages() s.Append(r.raftLog.nextUnstableEnts()) r.raftLog.appliedTo(r.raftLog.committed, 0 /* size */) - r.raftLog.stableTo(r.raftLog.lastIndex(), 
r.raftLog.lastTerm()) + r.raftLog.stableTo(r.raftLog.lastEntryID()) } func acceptAndReply(m pb.Message) pb.Message { diff --git a/raft_test.go b/raft_test.go index e9d9657b..3bbf00a0 100644 --- a/raft_test.go +++ b/raft_test.go @@ -33,7 +33,7 @@ import ( func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) { // Append unstable entries. s.Append(r.raftLog.nextUnstableEnts()) - r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) + r.raftLog.stableTo(r.raftLog.lastEntryID()) // Run post-append steps. r.advanceMessagesAfterAppend() @@ -1590,7 +1590,7 @@ func testRecvMsgVote(t *testing.T, msgType pb.MessageType) { // what the recipient node does when receiving a message with a // different term number, so we simply initialize both term numbers to // be the same. - term := max(sm.raftLog.lastTerm(), tt.logTerm) + term := max(sm.raftLog.lastEntryID().term, tt.logTerm) sm.Term = term sm.Step(pb.Message{Type: msgType, Term: term, From: 2, Index: tt.index, LogTerm: tt.logTerm}) diff --git a/rawnode.go b/rawnode.go index 9ca3fa2f..428ef519 100644 --- a/rawnode.go +++ b/rawnode.go @@ -354,8 +354,9 @@ func newStorageAppendRespMsg(r *raft, rd Ready) pb.Message { // dropped from memory. // // [^1]: https://en.wikipedia.org/wiki/ABA_problem - m.Index = r.raftLog.lastIndex() - m.LogTerm = r.raftLog.lastTerm() + last := r.raftLog.lastEntryID() + m.Index = last.index + m.LogTerm = last.term } if !IsEmptySnap(rd.Snapshot) { snap := rd.Snapshot diff --git a/types.go b/types.go new file mode 100644 index 00000000..bb24d434 --- /dev/null +++ b/types.go @@ -0,0 +1,106 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + "fmt" + + pb "go.etcd.io/raft/v3/raftpb" +) + +// entryID uniquely identifies a raft log entry. +// +// Every entry is associated with a leadership term which issued this entry and +// initially appended it to the log. There can only be one leader at any term, +// and a leader never issues two entries with the same index. +type entryID struct { + term uint64 + index uint64 +} + +// pbEntryID returns the ID of the given pb.Entry. +func pbEntryID(entry *pb.Entry) entryID { + return entryID{term: entry.Term, index: entry.Index} +} + +// logSlice describes a correct slice of a raft log. +// +// Every log slice is considered in the context of a specific leader term. This +// term does not necessarily match entryID.term of the entries, since a leader +// log contains entries both from its own term and from some earlier terms. +// +// Two slices with a matching logSlice.term are guaranteed to be consistent, +// i.e. they never contain two different entries at the same index. The reverse +// is not true: two slices with different logSlice.term may contain both +// matching and mismatching entries. Specifically, logs at two different leader +// terms share a common prefix, after which they *permanently* diverge. +// +// A well-formed logSlice conforms to raft safety properties. It provides the +// following guarantees: +// +// 1. entries[i].Index == prev.index + 1 + i, +// 2. prev.term <= entries[0].Term, +// 3. entries[i-1].Term <= entries[i].Term, +// 4. entries[len-1].Term <= term. 
+// +// Property (1) means the slice is contiguous. Properties (2) and (3) mean that +// the terms of the entries in a log never regress. Property (4) means that a +// leader log at a specific term never has entries from higher terms. +// +// Users of this struct can assume the invariants hold true. The exception is the +// "gateway" code that initially constructs logSlice, such as when its content +// is sourced from a message that was received via transport, or from Storage, +// or in test code that manually hard-codes this struct. In these cases, the +// invariants should be validated using the valid() method. +type logSlice struct { + // term is the leader term containing the given entries in its log. + term uint64 + // prev is the ID of the entry immediately preceding the entries. + prev entryID + // entries contains the consecutive entries representing this slice. + entries []pb.Entry +} + +// lastIndex returns the index of the last entry in this log slice. Returns +// prev.index if there are no entries. +func (s logSlice) lastIndex() uint64 { + return s.prev.index + uint64(len(s.entries)) +} + +// lastEntryID returns the ID of the last entry in this log slice, or prev if +// there are no entries. +func (s logSlice) lastEntryID() entryID { + if ln := len(s.entries); ln != 0 { + return pbEntryID(&s.entries[ln-1]) + } + return s.prev +} + +// valid returns nil iff the logSlice is a well-formed log slice. See logSlice +// comment for details on what constitutes a valid raft log slice. 
+func (s logSlice) valid() error { + prev := s.prev + for i := range s.entries { + id := pbEntryID(&s.entries[i]) + if id.term < prev.term || id.index != prev.index+1 { + return fmt.Errorf("leader term %d: entries %+v and %+v not consistent", s.term, prev, id) + } + prev = id + } + if s.term < prev.term { + return fmt.Errorf("leader term %d: entry %+v has a newer term", s.term, prev) + } + return nil +} diff --git a/types_test.go b/types_test.go new file mode 100644 index 00000000..fd5d91f5 --- /dev/null +++ b/types_test.go @@ -0,0 +1,94 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + "testing" + + "github.com/stretchr/testify/require" + + pb "go.etcd.io/raft/v3/raftpb" +) + +func TestEntryID(t *testing.T) { + // Some obvious checks first. 
+ require.Equal(t, entryID{term: 5, index: 10}, entryID{term: 5, index: 10}) + require.NotEqual(t, entryID{term: 4, index: 10}, entryID{term: 5, index: 10}) + require.NotEqual(t, entryID{term: 5, index: 9}, entryID{term: 5, index: 10}) + + for _, tt := range []struct { + entry pb.Entry + want entryID + }{ + {entry: pb.Entry{}, want: entryID{term: 0, index: 0}}, + {entry: pb.Entry{Term: 1, Index: 2, Data: []byte("data")}, want: entryID{term: 1, index: 2}}, + {entry: pb.Entry{Term: 10, Index: 123}, want: entryID{term: 10, index: 123}}, + } { + require.Equal(t, tt.want, pbEntryID(&tt.entry)) + } +} + +func TestLogSlice(t *testing.T) { + id := func(index, term uint64) entryID { + return entryID{term: term, index: index} + } + e := func(index, term uint64) pb.Entry { + return pb.Entry{Term: term, Index: index} + } + for _, tt := range []struct { + term uint64 + prev entryID + entries []pb.Entry + + notOk bool + last entryID + }{ + // Empty "dummy" slice, starting at (0, 0) origin of the log. + {last: id(0, 0)}, + // Empty slice with a given prev ID. Valid only if term >= prev.term. + {prev: id(123, 10), notOk: true}, + {term: 9, prev: id(123, 10), notOk: true}, + {term: 10, prev: id(123, 10), last: id(123, 10)}, + {term: 11, prev: id(123, 10), last: id(123, 10)}, + // A single entry. + {term: 0, entries: []pb.Entry{e(1, 1)}, notOk: true}, + {term: 1, entries: []pb.Entry{e(1, 1)}, last: id(1, 1)}, + {term: 2, entries: []pb.Entry{e(1, 1)}, last: id(1, 1)}, + // Multiple entries. + {term: 2, entries: []pb.Entry{e(2, 1), e(3, 1), e(4, 2)}, notOk: true}, + {term: 1, prev: id(1, 1), entries: []pb.Entry{e(2, 1), e(3, 1), e(4, 2)}, notOk: true}, + {term: 2, prev: id(1, 1), entries: []pb.Entry{e(2, 1), e(3, 1), e(4, 2)}, last: id(4, 2)}, + // First entry inconsistent with prev. 
+ {term: 10, prev: id(123, 5), entries: []pb.Entry{e(111, 5)}, notOk: true}, + {term: 10, prev: id(123, 5), entries: []pb.Entry{e(124, 4)}, notOk: true}, + {term: 10, prev: id(123, 5), entries: []pb.Entry{e(234, 6)}, notOk: true}, + {term: 10, prev: id(123, 5), entries: []pb.Entry{e(124, 6)}, last: id(124, 6)}, + // Inconsistent entries. + {term: 10, prev: id(12, 2), entries: []pb.Entry{e(13, 2), e(12, 2)}, notOk: true}, + {term: 10, prev: id(12, 2), entries: []pb.Entry{e(13, 2), e(15, 2)}, notOk: true}, + {term: 10, prev: id(12, 2), entries: []pb.Entry{e(13, 2), e(14, 1)}, notOk: true}, + {term: 10, prev: id(12, 2), entries: []pb.Entry{e(13, 2), e(14, 3)}, last: id(14, 3)}, + } { + t.Run("", func(t *testing.T) { + s := logSlice{term: tt.term, prev: tt.prev, entries: tt.entries} + require.Equal(t, tt.notOk, s.valid() != nil) + if !tt.notOk { + last := s.lastEntryID() + require.Equal(t, tt.last, last) + require.Equal(t, last.index, s.lastIndex()) + } + }) + } +}