rawnode: expose per-follower MsgApp message stream #161

Closed · wants to merge 8 commits
doc.go (8 changes: 4 additions & 4 deletions)
@@ -315,7 +315,7 @@ stale log entries:
rafthttp package.

'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
- which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
+ which calls maybeSendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
type. When 'MsgApp' is passed to candidate's Step method, candidate reverts
back to follower, because it indicates that there is a valid leader sending
'MsgApp' messages. Candidate and follower respond to this message in
@@ -353,8 +353,8 @@ stale log entries:

'MsgSnap' requests to install a snapshot message. When a node has just
become a leader or the leader receives 'MsgProp' message, it calls
- 'bcastAppend' method, which then calls 'sendAppend' method to each
- follower. In 'sendAppend', if a leader fails to get term or entries,
+ 'bcastAppend' method, which then calls 'maybeSendAppend' method to each
+ follower. In 'maybeSendAppend', if a leader fails to get term or entries,
the leader requests snapshot by sending 'MsgSnap' type message.

'MsgSnapStatus' tells the result of snapshot install message. When a
@@ -376,7 +376,7 @@ stale log entries:
'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
is passed to leader's Step method, the leader knows which follower
responded. And only when the leader's last committed index is greater than
- follower's Match index, the leader runs 'sendAppend` method.
+ follower's Match index, the leader runs 'maybeSendAppend` method.

'MsgUnreachable' tells that request(message) wasn't delivered. When
'MsgUnreachable' is passed to leader's Step method, the leader discovers
raft.go (206 changes: 113 additions & 93 deletions)
@@ -218,6 +218,21 @@
// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
// to 2.5 MB/s. See Little's law to understand the maths behind.
MaxInflightBytes uint64
+	// DisableEagerAppends makes raft hold off constructing log append messages in
+	// response to Step() calls. The messages can be collected via a separate
+	// MessagesTo method.

Reviewer:

I'm not very clear on usage, specifically compared to how we currently call RawNode.Ready() in CockroachDB, and then send the outbound messages in Ready.Messages.

If this is set to true, what will Ready.Messages contain? Is it only MsgStorageAppend and MsgStorageApply messages to the local store?

How will the user know to call MessagesTo -- presumably the user has to know that there is something available to send to a replica? Is the idea that the user will retrieve tracker.Progress and somehow know that there are entries available >= Progress.Next, and then decide to call MessagesTo? If it gets back (Progress.Next, j] and sends them, will it then call Progress.SentEntries(j) that will update Progress.Next to j+1? Isn't Progress just a struct, so how will this Progress state change go back into the RawNode?

Does etcd/raft assume that whatever has been retrieved in MessagesTo is the responsibility of the caller to reliably deliver, or does it itself retry? That is, who is responsible for ensuring that (Progress.Match,Progress.Next) is delivered? I am also murky on whether everything <= Progress.Match has indeed been delivered, because of the following comment in the CockroachDB code that says:

		// In state ProgressStateReplicate, the Match index is optimistically
		// updated whenever a message is *sent* (not received). Due to Raft
		// flow control, only a reasonably small amount of data can be en
		// route to a given follower at any point in time.

@pav-kv (Contributor, author) commented on Feb 8, 2024:

Ready() currently returns all messages: this includes appends/snapshots to followers, appends / applies to local storage, heartbeats, etc.

With DisableEagerAppends it would still return all these messages, except MsgApp and snapshots. These would go through the MessagesTo interface. In CRDB, we would still need to call and act on Ready. But we would also call MessagesTo alongside Ready (or separately, on a different schedule, since it doesn't need to be paired with Ready calls).

We would not use or update Progress structs; raft internally does so when it emits messages.

> How will the user know to call MessagesTo -- presumably the user has to know that there is something available to send to a replica?

We also don't necessarily need to sneak a peek at Progress. The MessagesTo calls will return appends if they need to happen, and won't return anything if Progress indicates that the follower is up-to-date or throttled.

> Does etcd/raft assume that whatever has been retrieved in MessagesTo is the responsibility of the caller to reliably deliver, or does it itself retry?

The expectation is that these messages are going to be sent. There is no reliability expectation; raft assumes that all messages can be dropped. Raft will retry things periodically, and if it detects that the append message stream was broken, it will resend the portion of the log that was not acked.

Entries returned from MessagesTo will now be considered in-flight by raft.

> because of the following comment in the CockroachDB code that says

The comment is inaccurate. It is the Next index that's optimistically updated when a message is sent. The Match index is a guaranteed durable match (because the follower replied with a MsgAppResp acking log indices up to Match).
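
To make the proposed flow concrete, here is a rough sketch of how an application loop could consume this API. It is only an illustration of the discussion above: Transport and followerIDs are placeholder names, and the RawNode-level MessagesTo call with its FlowControl argument is the draft API from this PR, which may still change.

	// Sketch: drain Ready as before, then pull the per-follower append streams.
	import (
		raft "go.etcd.io/raft/v3"
		pb "go.etcd.io/raft/v3/raftpb"
	)

	// Transport stands in for the application's message delivery layer.
	type Transport interface {
		Send(msgs []pb.Message) // best-effort; raft tolerates dropped messages
	}

	func processReady(n *raft.RawNode, tr Transport, followerIDs []uint64) {
		if n.HasReady() {
			rd := n.Ready()
			// Persist rd.HardState/rd.Entries and apply rd.CommittedEntries as
			// usual. With DisableEagerAppends set, rd.Messages still carries
			// heartbeats, votes, etc., but MsgApp/MsgSnap are no longer here.
			tr.Send(rd.Messages)
			n.Advance(rd)
		}
		// Collect the append streams; this can also run on its own schedule.
		for _, id := range followerIDs {
			if msgs := n.MessagesTo(id, raft.FlowControl{}, nil); len(msgs) > 0 {
				tr.Send(msgs) // raft now counts these entries as in-flight
			}
		}
	}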

@pav-kv (Contributor, author):

One thing we need to know to control this is the IDs of the followers, because MessagesTo takes the node ID of the recipient.

One way to know the IDs is to iterate the Progress map, or to look at the current config. We need to be careful with config changes here, though; I haven't considered the implications yet.

Reviewer:

> There is no reliability expectation; raft assumes that all messages can be dropped. Raft will retry things periodically, and if it detects that the append message stream was broken, it will resend the portion of the log that was not acked.

So etcd/raft takes on the responsibility of resending (Match, Next)? With this inversion of control via the MessagesTo interface, will that retry also happen via this interface?

So something like the following where all messages are size 1:

  • Match=5
  • MessagesTo(MaxMsgAppBytes: 3) returns 6, 7, 8. Next=9
  • MessagesTo(MaxMsgAppBytes: 2) returns 9, 10. Next=11
  • Match=6
  • Raft decides to retry 7 onwards.
  • MessagesTo(MaxMsgAppBytes: 1) returns 7

@pav-kv (Contributor, author) commented on Feb 8, 2024:

Yes, the retries happen like you described, transparently to the application-level caller. The application just needs to send the messages.

Best results are achieved if these messages are delivered in order though (that's also the case today), because each "optimistic" MsgApp builds on top of the previous ones. But if any message is dropped and the flow is "interrupted", raft will learn this (via MsgAppResp responses), roll back Next, and retry.
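
For example (an illustration of the mechanism described above): if entries 6..10 were sent optimistically and the MsgApp carrying entry 8 is dropped, the follower's log still ends at 7, so it rejects the appends that build on 8; the leader then rolls Next back and resends from entry 8.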

Contributor:

> How will the user know to call MessagesTo

I'm also curious about this. Will this be hooked into RawNode.HasReady? Will we introduce a new HasMessagesTo(FlowControl) API? What will indicate to CockroachDB that a replica needs to be scheduled on the raft scheduler?

@pav-kv (Contributor, author):

For posterity: the intended flow here is that the upper layer (e.g. CRDB) tracks all follower flows in StateReplicate which are "ready" to send entries. The condition for this is Next <= raftLog.lastIndex() && !Inflights.Full(). The tracking is "perfect", in that raft will expose all events that help track this info and keep it consistent with its own Progress (e.g. going in/out of StateLeader and StateReplicate, config changes that remove a follower, etc).

Then, on Ready processing, the caller goes through all "ready" flows and calls this method to send messages. The "perfect" tracking helps avoid wasted calls.
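
A minimal sketch of that readiness predicate, assuming the upper layer can read the relevant tracker.Progress fields (how CRDB would actually maintain this bookkeeping is out of scope here):

	// readyToSend reports whether a follower flow in StateReplicate can accept
	// more appends, per the condition quoted above.
	func readyToSend(pr *tracker.Progress, lastIndex uint64) bool {
		return pr.State == tracker.StateReplicate &&
			pr.Next <= lastIndex &&
			!pr.Inflights.Full()
	}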

+	//
+	// This way, the application has better control over when raft may call
+	// Storage methods and allocate memory for entries and messages.
+	//
+	// Setting this to true also improves batching: messages are constructed on
+	// demand, and tend to contain more entries. The application can control the
+	// latency/throughput trade-off by collecting messages more or less
+	// frequently.
+	//
+	// With this setting set to false, messages are constructed eagerly in Step()
+	// calls, and typically consist of a single entry or a few entries.
+	DisableEagerAppends bool

// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
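
For illustration, enabling the proposed mode would look roughly like the sketch below; every field other than the new DisableEagerAppends is a standard etcd/raft Config option:

	cfg := &raft.Config{
		ID:                  1,
		ElectionTick:        10,
		HeartbeatTick:       1,
		Storage:             raft.NewMemoryStorage(),
		MaxSizePerMsg:       1 << 20,
		MaxInflightMsgs:     256,
		DisableEagerAppends: true, // construct MsgApp lazily, via MessagesTo
	}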
@@ -335,6 +350,12 @@
return nil
}

+type msgBuf []pb.Message
+
+func (mb *msgBuf) append(m pb.Message) {
+	*(*[]pb.Message)(mb) = append(*(*[]pb.Message)(mb), m)
+}

type raft struct {
id uint64

@@ -360,7 +381,7 @@
// other nodes.
//
// Messages in this list must target other nodes.
-	msgs []pb.Message
+	msgs msgBuf
// msgsAfterAppend contains the list of messages that should be sent after
// the accumulated unstable state (e.g. term, vote, []entry, and snapshot)
// has been persisted to durable storage. This includes waiting for any
@@ -372,6 +393,10 @@
// Messages in this list have the type MsgAppResp, MsgVoteResp, or
// MsgPreVoteResp. See the comment in raft.send for details.
msgsAfterAppend []pb.Message
+	// disableEagerAppends delays append message construction and sending until
+	// the Ready() call. This improves batching and allows better resource
Contributor:

> until the Ready() call

Until the MessagesTo() call?

+	// allocation control by the application.
+	disableEagerAppends bool

// the leader id
lead uint64
@@ -447,6 +472,7 @@
maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize),
trk: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
+		disableEagerAppends: c.DisableEagerAppends,
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
@@ -502,6 +528,11 @@
// send schedules persisting state to a stable storage and AFTER that
// sending the message (as part of next Ready message processing).
func (r *raft) send(m pb.Message) {
+	r.sendTo(&r.msgs, m)
+}
+
+// sendTo prepares the given message, and puts it into the output messages buffer.
+func (r *raft) sendTo(buf *msgBuf, m pb.Message) {
if m.From == None {
m.From = r.id
}
@@ -584,24 +615,51 @@
if m.To == r.id {
r.logger.Panicf("message should not be self-addressed when sending %s", m.Type)
}
-		r.msgs = append(r.msgs, m)
+		buf.append(m)
}
}

-// sendAppend sends an append RPC with new entries (if any) and the
-// current commit index to the given peer.
-func (r *raft) sendAppend(to uint64) {
-	r.maybeSendAppend(to, true)
+func (r *raft) getMessages(to uint64, fc FlowControl, buffer []pb.Message) []pb.Message {

[GitHub Actions / run annotation, raft.go line 622: unused-parameter: parameter 'fc' seems to be unused, consider removing or renaming it as _ (revive)]
+	if to == r.id {
+		// TODO(pav-kv): async log storage writes should go through this path.
+		return buffer
+	}
+	pr := r.trk.Progress[to]
+	buf := msgBuf(buffer)
+	for r.maybeSendAppendBuf(to, pr, &buf) {
+	}
+	return buf
+}

-// maybeSendAppend sends an append RPC with new entries to the given peer,
-// if necessary. Returns true if a message was sent. The sendIfEmpty
-// argument controls whether messages with no entries will be sent
-// ("empty" messages are useful to convey updated Commit indexes, but
-// are undesirable when we're sending multiple messages in a batch).
-func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
-	pr := r.trk.Progress[to]
-	if pr.IsPaused() {
+// maybeSendAppend sends an append RPC with log entries (if any) that are not
+// yet known to be replicated in the given peer's log, as well as the current
+// commit index. Usually it sends a MsgApp message, but in some cases (e.g. the
+// log has been compacted) it can send a MsgSnap.
+//
+// In some cases, the MsgApp message can have zero entries, and yet still be
+// sent. When the follower log is not fully up-to-date, we must send a MsgApp
+// periodically so that eventually the flow is either accepted or rejected. Not
+// doing so can result in a replication stall when a MsgApp is dropped.
+//
+// Returns true if a message was sent, or false otherwise. A message is not sent
+// if the follower log and commit index are up-to-date, the flow is paused (for
+// reasons like in-flight limits), or the message could not be constructed.
+func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
+	if r.disableEagerAppends {
+		return false
+	}
+	return r.maybeSendAppendBuf(to, pr, &r.msgs)
+}

+func (r *raft) appendsReady(pr *tracker.Progress) bool {
+	return pr.ShouldSendMsgApp(r.raftLog.lastIndex(), r.raftLog.committed)
+}

+// maybeSendAppendBuf implements maybeSendAppend, and puts the messages into the
+// provided buffer.
+func (r *raft) maybeSendAppendBuf(to uint64, pr *tracker.Progress, buf *msgBuf) bool {
+	last, commit := r.raftLog.lastIndex(), r.raftLog.committed
+	if !pr.ShouldSendMsgApp(last, commit) {
return false
}

@@ -610,43 +668,34 @@
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
-		return r.maybeSendSnapshot(to, pr)
+		return r.maybeSendSnapshot(to, pr, buf)
}

-	var ents []pb.Entry
-	// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
-	// Otherwise, if we had a full Inflights and all inflight messages were in
-	// fact dropped, replication to that follower would stall. Instead, an empty
-	// MsgApp will eventually reach the follower (heartbeats responses prompt the
-	// leader to send an append), allowing it to be acked or rejected, both of
-	// which will clear out Inflights.
-	if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
-		ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
-	}
-	if len(ents) == 0 && !sendIfEmpty {
-		return false
-	}
-	// TODO(pav-kv): move this check up to where err is returned.
-	if err != nil { // send a snapshot if we failed to get the entries
-		return r.maybeSendSnapshot(to, pr)
+	var entries []pb.Entry
+	if pr.CanSendEntries(last) {
+		if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
+			// Send a snapshot if we failed to get the entries.
+			return r.maybeSendSnapshot(to, pr, buf)
+		}
+	}

-	// Send the actual MsgApp otherwise, and update the progress accordingly.
-	r.send(pb.Message{
+	// Send the MsgApp, and update the progress accordingly.
+	r.sendTo(buf, pb.Message{
To: to,
Type: pb.MsgApp,
Index: prevIndex,
LogTerm: prevTerm,
-		Entries: ents,
-		Commit:  r.raftLog.committed,
+		Entries: entries,
+		Commit:  commit,
})
-	pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)))
+	pr.SentEntries(len(entries), uint64(payloadsSize(entries)))
+	pr.SentCommit(commit)
return true
}

// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
// node. Returns true iff the snapshot message has been emitted successfully.
-func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
+func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress, buf *msgBuf) bool {
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
@@ -669,37 +718,37 @@
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

-	r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
+	r.sendTo(buf, pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
}

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
+	pr := r.trk.Progress[to]
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
-	commit := min(r.trk.Progress[to].Match, r.raftLog.committed)
-	m := pb.Message{
+	commit := min(pr.Match, r.raftLog.committed)
+	r.send(pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
-	}
-
-	r.send(m)
+	})
+	pr.SentCommit(commit)
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.trk.
func (r *raft) bcastAppend() {
-	r.trk.Visit(func(id uint64, _ *tracker.Progress) {
+	r.trk.Visit(func(id uint64, pr *tracker.Progress) {
if id == r.id {
return
}
-		r.sendAppend(id)
+		r.maybeSendAppend(id, pr)
})
}

@@ -1477,10 +1526,9 @@
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
-			r.sendAppend(m.From)
+			r.maybeSendAppend(m.From, pr)
}
} else {
-		oldPaused := pr.IsPaused()
// We want to update our tracking if the response updates our
// matched index or if the response can move a probing peer back
// into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
@@ -1517,19 +1565,13 @@
// to respond to pending read index requests
releasePendingReadIndexMessages(r)
r.bcastAppend()
-		} else if oldPaused {
-			// If we were paused before, this node may be missing the
-			// latest commit index, so send it.
-			r.sendAppend(m.From)
-		}
-		// We've updated flow control information above, which may
-		// allow us to send multiple (size-limited) in-flight messages
-		// at once (such as when transitioning from probe to
-		// replicate, or when freeTo() covers multiple messages). If
-		// we have more entries to send, send as many messages as we
-		// can (without sending empty messages for the commit index)
+		// We've updated flow control information above, which may allow us to
+		// send multiple (size-limited) in-flight messages at once (such as when
+		// transitioning from probe to replicate, or when freeTo() covers
+		// multiple messages). Send as many messages as we can.
if r.id != m.From {
-			for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
+			for r.maybeSendAppend(m.From, pr) {
}
}
// Transfer leadership is in progress.
@@ -1541,24 +1583,8 @@
}
case pb.MsgHeartbeatResp:
pr.RecentActive = true
-		pr.MsgAppFlowPaused = false
-
-		// NB: if the follower is paused (full Inflights), this will still send an
-		// empty append, allowing it to recover from situations in which all the
-		// messages that filled up Inflights in the first place were dropped. Note
-		// also that the outgoing heartbeat already communicated the commit index.
-		//
-		// If the follower is fully caught up but also in StateProbe (as can happen
-		// if ReportUnreachable was called), we also want to send an append (it will
-		// be empty) to allow the follower to transition back to StateReplicate once
-		// it responds.
-		//
-		// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
-		// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
-		// no-op.
-		if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
-			r.sendAppend(m.From)
-		}
+		pr.PauseMsgAppProbes(false)
+		r.maybeSendAppend(m.From, pr)

if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
@@ -1591,7 +1617,7 @@
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
// out the next MsgApp.
// If snapshot failure, wait for a heartbeat interval before next try
-		pr.MsgAppFlowPaused = true
+		pr.PauseMsgAppProbes(true)
case pb.MsgUnreachable:
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
@@ -1628,7 +1654,8 @@
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
-			r.sendAppend(leadTransferee)
+			pr.PauseMsgAppProbes(false)
+			r.maybeSendAppend(leadTransferee, pr)
}
}
return nil
@@ -1976,21 +2003,14 @@
return cs
}

-	if r.maybeCommit() {
-		// If the configuration change means that more entries are committed now,
-		// broadcast/append to everyone in the updated config.
-		r.bcastAppend()
-	} else {
-		// Otherwise, still probe the newly added replicas; there's no reason to
-		// let them wait out a heartbeat interval (or the next incoming
-		// proposal).
-		r.trk.Visit(func(id uint64, pr *tracker.Progress) {
-			if id == r.id {
-				return
-			}
-			r.maybeSendAppend(id, false /* sendIfEmpty */)
-		})
-	}
+	r.maybeCommit()
+	// If the configuration change means that more entries are committed now,
+	// broadcast/append to everyone in the updated config.
+	//
+	// Otherwise, still probe the newly added replicas; there's no reason to let
+	// them wait out a heartbeat interval (or the next incoming proposal).
+	r.bcastAppend()

// If the leadTransferee was removed or demoted, abort the leadership transfer.
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
r.abortLeaderTransfer()