Skip to content

Commit

Permalink
Send MsgAppend with empty entries when agent cannot forward
Browse files Browse the repository at this point in the history
Signed-off-by: LintianShi <[email protected]>
  • Loading branch information
LintianShi committed Sep 15, 2022
1 parent 965f2cf commit 40a8fcd
Showing 1 changed file with 30 additions and 2 deletions.
32 changes: 30 additions & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use std::cmp;
use std::ops::{Deref, DerefMut};

use crate::eraftpb::{
ConfChange, ConfChangeV2, ConfState, Entry, EntryType, HardState, Message, MessageType,
Snapshot,
ConfChange, ConfChangeV2, ConfState, Entry, EntryType, Forward, HardState, Message,
MessageType, Snapshot,
};
use protobuf::Message as _;
use raft_proto::ConfChangeI;
Expand Down Expand Up @@ -2570,6 +2570,7 @@ impl<T: Storage> Raft<T> {
self.r.send(m_append, &mut self.msgs);
}
Err(_) => {
self.dummy_forward(m, forward);
warn!(
self.logger,
"The agent fails to fetch entries, index {} log term {} in forward message to peer {}.",
Expand All @@ -2580,6 +2581,7 @@ impl<T: Storage> Raft<T> {
}
}
} else {
self.dummy_forward(m, forward);
warn!(
self.logger,
"The agent's log does not match with index {} log term {} in forward message to peer {}.",
Expand All @@ -2590,6 +2592,9 @@ impl<T: Storage> Raft<T> {
}
}
} else {
for forward in m.get_forwards() {
self.dummy_forward(m, forward);
}
info!(
self.logger,
"The agent rejects append [logterm: {msg_log_term}, index: {msg_index}] \
Expand Down Expand Up @@ -2932,6 +2937,29 @@ impl<T: Storage> Raft<T> {
self.lead_transferee = None;
}

// Forward MsgAppend with empty entries in order to update commit
// or trigger decrementing next_idx.
fn dummy_forward(&mut self, m: &Message, forward: &Forward) {
let mut m_append = Message::default();
m_append.to = forward.get_to();
m_append.from = m.get_from();
m_append.set_msg_type(MessageType::MsgAppend);
m_append.index = forward.get_index();
m_append.log_term = forward.get_log_term();
m_append.commit = m.get_commit();
m_append.commit_term = m.get_commit_term();

info!(
self.logger,
"The agent forwards reserved empty log entry [logterm: {msg_log_term}, index: {msg_index}] \
to peer {id}",
msg_log_term = forward.log_term,
msg_index = forward.index,
id = forward.to;
);
self.r.send(m_append, &mut self.msgs);
}

fn send_request_snapshot(&mut self) {
let mut m = Message::default();
m.set_msg_type(MessageType::MsgAppendResponse);
Expand Down

0 comments on commit 40a8fcd

Please sign in to comment.