
Add new message type MsgGroupBroadcast and corresponding handler #485

Open · wants to merge 21 commits into base: master
Conversation

LintianShi

Signed-off-by: LintianShi [email protected]

Modify raft-rs to support follower replication in TiKV.
Main changes (a rough sketch of the new message shape follows this list):

  • Add a follower replication option to RawNode and Config.
  • Add a new message type, MsgGroupBroadcast, and an extra field forwards in Message, which carries the information that needs to be forwarded.
  • Add a handler for MsgGroupBroadcast, which appends entries to the local log and forwards MsgAppend to other peers.
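For illustration, a leader-side sketch of how such a message could be assembled is below. Only the forwards field, the Forward message, and set_forwards appear in the diff excerpts later in this review; the to and log_term fields of Forward and the helper itself are assumptions, not the merged API.

// Hedged sketch: fold the per-peer MsgAppend messages of one broadcast group
// into a single MsgGroupBroadcast carried by an agent peer.
fn to_group_broadcast(agent_msg: &mut Message, others: &[Message]) {
    let forwards: Vec<Forward> = others
        .iter()
        .map(|m| Forward {
            to: m.to,             // peer the agent should forward MsgAppend to
            log_term: m.log_term, // term of the entry preceding the payload
            index: m.index,       // index of that preceding entry
            ..Default::default()
        })
        .collect();
    agent_msg.set_msg_type(MessageType::MsgGroupBroadcast);
    agent_msg.set_forwards(forwards.into());
}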

@LintianShi
Author

@Connor1996 PTAL

@LintianShi LintianShi force-pushed the groupbroadcast branch 9 times, most recently from 2084791 to ef1ec68 on September 3, 2022 04:35
@LintianShi LintianShi force-pushed the groupbroadcast branch 3 times, most recently from d4a8512 to 806b77a on September 15, 2022 06:33
Member

@BusyJay BusyJay left a comment

When will MsgGroupBroadcast be set?

src/raft.rs Outdated (resolved)
src/raft.rs Outdated (resolved)
src/raft.rs Outdated
m_append.commit_term = m.get_commit_term();
self.r.send(m_append, &mut self.msgs);
}
Err(_) => {
Member

Consider the temporarily-unavailable case.

Author

Is the processing like the one in fn send_append_aggressively, looping on the fetch until it succeeds?

Member

nope, call it again in on_entries_fetched

Author

Async fetch for forwarding is committed. PTAL.
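For reference, the retry pattern settled on here looks roughly like the sketch below. fetch_forward_entries and send_forwarded_append are hypothetical helpers standing in for the actual log fetch and send; the error variant is the one raft-rs uses for asynchronous storage fetches.

// Hedged sketch: a temporarily-unavailable log is deferred, not looped on.
fn try_forward(&mut self, ctx: GetEntriesContext) {
    match self.fetch_forward_entries(ctx) {
        Ok(ents) => self.send_forwarded_append(ents),
        Err(Error::Store(StorageError::LogTemporarilyUnavailable)) => {
            // The storage is fetching asynchronously; handle_group_broadcast is
            // driven again from on_entries_fetched once the fetch completes.
        }
        Err(_) => {
            // Other errors: give up on forwarding and let the leader retry.
        }
    }
}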

src/raft.rs Outdated (resolved)
src/raft.rs Outdated
/// This enables data replication from a follower to other servers in the same availability zone.
///
/// Enable this to reduce cross-AZ traffic in cloud deployments.
pub follower_repl: bool,
Member

I see nowhere that uses it.

Author

It is used in raftstore.
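For context, enabling the option would look roughly like this on the application side (a sketch; only follower_repl comes from this PR, and storage and logger are whatever the application already uses):

// Hedged sketch of enabling follower replication.
let mut cfg = Config::new(1);
cfg.follower_repl = true; // let followers in the same AZ relay MsgAppend
cfg.validate().unwrap();
let node = RawNode::new(&cfg, storage, &logger).unwrap();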

src/raft.rs Outdated
}

// For a broadcast, append entries to own log and forward MsgAppend to other destinations.
fn handle_group_broadcast(&mut self, m: &Message) {
Member

When is the broadcast message sent?

Author

In raftstore, several MsgAppend will be merged into a MsgGroupBroadcast in fn build_raft_messages
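A rough sketch of the agent-side flow being asked about, distilled from the PR description and the diff excerpt above; the body is an assumption rather than the actual patch, and entry fetching for the forwarded appends is elided:

// Hedged sketch of handle_group_broadcast: handle the embedded MsgAppend for
// the agent itself, then fabricate one MsgAppend per Forward entry.
fn handle_group_broadcast(&mut self, m: &Message) {
    // 1. Append the carried entries to the agent's own log, as for MsgAppend.
    self.handle_append_entries(m);
    // 2. Forward a MsgAppend to every peer listed in `forwards`.
    for f in m.get_forwards() {
        let mut m_append = Message::default();
        m_append.set_msg_type(MessageType::MsgAppend);
        m_append.to = f.to;
        m_append.from = m.from; // the entries still originate from the leader
        m_append.term = m.term;
        m_append.log_term = f.log_term;
        m_append.index = f.index;
        m_append.commit = m.commit;
        m_append.commit_term = m.get_commit_term();
        // Entries after f.index are filled in from the agent's local log
        // (possibly via async fetch) before sending.
        self.r.send(m_append, &mut self.msgs);
    }
}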

@BusyJay
Member

BusyJay commented Sep 27, 2022

Test case is missing.

src/config.rs Outdated (resolved)
src/storage.rs Outdated (resolved)
src/tracker/progress.rs Outdated (resolved)
src/raft.rs Outdated (resolved)
@@ -70,6 +70,7 @@ impl GetEntriesContext {
pub fn can_async(&self) -> bool {
match self.0 {
GetEntriesFor::SendAppend { .. } => true,
GetEntriesFor::SendForward { .. } => true,


I think it can be false to avoid the async fetch's potential latency in follower replication.
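In code, that suggestion amounts to roughly this arm in can_async (a sketch):

GetEntriesFor::SendAppend { .. } => true,
// Forwarding sits on the latency-sensitive broadcast path, so prefer a
// synchronous fetch:
GetEntriesFor::SendForward { .. } => false,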

src/raft.rs Outdated
}

// Group messages
if let Some(group) = msg_group.get_mut(&group_id) {
Member

msg_group.entry().or_default().and_modify()
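That is, something along these lines (a sketch of the entry-API form being suggested):

// Insert an empty Vec for a new group id and push in either case,
// avoiding the separate get_mut/insert branches.
msg_group.entry(group_id).or_default().push(msg);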

src/raft.rs Outdated
self.prs
.iter_mut()
.filter(|&(id, _)| *id != self_id)
.for_each(|(id, pr)| core.send_append(*id, pr, &mut msgs));
Member

we can form msg group from here directly

src/raft.rs Outdated

let mut idx: usize = 0;
for msg in msgs {
if !skip[idx] {
Member

How about grouping the messages instead of the positions? And merge_append_group could modify or filter the messages directly instead of passing skip.

Author

In the new commit, I have modified it. PTAL.

@@ -517,6 +554,31 @@ impl<T: Storage> Raft<T> {
self.prs().group_commit()
}

/// Checks whether the raft group is using group commit and consistent
Member

Why move it around?

Author

That was an accidental change. I will fix it.

src/raft.rs Outdated
/// Sends RPC, with entries to all peers that are not up-to-date
/// according to the progress recorded in r.prs().
pub fn bcast_append(&mut self) {
let self_id = self.id;
let leader_group_id = self
Member

Can this be queried only when follower replication is enabled?

Author

There are two conditions under which we should use leader replication: one is that follower replication is disabled; the other is that follower replication is enabled but the leader's group id is unknown. So I want to reuse the code that iterates the progresses and sends appends.

Member

@BusyJay BusyJay Oct 20, 2022

let mut use_leader_replication = self.is_leader();
let mut leader_group_id = 0;
if !use_leader_replication {
    leader_group_id = query;
    use_leader_replication = leader_group_id == 0;
}
if use_leader_replication {
    return;
}

src/raft.rs Outdated
// Messages that need to be forwarded are stored in a hashmap temporarily,
// and they are grouped by the broadcast_group_id of their progress.
// Messages in msg_group will be pushed to the message queue later.
let mut msg_group: HashMap<u64, Vec<Message>> = HashMap::default();
Member

Better cache msg_group to avoid allocation.

src/raft.rs Outdated

// Merge messages in the same broadcast group and send them.
for (group_id, mut group) in msg_group.drain() {
// Double check: do not need to forward messages in leader's broadcast group.
Member

It's meaningless.

Author

I will remove it.

src/raft.rs Outdated
}
// Attach forward information to MsgGroupBroadcast and send it.
group[mark].set_forwards(forwards.into());
for msg in group {
Member

You mean self.msgs.push(group[mark])?

src/raft.rs Outdated
// Find an appropriate agent.
// If one is found, mark the corresponding message's type as
// MsgGroupBroadcast and return true. Otherwise, return false.
fn select_agent_for_bcast_group(&self, msgs: &mut [Message]) -> bool {
Member

Why not return the index of the message? Changing the type in this function is quite confusing.

Author

I think it is a better choice.
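That is, roughly the following shape, leaving the type change to the caller (a sketch; the voter and recently-active checks are the ones shown in the diff below):

// Hedged sketch: return the position of the first message whose target can act
// as the agent, instead of mutating the message type inside the selector.
fn select_agent_for_bcast_group(&self, msgs: &[Message]) -> Option<usize> {
    msgs.iter().position(|msg| {
        let peer_id = msg.to;
        self.prs().conf().voters().contains(peer_id) && self.is_recent_active(peer_id)
    })
}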

src/raft.rs Outdated
let peer_id = msg.to;
let is_voter = self.prs().conf().voters().contains(peer_id);
// The agent must be a voter and recently active.
if !is_voter || !self.is_recent_active(peer_id) {
Member

This information can be obtained when the message is generated.

Author

The voter and recent_active information is only used for agent selection. When the message is generated, we only determine whether it should be replicated by a follower.
If we want to obtain this information when the message is generated, another cache is needed to record it.

Member

No, what you need to do is merge bcast_append, merge_msg_group and select_agent_for_bcast_group.

Author

merge_msg_group and select_agent_for_bcast_group have been merged into bcast_append. PTAL

LintianShi added 3 commits October 20, 2022 15:26
src/raft.rs Outdated
// No need to merge if the group size is less than two, or if there is no appropriate agent.
if !need_merge {
msgs.append(&mut group.into_iter().map(|(msg, _)| msg).collect());
return;
Member

continue

Author

Thanks very much. I think this caused the jitter in the number of sent messages.

Member

you should improve the test to cover this

let mut agent_msg = group.swap_remove(agent_msg_idx.unwrap()).0;
agent_msg.set_msg_type(MessageType::MsgGroupBroadcast);
agent_msg.set_forwards(forwards.into());
msgs.push(agent_msg);
Member

no need to send other msgs for agent?

Author

Other messages have been pushed to self.msgs when generated.

Member

Oh, I thought there would be multiple MsgAppend messages for one peer.

index: msg.index,
..Default::default()
};
forwards.push(forward);
Member

What if too much forward info makes one message too large?

Author

Considering that there are often 3 or 5 replicas deployed across 2 or 3 AZs, I think one message won't carry too much forward information.

@@ -888,12 +1002,106 @@ impl<T: Storage> Raft<T> {
/// according to the progress recorded in r.prs().
pub fn bcast_append(&mut self) {
Member

Follower replication only covers bcast_append, but misses the places that call send_append and send_append_aggressively directly.

Author

When a cluster replicates data via bcast_append, the cluster is in a stable state with high probability.
If send_append is called, it is usually due to a rejected MsgAppendResp, which means that the follower's raft log conflicts with the leader's. In this situation, we'd better use leader replication to fix the follower's raft log as quickly as possible.

Author

In previous tests, we found that follower replication reduces a lot of cross-AZ traffic. So I think we should sacrifice some coverage of follower replication to keep the performance regression small. Even though we only use follower replication on the main path of log replication, it still removes considerable cross-AZ traffic.
Maybe we can expand follower replication in the future. For example, we could fetch snapshots from followers.

src/raft.rs Outdated
@@ -260,6 +266,9 @@ pub struct RaftCore<T: Storage> {

/// Max size per committed entries in a `Read`.
pub(crate) max_committed_size_per_ready: u64,

// Message group cache for follower replication.
msg_group: HashMap<u64, Vec<(Message, bool)>>,
Member

Vec is more efficient than HashMap.

Author

Since the number of groups is usually no more than 3, finding an element in a Vec is more efficient than in a HashMap. Am I right?

Member

Not just query, but also update and memory footprint.

Author

Done. Use Vec<(u64, Vec<(Message, bool)>)> instead of HashMap<u64, Vec<(Message, bool)>>.
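A sketch of what the Vec-backed cache looks like in use; push_to_group is a hypothetical helper, and the point is simply that a linear scan over a handful of groups is cheap:

use crate::eraftpb::Message;

// Hedged sketch: look the broadcast group up with a linear scan and append.
// With at most a few groups this is cheaper than hashing and has a smaller
// memory footprint than a HashMap.
fn push_to_group(
    msg_group: &mut Vec<(u64, Vec<(Message, bool)>)>,
    group_id: u64,
    msg: Message,
    is_voter: bool,
) {
    match msg_group.iter().position(|(gid, _)| *gid == group_id) {
        Some(i) => msg_group[i].1.push((msg, is_voter)),
        None => msg_group.push((group_id, vec![(msg, is_voter)])),
    }
}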


/// The current configuration state of the cluster.
#[get = "pub"]
conf: Configuration,
pub(crate) conf: Configuration,
Member

Doesn't the above line state that it's publicly accessible already?

Author

Using conf(), there would be a conflict between the mutable borrow of self.prs.progress and the immutable borrow of self.prs.conf.
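The conflict in question, reduced to a minimal illustration with toy types (not the raft-rs ones):

struct Prs {
    conf: u32,
    progress: Vec<u32>,
}

impl Prs {
    fn conf(&self) -> &u32 {
        &self.conf
    }
}

fn demo(prs: &mut Prs) {
    for p in prs.progress.iter_mut() {
        let c = &prs.conf; // OK: disjoint field borrow
        // let c = prs.conf(); // ERROR: the getter borrows all of `prs` while
        //                     // `prs.progress` is still mutably borrowed.
        *p += *c;
    }
}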

src/raft.rs Outdated
});

// Merge messages in the same broadcast group and send them.
for (_, mut group) in core.msg_group.drain() {
Member

Can merge be done in L1056?

Author

I think it might be impossible, because the broadcast group is only fixed after all progresses have been iterated. The merge has to start after the broadcast group is fixed.
If we grouped the progresses by broadcast_group_id first and then iterated them, it would be feasible.

let mut tmp_msgs = Vec::default();
// Push the messages into tmp_msgs first.
core.send_append(*id, pr, &mut tmp_msgs);
for msg in tmp_msgs {
Member

How about

if pr.broadcast_group_id == leader_group_id || !pr.is_replicating() {
    msgs.extend(tmp_msgs);
    return;
}
let is_voter = is_voter(pr);
for msg in tmp_msgs {
    if msg.get_msg_type() != MsgAppend {
        // send it as-is
    } else {
        // collect it into the broadcast group for merging
    }
}
