From 887c96f9e222851c9db85ceb13c7aee35952a5ad Mon Sep 17 00:00:00 2001 From: LintianShi Date: Fri, 2 Sep 2022 13:18:37 +0800 Subject: [PATCH] Add lag flag in progress Signed-off-by: LintianShi --- src/raft.rs | 15 ++++++++++++--- src/tracker/progress.rs | 25 ++++++++++++++++++++++--- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 95059354..d11c697b 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -800,6 +800,11 @@ impl RaftCore { ); return false; } + // Check whether the progress lags behind the leader too much. + if !pr.lag && pr.matched + 200 < self.raft_log.last_index() { + pr.lag = true; + } + let mut m = Message::default(); m.to = to; if pr.pending_request_snapshot != INVALID_INDEX { @@ -1048,8 +1053,10 @@ impl Raft { ); } let self_id = self.id; + let last_idx = self.raft_log.last_index(); let pr = self.mut_prs().get_mut(self_id).unwrap(); - if pr.maybe_update(index) && self.maybe_commit() && self.should_bcast_commit() { + if pr.maybe_update(index, last_idx) && self.maybe_commit() && self.should_bcast_commit() + { self.bcast_append(); } } @@ -1707,6 +1714,7 @@ impl Raft { .0; } + let last_idx = self.raft_log.last_index(); let pr = match self.prs.get_mut(m.from) { Some(pr) => pr, None => { @@ -1769,7 +1777,7 @@ impl Raft { } let old_paused = pr.is_paused(); - if !pr.maybe_update(m.index) { + if !pr.maybe_update(m.index, last_idx) { return; } @@ -2742,8 +2750,9 @@ impl Raft { } // TODO: this is untested and likely unneeded + let last_idx = self.raft_log.last_index(); let pr = self.prs.get_mut(self.id).unwrap(); - pr.maybe_update(pr.next_idx - 1); + pr.maybe_update(pr.next_idx - 1, last_idx); self.pending_request_snapshot = INVALID_INDEX; diff --git a/src/tracker/progress.rs b/src/tracker/progress.rs index 2f86f5a7..ddba2974 100644 --- a/src/tracker/progress.rs +++ b/src/tracker/progress.rs @@ -39,6 +39,9 @@ pub struct Progress { /// RecentActive can be reset to false after an election timeout. 
pub recent_active: bool, + /// This is true if the progress lags behind the leader too much. + pub lag: bool, + /// Inflights is a sliding window for the inflight messages. /// When inflights is full, no more message should be sent. /// When a leader sends out a message, the index of the last @@ -66,6 +69,7 @@ impl Progress { pending_snapshot: 0, pending_request_snapshot: 0, recent_active: false, + lag: false, ins: Inflights::new(ins_size), commit_group_id: 0, committed_index: 0, @@ -87,6 +91,7 @@ impl Progress { self.pending_snapshot = 0; self.pending_request_snapshot = INVALID_INDEX; self.recent_active = false; + self.lag = false; self.ins.reset(); } @@ -133,10 +138,14 @@ impl Progress { /// Returns false if the given n index comes from an outdated message. /// Otherwise it updates the progress and returns true. - pub fn maybe_update(&mut self, n: u64) -> bool { + /// If the matched has caught up with last_index, reset the lag flag. + pub fn maybe_update(&mut self, n: u64, last_index: u64) -> bool { let need_update = self.matched < n; if need_update { self.matched = n; + if self.lag && self.matched + 20 > last_index { + self.lag = false; + } self.resume(); }; @@ -287,7 +296,7 @@ mod tests { p.maybe_decr_to(1, 1, INVALID_INDEX); assert!(!p.paused, "paused= true, want false"); p.paused = true; - p.maybe_update(2); + p.maybe_update(2, 2); assert!(!p.paused, "paused= true, want false"); } @@ -357,7 +366,7 @@ mod tests { for (i, &(update, wm, wn, wok)) in tests.iter().enumerate() { let mut p = Progress::new(prev_n, 256); p.matched = prev_m; - let ok = p.maybe_update(update); + let ok = p.maybe_update(update, prev_n); if ok != wok { panic!("#{}: ok= {}, want {}", i, ok, wok); } @@ -368,6 +377,16 @@ mod tests { panic!("#{}: next= {}, want {}", i, p.next_idx, wn); } } + let mut p = Progress::new(5, 256); + p.matched = prev_m; + p.lag = true; + let ok = p.maybe_update(prev_m + 1, prev_m + 10); + if !ok { + panic!("ok= {}", ok); + } + if p.lag { + panic!("lag= {}", 
p.lag); + } } #[test]