From 4957ce5fa88741b9b611fba0eefd82bb75ebe81d Mon Sep 17 00:00:00 2001 From: LintianShi Date: Fri, 23 Sep 2022 18:17:21 +0800 Subject: [PATCH] async log entries fetch for forwarding Signed-off-by: LintianShi --- src/raft.rs | 147 ++++++++++++++++++++++++------------------------ src/raw_node.rs | 13 +++++ src/storage.rs | 8 +++ 3 files changed, 95 insertions(+), 73 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index df0e807e..e9c7d946 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -844,6 +844,63 @@ impl RaftCore { true } + fn send_forward( + &mut self, + from: u64, + commit: u64, + commit_term: u64, + forward: &Forward, + msgs: &mut Vec, + ) { + let mut m = Message::default(); + m.to = forward.to; + m.from = from; + m.commit = commit; + m.commit_term = commit_term; + m.set_msg_type(MessageType::MsgAppend); + // Fetch log entries from the forward.index to the last index of log. + if self + .raft_log + .match_term(forward.get_index(), forward.get_log_term()) + { + let ents = self.raft_log.entries( + forward.get_index() + 1, + self.max_msg_size, + GetEntriesContext(GetEntriesFor::SendForward { + from, + commit, + commit_term, + term: self.term, + forward: forward.clone(), + }), + ); + + match ents { + Ok(ents) => { + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + m.set_entries(ents.into()); + self.send(m, msgs); + } + Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => {} + _ => { + // Forward MsgAppend with empty entries in order to update commit + // or trigger decrementing next_idx. + m.index = forward.get_index(); + m.log_term = forward.get_log_term(); + self.send(m, msgs); + warn!( + self.logger, + "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", + forward.get_index(), + forward.get_log_term(), + forward.get_to() + ); + } + } + } + } + // send_heartbeat sends an empty MsgAppend fn send_heartbeat( &mut self, @@ -904,6 +961,12 @@ impl Raft { .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); } + /// Forwards an append RPC from the leader to the given peer. + pub fn send_forward(&mut self, from: u64, commit: u64, commit_term: u64, forward: &Forward) { + self.r + .send_forward(from, commit, commit_term, forward, &mut self.msgs); + } + /// Broadcasts heartbeats to all the followers if it's leader. pub fn ping(&mut self) { if self.state == StateRole::Leader { @@ -2541,59 +2604,20 @@ impl Raft { // If the agent fails to append entries from the leader, // the agent cannot forward MsgAppend. for forward in m.get_forwards() { - // Fetch log entries from the forward.index to the last index of log. - if self - .raft_log - .match_term(forward.get_index(), forward.get_log_term()) - { - let ents = self.raft_log.entries( - forward.get_index() + 1, - self.max_msg_size, - GetEntriesContext(GetEntriesFor::SendAppend { - to: forward.get_to(), - term: m.term, - aggressively: false, - }), - ); - - match ents { - Ok(ents) => { - let mut m_append = Message::default(); - m_append.to = forward.get_to(); - m_append.from = m.get_from(); - m_append.set_msg_type(MessageType::MsgAppend); - m_append.index = forward.get_index(); - m_append.log_term = forward.get_log_term(); - m_append.set_entries(ents.into()); - m_append.commit = m.get_commit(); - m_append.commit_term = m.get_commit_term(); - self.r.send(m_append, &mut self.msgs); - } - Err(_) => { - self.dummy_forward(m, forward); - warn!( - self.logger, - "The agent fails to fetch entries, index {} log term {} in forward message to peer {}.", - forward.get_index(), - forward.get_log_term(), - forward.get_to() - ); - } - } - } else { - self.dummy_forward(m, forward); - warn!( - self.logger, - "The agent's log does not match with index {} log term {} in forward message to peer {}.", - forward.get_index(), - forward.get_log_term(), - forward.get_to() - ); - } + self.r + .send_forward(m.from, m.commit, m.commit_term, forward, &mut self.msgs); } } else { for forward in m.get_forwards() { - self.dummy_forward(m, forward); + let mut m_append = Message::default(); + m_append.to = forward.to; + m_append.from = m.from; + m_append.commit = m.commit; + m_append.commit_term = m.commit_term; + m_append.set_msg_type(MessageType::MsgAppend); + m_append.index = forward.get_index(); + m_append.log_term = forward.get_log_term(); + self.r.send(m_append, &mut self.msgs); } info!( self.logger, @@ -2937,29 +2961,6 @@ impl Raft { self.lead_transferee = None; } - // Forward MsgAppend with empty entries in order to update commit - // or trigger decrementing next_idx. - fn dummy_forward(&mut self, m: &Message, forward: &Forward) { - let mut m_append = Message::default(); - m_append.to = forward.get_to(); - m_append.from = m.get_from(); - m_append.set_msg_type(MessageType::MsgAppend); - m_append.index = forward.get_index(); - m_append.log_term = forward.get_log_term(); - m_append.commit = m.get_commit(); - m_append.commit_term = m.get_commit_term(); - - info!( - self.logger, - "The agent forwards reserved empty log entry [logterm: {msg_log_term}, index: {msg_index}] \ - to peer {id}", - msg_log_term = forward.log_term, - msg_index = forward.index, - id = forward.to; - ); - self.r.send(m_append, &mut self.msgs); - } - fn send_request_snapshot(&mut self) { let mut m = Message::default(); m.set_msg_type(MessageType::MsgAppendResponse); diff --git a/src/raw_node.rs b/src/raw_node.rs index a15a1489..a720bfd0 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -439,6 +439,19 @@ impl RawNode { self.raft.send_append(to) } } + GetEntriesFor::SendForward { + from, + commit, + commit_term, + term, + forward, + } => { + if self.raft.term != term { + // term has changed + return; + } + self.raft.send_forward(from, commit, commit_term, &forward); + } GetEntriesFor::Empty(can_async) if can_async => {} _ => panic!("shouldn't call callback on non-async context"), } diff --git a/src/storage.rs b/src/storage.rs index f9b71fab..3065600e 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -70,6 +70,7 @@ impl GetEntriesContext { pub fn can_async(&self) -> bool { match self.0 { GetEntriesFor::SendAppend { .. } => true, + GetEntriesFor::SendForward { .. } => true, GetEntriesFor::Empty(can_async) => can_async, _ => false, } @@ -87,6 +88,13 @@ pub(crate) enum GetEntriesFor { /// whether to exhaust all the entries aggressively: bool, }, + SendForward { + from: u64, + commit: u64, + commit_term: u64, + term: u64, + forward: Forward, + }, // for getting committed entries in a ready GenReady, // for getting entries to check pending conf when transferring leader