Skip to content

Commit

Permalink
log: use logSlice in raftLog.maybeAppend
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
pav-kv committed Feb 5, 2024
1 parent 7cd824d commit bd5b421
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
18 changes: 10 additions & 8 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,25 @@ func (l *raftLog) String() string {

// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(prev entryID, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if !l.matchTerm(prev) {
func (l *raftLog) maybeAppend(a logSlice, committed uint64) (lastnewi uint64, ok bool) {
if !l.matchTerm(a.prev) {
return 0, false
}
// TODO(pav-kv): propagate logSlice down the stack. It will be used all the
// way down in unstable, for safety checks, and for useful bookkeeping.

lastnewi = prev.index + uint64(len(ents))
ci := l.findConflict(ents)
lastnewi = a.prev.index + uint64(len(a.entries))
ci := l.findConflict(a.entries)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := prev.index + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
offset := a.prev.index + 1
if ci-offset > uint64(len(a.entries)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries))
}
l.append(ents[ci-offset:]...)
l.append(a.entries[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
Expand Down
12 changes: 11 additions & 1 deletion log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ func TestLogMaybeAppend(t *testing.T) {
}

for i, tt := range tests {
// TODO(pav-kv): for now, we pick a high enough app.term so that it
// represents a valid append message. The maybeAppend currently ignores it,
// but it must check that the append does not regress the term.
app := logSlice{
term: 100,
prev: tt.prev,
entries: tt.ents,
}
require.NoError(t, app.valid())

raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
raftLog.committed = commit
Expand All @@ -306,7 +316,7 @@ func TestLogMaybeAppend(t *testing.T) {
require.True(t, tt.wpanic)
}
}()
glasti, gappend := raftLog.maybeAppend(tt.prev, tt.committed, tt.ents...)
glasti, gappend := raftLog.maybeAppend(app, tt.committed)
require.Equal(t, tt.wlasti, glasti)
require.Equal(t, tt.wappend, gappend)
require.Equal(t, tt.wcommit, raftLog.committed)
Expand Down
19 changes: 16 additions & 3 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,13 +1744,26 @@ func stepFollower(r *raft, m pb.Message) error {
return nil
}

// logSliceFromMsgApp extracts the appended logSlice from a MsgApp message.
func logSliceFromMsgApp(m *pb.Message) logSlice {
// TODO(pav-kv): consider also validating the logSlice here.
return logSlice{
term: m.Term,
prev: entryID{term: m.LogTerm, index: m.Index},
entries: m.Entries,
}
}

func (r *raft) handleAppendEntries(m pb.Message) {
prev := entryID{term: m.LogTerm, index: m.Index}
if prev.index < r.raftLog.committed {
// TODO(pav-kv): construct logSlice up the stack next to receiving the
// message, and validate it before taking any action (e.g. bumping term).
a := logSliceFromMsgApp(&m)

if a.prev.index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(prev, m.Commit, m.Entries...); ok {
if mlastIndex, ok := r.raftLog.maybeAppend(a, m.Commit); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
return
}
Expand Down

0 comments on commit bd5b421

Please sign in to comment.