diff --git a/src/raft.rs b/src/raft.rs index 2d9ad310..67c03cb1 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -15,6 +15,7 @@ // limitations under the License. use std::cmp; +use std::collections::HashMap; use std::ops::{Deref, DerefMut}; use crate::eraftpb::{ @@ -997,16 +998,131 @@ impl Raft { self.r.send_append_aggressively(to, pr, &mut self.msgs) } + fn select_agent_for_bcast_group(&self, group: &[usize], msgs: &[Message]) -> Option { + let mut agent_idx: Option = None; + for idx in group { + let peer_id = msgs[*idx].to; + let is_voter = self.prs().conf().voters().contains(peer_id); + // Agent must be voter and recently active. + if !is_voter || !self.is_recent_active(peer_id) { + continue; + } + agent_idx = Some(*idx); + } + agent_idx + } + + fn merge_append_group(&self, group: &[usize], msgs: &mut [Message], skip: &mut [bool]) { + // Do not need to merge if group size is less than two. + if group.len() < 2 { + return; + } + let agent_idx = self.select_agent_for_bcast_group(group, msgs); + // Return if no appropriate agent + if agent_idx.is_none() { + return; + } + + // Record forward information + let mut forwards: Vec = Vec::default(); + for idx in group { + if *idx == agent_idx.unwrap() { + // MsgAppend sent to the agent is changed to MsgGroupBroadcast. + let msg = &mut msgs[*idx]; + msg.set_msg_type(MessageType::MsgGroupBroadcast); + } else { + // MsgAppend sent to other peers in this group only reserve basic + // forward information. + let msg = &msgs[*idx]; + let forward = Forward { + to: msg.to, + log_term: msg.log_term, + index: msg.index, + ..Default::default() + }; + forwards.push(forward); + // Mark and skip this message later. + skip[*idx] = true; + } + } + // Attach forward information to MsgGroupbroadcast + let agent_msg = &mut msgs[agent_idx.unwrap()]; + agent_msg.set_forwards(forwards.into()); + } + /// Sends RPC, with entries to all peers that are not up-to-date /// according to the progress recorded in r.prs(). pub fn bcast_append(&mut self) { let self_id = self.id; - let core = &mut self.r; - let msgs = &mut self.msgs; - self.prs - .iter_mut() - .filter(|&(id, _)| *id != self_id) - .for_each(|(id, pr)| core.send_append(*id, pr, msgs)); + let mut msgs: Vec = Vec::default(); + { + // Messages are stored in a vector temporarily. + // They will be pushed to message queue later. + let core = &mut self.r; + self.prs + .iter_mut() + .filter(|&(id, _)| *id != self_id) + .for_each(|(id, pr)| core.send_append(*id, pr, &mut msgs)); + } + + // Use leader replication if follower replication is disabled or + // the broadcast group id of leader is unknown. Broadcast MsgAppend + // as normal. + let leader_group_id = self + .prs() + .get(self_id) + .map_or(0, |pr| pr.broadcast_group_id); + if !self.follower_replication() || leader_group_id != 0 { + self.msgs.append(&mut msgs); + return; + } + + // If follower replication is enabled, MsgAppends sent to the same broadcast group + // will be merge into a MsgGroupBroadcast. + // + // Record message that should be discarded after merging. + let mut skip = vec![false; msgs.len()]; + // Message group: + // broadcast group id -> {index of messages in msgs} + let mut msg_group: HashMap> = HashMap::default(); + + // Iterate messages generated by leader. + // Filter out messages that should be sent by follower replication, + // and group them by broadcast group id. + for (pos, msg) in msgs.iter().enumerate() { + // Only reserve MsgAppend sent to peers in replicating state + if msg.get_msg_type() != MessageType::MsgAppend || !self.is_replicating(msg.to) { + continue; + } + // Get the broadcast group id of target peer. + let group_id = self.prs().get(msg.to).map_or(0, |pr| pr.broadcast_group_id); + // Do not need merge if broadcast group id is unknown or in the same + // group with leader. Valid broadcast group id should be greater than 0. + if group_id == 0 || group_id != leader_group_id { + continue; + } + + // Group messages + if let Some(group) = msg_group.get_mut(&group_id) { + group.push(pos); + } else { + msg_group.insert(group_id, vec![pos]); + } + } + + // Merge MsgAppend in broadcast groups. + for (_, group) in msg_group { + self.merge_append_group(&group, &mut msgs, &mut skip); + } + + let mut idx: usize = 0; + for msg in msgs { + if !skip[idx] { + continue; + } + self.msgs.push(msg); + idx += 1 + } } /// Forwards an append RPC from the leader to the given peer.