Skip to content

Commit

Permalink
Add lag flag in progress
Browse files Browse the repository at this point in the history
Signed-off-by: LintianShi <[email protected]>
  • Loading branch information
LintianShi committed Sep 2, 2022
1 parent 9f8f31c commit 887c96f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
15 changes: 12 additions & 3 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,11 @@ impl<T: Storage> RaftCore<T> {
);
return false;
}
// Check whether the progress lags behind the leader too much.
if !pr.lag && pr.matched + 200 < self.raft_log.last_index() {
pr.lag = true;
}

let mut m = Message::default();
m.to = to;
if pr.pending_request_snapshot != INVALID_INDEX {
Expand Down Expand Up @@ -1048,8 +1053,10 @@ impl<T: Storage> Raft<T> {
);
}
let self_id = self.id;
let last_idx = self.raft_log.last_index();
let pr = self.mut_prs().get_mut(self_id).unwrap();
if pr.maybe_update(index) && self.maybe_commit() && self.should_bcast_commit() {
if pr.maybe_update(index, last_idx) && self.maybe_commit() && self.should_bcast_commit()
{
self.bcast_append();
}
}
Expand Down Expand Up @@ -1707,6 +1714,7 @@ impl<T: Storage> Raft<T> {
.0;
}

let last_idx = self.raft_log.last_index();
let pr = match self.prs.get_mut(m.from) {
Some(pr) => pr,
None => {
Expand Down Expand Up @@ -1769,7 +1777,7 @@ impl<T: Storage> Raft<T> {
}

let old_paused = pr.is_paused();
if !pr.maybe_update(m.index) {
if !pr.maybe_update(m.index, last_idx) {
return;
}

Expand Down Expand Up @@ -2742,8 +2750,9 @@ impl<T: Storage> Raft<T> {
}

// TODO: this is untested and likely unneeded
let last_idx = self.raft_log.last_index();
let pr = self.prs.get_mut(self.id).unwrap();
pr.maybe_update(pr.next_idx - 1);
pr.maybe_update(pr.next_idx - 1, last_idx);

self.pending_request_snapshot = INVALID_INDEX;

Expand Down
25 changes: 22 additions & 3 deletions src/tracker/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub struct Progress {
/// RecentActive can be reset to false after an election timeout.
pub recent_active: bool,

/// This is true if the progress lags behind the leader too much.
pub lag: bool,

/// Inflights is a sliding window for the inflight messages.
/// When inflights is full, no more message should be sent.
/// When a leader sends out a message, the index of the last
Expand Down Expand Up @@ -66,6 +69,7 @@ impl Progress {
pending_snapshot: 0,
pending_request_snapshot: 0,
recent_active: false,
lag: false,
ins: Inflights::new(ins_size),
commit_group_id: 0,
committed_index: 0,
Expand All @@ -87,6 +91,7 @@ impl Progress {
self.pending_snapshot = 0;
self.pending_request_snapshot = INVALID_INDEX;
self.recent_active = false;
self.lag = false;
self.ins.reset();
}

Expand Down Expand Up @@ -133,10 +138,14 @@ impl Progress {

/// Returns false if the given n index comes from an outdated message.
/// Otherwise it updates the progress and returns true.
pub fn maybe_update(&mut self, n: u64) -> bool {
/// If the matched has catched up with last_index, reset the lag flag.
pub fn maybe_update(&mut self, n: u64, last_index: u64) -> bool {
let need_update = self.matched < n;
if need_update {
self.matched = n;
if self.lag && self.matched + 20 > last_index {
self.lag = false;
}
self.resume();
};

Expand Down Expand Up @@ -287,7 +296,7 @@ mod tests {
p.maybe_decr_to(1, 1, INVALID_INDEX);
assert!(!p.paused, "paused= true, want false");
p.paused = true;
p.maybe_update(2);
p.maybe_update(2, 2);
assert!(!p.paused, "paused= true, want false");
}

Expand Down Expand Up @@ -357,7 +366,7 @@ mod tests {
for (i, &(update, wm, wn, wok)) in tests.iter().enumerate() {
let mut p = Progress::new(prev_n, 256);
p.matched = prev_m;
let ok = p.maybe_update(update);
let ok = p.maybe_update(update, prev_n);
if ok != wok {
panic!("#{}: ok= {}, want {}", i, ok, wok);
}
Expand All @@ -368,6 +377,16 @@ mod tests {
panic!("#{}: next= {}, want {}", i, p.next_idx, wn);
}
}
let mut p = Progress::new(5, 256);
p.matched = prev_m;
p.lag = true;
let ok = p.maybe_update(prev_m + 1, prev_m + 10);
if !ok {
panic!("ok= {}", ok);
}
if p.lag {
panic!("lag= {}", p.lag);
}
}

#[test]
Expand Down

0 comments on commit 887c96f

Please sign in to comment.