Skip to content

Commit

Permalink
Group broadcast log entries in bcast_append when follower replication…
Browse files Browse the repository at this point in the history
… is enabled

Signed-off-by: LintianShi <[email protected]>
  • Loading branch information
LintianShi committed Oct 14, 2022
1 parent 87b2014 commit 9406810
Showing 1 changed file with 122 additions and 6 deletions.
128 changes: 122 additions & 6 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

use std::cmp;
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};

use crate::eraftpb::{
Expand Down Expand Up @@ -997,16 +998,131 @@ impl<T: Storage> Raft<T> {
self.r.send_append_aggressively(to, pr, &mut self.msgs)
}

fn select_agent_for_bcast_group(&self, group: &[usize], msgs: &[Message]) -> Option<usize> {
let mut agent_idx: Option<usize> = None;
for idx in group {
let peer_id = msgs[*idx].to;
let is_voter = self.prs().conf().voters().contains(peer_id);
// Agent must be voter and recently active.
if !is_voter || !self.is_recent_active(peer_id) {
continue;
}
agent_idx = Some(*idx);
}
agent_idx
}

fn merge_append_group(&self, group: &[usize], msgs: &mut [Message], skip: &mut [bool]) {
// Do not need to merge if group size is less than two.
if group.len() < 2 {
return;
}
let agent_idx = self.select_agent_for_bcast_group(group, msgs);
// Return if no appropriate agent
if agent_idx.is_none() {
return;
}

// Record forward information
let mut forwards: Vec<Forward> = Vec::default();
for idx in group {
if *idx == agent_idx.unwrap() {
// MsgAppend sent to the agent is changed to MsgGroupBroadcast.
let msg = &mut msgs[*idx];
msg.set_msg_type(MessageType::MsgGroupBroadcast);
} else {
// MsgAppend sent to other peers in this group only reserve basic
// forward information.
let msg = &msgs[*idx];
let forward = Forward {
to: msg.to,
log_term: msg.log_term,
index: msg.index,
..Default::default()
};
forwards.push(forward);
// Mark and skip this message later.
skip[*idx] = true;
}
}
// Attach forward information to MsgGroupbroadcast
let agent_msg = &mut msgs[agent_idx.unwrap()];
agent_msg.set_forwards(forwards.into());
}

/// Sends RPC, with entries to all peers that are not up-to-date
/// according to the progress recorded in r.prs().
pub fn bcast_append(&mut self) {
let self_id = self.id;
let core = &mut self.r;
let msgs = &mut self.msgs;
self.prs
.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| core.send_append(*id, pr, msgs));
let mut msgs: Vec<Message> = Vec::default();
{
// Messages are stored in a vector temporarily.
// They will be pushed to message queue later.
let core = &mut self.r;
self.prs
.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| core.send_append(*id, pr, &mut msgs));
}

// Use leader replication if follower replication is disabled or
// the broadcast group id of leader is unknown. Broadcast MsgAppend
// as normal.
let leader_group_id = self
.prs()
.get(self_id)
.map_or(0, |pr| pr.broadcast_group_id);
if !self.follower_replication() || leader_group_id != 0 {
self.msgs.append(&mut msgs);
return;
}

// If follower replication is enabled, MsgAppends sent to the same broadcast group
// will be merge into a MsgGroupBroadcast.
//
// Record message that should be discarded after merging.
let mut skip = vec![false; msgs.len()];
// Message group:
// broadcast group id -> {index of messages in msgs}
let mut msg_group: HashMap<u64, Vec<usize>> = HashMap::default();

// Iterate messages generated by leader.
// Filter out messages that should be sent by follower replication,
// and group them by broadcast group id.
for (pos, msg) in msgs.iter().enumerate() {
// Only reserve MsgAppend sent to peers in replicating state
if msg.get_msg_type() != MessageType::MsgAppend || !self.is_replicating(msg.to) {
continue;
}
// Get the broadcast group id of target peer.
let group_id = self.prs().get(msg.to).map_or(0, |pr| pr.broadcast_group_id);
// Do not need merge if broadcast group id is unknown or in the same
// group with leader. Valid broadcast group id should be greater than 0.
if group_id == 0 || group_id != leader_group_id {
continue;
}

// Group messages
if let Some(group) = msg_group.get_mut(&group_id) {
group.push(pos);
} else {
msg_group.insert(group_id, vec![pos]);
}
}

// Merge MsgAppend in broadcast groups.
for (_, group) in msg_group {
self.merge_append_group(&group, &mut msgs, &mut skip);
}

let mut idx: usize = 0;
for msg in msgs {
if !skip[idx] {
continue;
}
self.msgs.push(msg);
idx += 1
}
}

/// Forwards an append RPC from the leader to the given peer.
Expand Down

0 comments on commit 9406810

Please sign in to comment.