Skip to content

Commit

Permalink
Merge pull request #145 from pav-kv/add-entry-append-types
Browse files Browse the repository at this point in the history
raft: add entryID and logSlice types
  • Loading branch information
ahrtr authored Feb 5, 2024
2 parents 94b9bbe + bd5b421 commit 23c936a
Show file tree
Hide file tree
Showing 10 changed files with 354 additions and 99 deletions.
68 changes: 38 additions & 30 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,25 @@ func (l *raftLog) String() string {

// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if !l.matchTerm(index, logTerm) {
func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) {
if !l.matchTerm(a.prev) {
return 0, false
}
// TODO(pav-kv): propagate logSlice down the stack. It will be used all the
// way down in unstable, for safety checks, and for useful bookkeeping.

lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
lastnewi = a.prev.index + uint64(len(a.entries))
ci := l.findConflict(a.entries)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
offset := a.prev.index + 1
if ci-offset > uint64(len(a.entries)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries))
}
l.append(ents[ci-offset:]...)
l.append(a.entries[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
Expand Down Expand Up @@ -150,13 +152,15 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 {
// a different term.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
for _, ne := range ents {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
for i := range ents {
if id := pbEntryID(&ents[i]); !l.matchTerm(id) {
if id.index <= l.lastIndex() {
// TODO(pav-kv): can simply print %+v of the id. This will change the
// log format though.
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term)
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
}
return ne.Index
return id.index
}
}
return 0
Expand Down Expand Up @@ -360,7 +364,7 @@ func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable
i < l.maxAppliableIndex(allowUnstable)
}

func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
func (l *raftLog) stableTo(id entryID) { l.unstable.stableTo(id) }

func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }

Expand All @@ -370,12 +374,14 @@ func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
// to Ready().
func (l *raftLog) acceptUnstable() { l.unstable.acceptInProgress() }

func (l *raftLog) lastTerm() uint64 {
t, err := l.term(l.lastIndex())
// lastEntryID returns the ID of the last entry in the log.
func (l *raftLog) lastEntryID() entryID {
index := l.lastIndex()
t, err := l.term(index)
if err != nil {
l.logger.Panicf("unexpected error when getting the last term (%v)", err)
l.logger.Panicf("unexpected error when getting the last term at %d: %v", index, err)
}
return t
return entryID{term: t, index: index}
}

func (l *raftLog) term(i uint64) (uint64, error) {
Expand Down Expand Up @@ -426,30 +432,32 @@ func (l *raftLog) allEntries() []pb.Entry {
panic(err)
}

// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
// isUpToDate determines if a log with the given last entry is more up-to-date
// by comparing the index and term of the last entries in the existing logs.
//
// If the logs have last entries with different terms, then the log with the
// later term is more up-to-date. If the logs end with the same term, then
// whichever log has the larger lastIndex is more up-to-date. If the logs are
// the same, the given log is up-to-date.
func (l *raftLog) isUpToDate(lasti, term uint64) bool {
return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
func (l *raftLog) isUpToDate(their entryID) bool {
our := l.lastEntryID()
return their.term > our.term || their.term == our.term && their.index >= our.index
}

func (l *raftLog) matchTerm(i, term uint64) bool {
t, err := l.term(i)
func (l *raftLog) matchTerm(id entryID) bool {
t, err := l.term(id.index)
if err != nil {
return false
}
return t == term
return t == id.term
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
// NB: term should never be 0 on a commit because the leader campaigns at
// least at term 1. But if it is 0 for some reason, we don't want to consider
// this a term match in case zeroTermOnOutOfBounds returns 0.
if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
func (l *raftLog) maybeCommit(at entryID) bool {
// NB: term should never be 0 on a commit because the leader campaigned at
// least at term 1. But if it is 0 for some reason, we don't consider this a
// term match.
if at.term != 0 && at.index > l.committed && l.matchTerm(at) {
l.commitTo(at.index)
return true
}
return false
Expand Down
87 changes: 53 additions & 34 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestIsUpToDate(t *testing.T) {

for i, tt := range tests {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, tt.wUpToDate, raftLog.isUpToDate(tt.lastIndex, tt.term))
require.Equal(t, tt.wUpToDate, raftLog.isUpToDate(entryID{term: tt.term, index: tt.lastIndex}))
})
}
}
Expand Down Expand Up @@ -208,9 +208,9 @@ func TestLogMaybeAppend(t *testing.T) {
lastterm := uint64(3)
commit := uint64(1)

// TODO(pav-kv): clean-up this test.
tests := []struct {
logTerm uint64
index uint64
prev entryID
committed uint64
ents []pb.Entry

Expand All @@ -221,71 +221,91 @@ func TestLogMaybeAppend(t *testing.T) {
}{
// not match: term is different
{
lastterm - 1, lastindex, lastindex, []pb.Entry{{Index: lastindex + 1, Term: 4}},
entryID{term: lastterm - 1, index: lastindex}, lastindex,
[]pb.Entry{{Index: lastindex + 1, Term: 4}},
0, false, commit, false,
},
// not match: index out of bound
{
lastterm, lastindex + 1, lastindex, []pb.Entry{{Index: lastindex + 2, Term: 4}},
entryID{term: lastterm, index: lastindex + 1}, lastindex,
[]pb.Entry{{Index: lastindex + 2, Term: 4}},
0, false, commit, false,
},
// match with the last existing entry
{
lastterm, lastindex, lastindex, nil,
entryID{term: lastterm, index: lastindex}, lastindex, nil,
lastindex, true, lastindex, false,
},
{
lastterm, lastindex, lastindex + 1, nil,
entryID{term: lastterm, index: lastindex}, lastindex + 1, nil,
lastindex, true, lastindex, false, // do not increase commit higher than lastnewi
},
{
lastterm, lastindex, lastindex - 1, nil,
entryID{term: lastterm, index: lastindex}, lastindex - 1, nil,
lastindex, true, lastindex - 1, false, // commit up to the commit in the message
},
{
lastterm, lastindex, 0, nil,
entryID{term: lastterm, index: lastindex}, 0, nil,
lastindex, true, commit, false, // commit do not decrease
},
{
0, 0, lastindex, nil,
entryID{}, lastindex, nil,
0, true, commit, false, // commit do not decrease
},
{
lastterm, lastindex, lastindex, []pb.Entry{{Index: lastindex + 1, Term: 4}},
entryID{term: lastterm, index: lastindex}, lastindex,
[]pb.Entry{{Index: lastindex + 1, Term: 4}},
lastindex + 1, true, lastindex, false,
},
{
lastterm, lastindex, lastindex + 1, []pb.Entry{{Index: lastindex + 1, Term: 4}},
entryID{term: lastterm, index: lastindex}, lastindex + 1,
[]pb.Entry{{Index: lastindex + 1, Term: 4}},
lastindex + 1, true, lastindex + 1, false,
},
{
lastterm, lastindex, lastindex + 2, []pb.Entry{{Index: lastindex + 1, Term: 4}},
entryID{term: lastterm, index: lastindex}, lastindex + 2,
[]pb.Entry{{Index: lastindex + 1, Term: 4}},
lastindex + 1, true, lastindex + 1, false, // do not increase commit higher than lastnewi
},
{
lastterm, lastindex, lastindex + 2, []pb.Entry{{Index: lastindex + 1, Term: 4}, {Index: lastindex + 2, Term: 4}},
entryID{term: lastterm, index: lastindex}, lastindex + 2,
[]pb.Entry{{Index: lastindex + 1, Term: 4}, {Index: lastindex + 2, Term: 4}},
lastindex + 2, true, lastindex + 2, false,
},
// match with the entry in the middle
{
lastterm - 1, lastindex - 1, lastindex, []pb.Entry{{Index: lastindex, Term: 4}},
entryID{term: lastterm - 1, index: lastindex - 1}, lastindex,
[]pb.Entry{{Index: lastindex, Term: 4}},
lastindex, true, lastindex, false,
},
{
lastterm - 2, lastindex - 2, lastindex, []pb.Entry{{Index: lastindex - 1, Term: 4}},
entryID{term: lastterm - 2, index: lastindex - 2}, lastindex,
[]pb.Entry{{Index: lastindex - 1, Term: 4}},
lastindex - 1, true, lastindex - 1, false,
},
{
lastterm - 3, lastindex - 3, lastindex, []pb.Entry{{Index: lastindex - 2, Term: 4}},
entryID{term: lastterm - 3, index: lastindex - 3}, lastindex,
[]pb.Entry{{Index: lastindex - 2, Term: 4}},
lastindex - 2, true, lastindex - 2, true, // conflict with existing committed entry
},
{
lastterm - 2, lastindex - 2, lastindex, []pb.Entry{{Index: lastindex - 1, Term: 4}, {Index: lastindex, Term: 4}},
entryID{term: lastterm - 2, index: lastindex - 2}, lastindex,
[]pb.Entry{{Index: lastindex - 1, Term: 4}, {Index: lastindex, Term: 4}},
lastindex, true, lastindex, false,
},
}

for i, tt := range tests {
// TODO(pav-kv): for now, we pick a high enough app.term so that it
// represents a valid append message. The maybeAppend currently ignores it,
// but it must check that the append does not regress the term.
app := logSlice{
term: 100,
prev: tt.prev,
entries: tt.ents,
}
require.NoError(t, app.valid())

raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
raftLog.committed = commit
Expand All @@ -296,7 +316,7 @@ func TestLogMaybeAppend(t *testing.T) {
require.True(t, tt.wpanic)
}
}()
glasti, gappend := raftLog.maybeAppend(tt.index, tt.logTerm, tt.committed, tt.ents...)
glasti, gappend := raftLog.maybeAppend(app, tt.committed)
require.Equal(t, tt.wlasti, glasti)
require.Equal(t, tt.wappend, gappend)
require.Equal(t, tt.wcommit, raftLog.committed)
Expand All @@ -316,7 +336,6 @@ func TestCompactionSideEffects(t *testing.T) {
// Populate the log with 1000 entries; 750 in stable storage and 250 in unstable.
lastIndex := uint64(1000)
unstableIndex := uint64(750)
lastTerm := lastIndex
storage := NewMemoryStorage()
for i = 1; i <= unstableIndex; i++ {
storage.Append([]pb.Entry{{Term: i, Index: i}})
Expand All @@ -326,7 +345,7 @@ func TestCompactionSideEffects(t *testing.T) {
raftLog.append(pb.Entry{Term: i + 1, Index: i + 1})
}

require.True(t, raftLog.maybeCommit(lastIndex, lastTerm))
require.True(t, raftLog.maybeCommit(raftLog.lastEntryID()))
raftLog.appliedTo(raftLog.committed, 0 /* size */)

offset := uint64(500)
Expand All @@ -338,7 +357,7 @@ func TestCompactionSideEffects(t *testing.T) {
}

for j := offset; j <= raftLog.lastIndex(); j++ {
require.True(t, raftLog.matchTerm(j, j))
require.True(t, raftLog.matchTerm(entryID{term: j, index: j}))
}

unstableEnts := raftLog.nextUnstableEnts()
Expand Down Expand Up @@ -397,8 +416,8 @@ func TestHasNextCommittedEnts(t *testing.T) {

raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -455,8 +474,8 @@ func TestNextCommittedEnts(t *testing.T) {

raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -514,8 +533,8 @@ func TestAcceptApplying(t *testing.T) {

raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(3, 0 /* size */)

raftLog.acceptApplying(tt.index, tt.size, tt.allowUnstable)
Expand Down Expand Up @@ -563,8 +582,8 @@ func TestAppliedTo(t *testing.T) {

raftLog := newLogWithSize(storage, raftLogger, maxSize)
raftLog.append(ents...)
raftLog.stableTo(4, 1)
raftLog.maybeCommit(5, 1)
raftLog.stableTo(entryID{term: 1, index: 4})
raftLog.maybeCommit(entryID{term: 1, index: 5})
raftLog.appliedTo(3, 0 /* size */)
raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */)

Expand Down Expand Up @@ -601,7 +620,7 @@ func TestNextUnstableEnts(t *testing.T) {

ents := raftLog.nextUnstableEnts()
if l := len(ents); l > 0 {
raftLog.stableTo(ents[l-1].Index, ents[l-1].Term)
raftLog.stableTo(pbEntryID(&ents[l-1]))
}
require.Equal(t, tt.wents, ents)
require.Equal(t, previousEnts[len(previousEnts)-1].Index+1, raftLog.unstable.offset)
Expand Down Expand Up @@ -652,7 +671,7 @@ func TestStableTo(t *testing.T) {
t.Run(fmt.Sprint(i), func(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.stableTo(tt.stablei, tt.stablet)
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
})
}
Expand Down Expand Up @@ -689,7 +708,7 @@ func TestStableToWithSnap(t *testing.T) {
require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}))
raftLog := newLog(s, raftLogger)
raftLog.append(tt.newEnts...)
raftLog.stableTo(tt.stablei, tt.stablet)
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
})

Expand Down Expand Up @@ -723,7 +742,7 @@ func TestCompaction(t *testing.T) {
storage.Append([]pb.Entry{{Index: i}})
}
raftLog := newLog(storage, raftLogger)
raftLog.maybeCommit(tt.lastIndex, 0)
raftLog.maybeCommit(entryID{term: 0, index: tt.lastIndex}) // TODO(pav-kv): this is a no-op

raftLog.appliedTo(raftLog.committed, 0 /* size */)
for j := 0; j < len(tt.compact); j++ {
Expand Down
Loading

0 comments on commit 23c936a

Please sign in to comment.