Update comments #357

Open · wants to merge 1 commit into base: master
6 changes: 1 addition & 5 deletions src/progress/progress_set.rs
@@ -424,11 +424,7 @@ impl ProgressSet {
pub fn quorum_recently_active(&mut self, perspective_of: u64, quorum_fn: QuorumFn) -> bool {
let mut active = HashSet::default();
for (&id, pr) in self.voters_mut() {
if id == perspective_of {
active.insert(id);
continue;
}
if pr.recent_active {
if id == perspective_of || pr.recent_active {
active.insert(id);
}
}
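For reference, the simplification above folds the `perspective_of` special case into a single condition. A minimal standalone sketch of the folded loop, using a hypothetical `Progress` stand-in rather than the crate's type:

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the crate's Progress; only the field used here.
struct Progress {
    recent_active: bool,
}

// A voter counts as active if it is the node whose perspective we take,
// or if it has been marked recently active.
fn active_voters(voters: &[(u64, Progress)], perspective_of: u64) -> HashSet<u64> {
    let mut active = HashSet::new();
    for (id, pr) in voters {
        if *id == perspective_of || pr.recent_active {
            active.insert(*id);
        }
    }
    active
}
```

This collects exactly the same set as the earlier `if`/`continue` form: inserting `perspective_of` unconditionally and skipping the `recent_active` check is the same as short-circuiting the `||`.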
115 changes: 57 additions & 58 deletions src/raft.rs
@@ -152,8 +152,8 @@ pub struct Raft<T: Storage> {
/// The queue of read-only requests.
pub read_only: ReadOnly,

/// Ticks since it reached last electionTimeout when it is leader or candidate.
/// Number of ticks since it reached last electionTimeout or received a
/// Ticks since it reached last election_timeout when it is leader or candidate.
/// Number of ticks since it reached last election_timeout or received a
/// valid message from current leader when it is a follower.
pub election_elapsed: usize,

@@ -547,12 +547,11 @@ impl<T: Storage> Raft<T> {
fn try_batching(&mut self, to: u64, pr: &mut Progress, ents: &mut Vec<Entry>) -> bool {
// if MsgAppend for the receiver already exists, try_batching
// will append the entries to the existing MsgAppend
let mut is_batched = false;
for msg in &mut self.msgs {
if msg.get_msg_type() == MessageType::MsgAppend && msg.to == to {
if !ents.is_empty() {
if !util::is_continuous_ents(msg, ents) {
return is_batched;
return false;
}
let mut batched_entries: Vec<_> = msg.take_entries().into();
batched_entries.append(ents);
@@ -561,11 +560,10 @@ impl<T: Storage> Raft<T> {
pr.update_state(last_idx);
}
msg.commit = self.raft_log.committed;
is_batched = true;
break;
return true;
}
}
is_batched
false
}

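The batching above only appends entries to a pending `MsgAppend` when `util::is_continuous_ents` says they follow on directly. A hedged sketch of that continuity rule (illustration only, not the crate's helper):

```rust
// The first entry to be batched must directly follow the last entry already
// carried by the pending MsgAppend; otherwise batching is skipped.
fn is_continuous(last_index_in_msg: u64, first_new_index: u64) -> bool {
    first_new_index == last_index_in_msg + 1
}
```

The refactor itself only replaces the `is_batched` flag and `break` with early returns; the batching behaviour is unchanged.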
/// Sends an append RPC with new entries (if any) and the current commit index to the given
@@ -621,7 +619,7 @@ impl<T: Storage> Raft<T> {
true
}

// send_heartbeat sends an empty MsgAppend
// Sends heartbeat to a node.
fn send_heartbeat(&mut self, to: u64, pr: &Progress, ctx: Option<Vec<u8>>) {
// Attach the commit as min(to.matched, self.raft_log.committed).
// When the leader sends out heartbeat message,
@@ -658,7 +656,7 @@ impl<T: Storage> Raft<T> {
}
}

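The comment in `send_heartbeat` above attaches the commit index as min(to.matched, self.raft_log.committed). A small illustrative sketch of that clamping (assumed free function, not the crate's code):

```rust
use std::cmp;

// A heartbeat must not advertise a commit index beyond what the follower is
// known to have replicated (`matched`), or the follower could "commit"
// entries it does not yet have.
fn heartbeat_commit(matched: u64, leader_committed: u64) -> u64 {
    cmp::min(matched, leader_committed)
}
```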
/// Sends RPC, without entries to all the peers.
/// Sends heartbeat RPC to all the peers.
pub fn bcast_heartbeat(&mut self) {
let ctx = self.read_only.last_pending_request_ctx();
self.bcast_heartbeat_with_ctx(ctx)
@@ -818,7 +816,7 @@ impl<T: Storage> Raft<T> {
///
/// # Panics
///
/// Panics if a leader already exists.
/// Panics if this raft is already a leader.
pub fn become_candidate(&mut self) {
assert_ne!(
self.state,
@@ -841,7 +839,7 @@ impl<T: Storage> Raft<T> {
///
/// # Panics
///
/// Panics if a leader already exists.
/// Panics if this raft is already a leader.
pub fn become_pre_candidate(&mut self) {
assert_ne!(
self.state,
@@ -913,11 +911,11 @@ impl<T: Storage> Raft<T> {

/// Campaign to attempt to become a leader.
///
/// If prevote is enabled, this is handled as well.
/// If prevote is enabled, it is handled as well.
pub fn campaign(&mut self, campaign_type: &[u8]) {
let (vote_msg, term) = if campaign_type == CAMPAIGN_PRE_ELECTION {
self.become_pre_candidate();
// Pre-vote RPCs are sent for next term before we've incremented self.term.
// Pre-vote RPCs are sent for next term before we increase self.term.
(MessageType::MsgRequestPreVote, self.term + 1)
} else {
self.become_candidate();
@@ -1052,18 +1050,19 @@ impl<T: Storage> Raft<T> {
// that these messages were simply delayed in the network, but this could
// also mean that this node has advanced its term number during a network
// partition, and it is now unable to either win an election or to rejoin
// the majority on the old term. If checkQuorum is false, this will be
// handled by incrementing term numbers in response to MsgVote with a higher
// term, but if checkQuorum is true we may not advance the term on MsgVote and
// must generate other messages to advance the term. The net result of these
// two features is to minimize the disruption caused by nodes that have been
// removed from the cluster's configuration: a removed node will send MsgVotes
// which will be ignored, but it will not receive MsgApp or MsgHeartbeat, so it
// will not create disruptive term increases, by notifying leader of this node's
// activeness.
// The above comments also true for Pre-Vote
// the majority on the old term. If check_quorum is false, this will be
// handled by incrementing term numbers of other nodes in response to
// RequestVote with a higher term. But if check_quorum is true, other
// nodes may not advance terms on RequestVote and must generate other
// messages to advance them. The net result of check_quorum and pre_vote
// is to minimize the disruption caused by nodes that have been removed
// from the cluster's configuration: a removed node will send RequestVotes
// which will be ignored, but it will not receive MsgAppend or
// MsgHeartbeat, so it will not create disruptive term increases, by
// notifying leader of this node's activeness.
// The above comments also hold for Pre-Vote
//
// When follower gets isolated, it soon starts an election ending
// When a follower gets isolated, it soon starts an election ending
// up with a higher term than leader, although it won't receive enough
// votes to win the election. When it regains connectivity, this response
// with "pb.MsgAppResp" of higher term would force leader to step down.
@@ -1072,9 +1071,14 @@ impl<T: Storage> Raft<T> {
let to_send = new_message(m.from, MessageType::MsgAppendResponse, None);
self.send(to_send);
} else if m.get_msg_type() == MessageType::MsgRequestPreVote {
// Before pre_vote enable, there may be a recieving candidate with higher term,
// but less log. After update to pre_vote, the cluster may deadlock if
// we drop messages with a lower term.
// Unlike normal election, both sender and receiver of MsgReqPreVote{Resp}
// won't update their terms accordingly. This makes it possible that
// nodes with more logs get stuck at a lower term and can never win an election.
// Thus the whole cluster may deadlock.
// To solve this problem, we explicitly reject MsgRequestPreVote, to hint
// the candidate to update its term.
// Notice that in a normal election, voters will bump their terms in response
// to MsgReqPreVote, and solve the deadlock.

Member:
I think the original comment is quite sufficient. The comment in this PR may give the wrong impression that the check is for a situation that rarely happens.

Contributor (author):
Maybe it's just me, but it took me a while to figure out what the deadlock is. :)
Surely the example is misleading and verbose. How about I remove that part?
info!(
self.logger,
"{} [log_term: {}, index: {}, vote: {}] rejected {:?} from {} [log_term: {}, index: {}] at term {}",
@@ -1110,6 +1114,7 @@ impl<T: Storage> Raft<T> {
#[cfg(feature = "failpoints")]
fail_point!("before_step");

// Now m.term == self.term, unless the message is PreVote related.
match m.get_msg_type() {
MessageType::MsgHup => self.hup(false),
MessageType::MsgRequestVote | MessageType::MsgRequestPreVote => {
@@ -1121,15 +1126,14 @@ impl<T: Storage> Raft<T> {
(m.get_msg_type() == MessageType::MsgRequestPreVote && m.term > self.term);
// ...and we believe the candidate is up to date.
if can_vote && self.raft_log.is_up_to_date(m.index, m.log_term) {
// When responding to Msg{Pre,}Vote messages we include the term
// from the message, not the local term. To see why consider the
// case where a single node was previously partitioned away and
// it's local term is now of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
// When responding to MsgReq{Pre,}Vote, we include the term from the
// message, not the local term. This is because this node may be
// partitioned away and its local term would fall behind. Since we
// don't update the local term for pre-votes, including the stale local
// term would make the pre-campaigning candidate on the other end
// ignore the response message (it ignores all out of date messages).
// For regular votes, the message term will always be the same
// as the local term here.
self.log_vote_approve(&m);
let mut to_send =
new_message(m.from, vote_resp_msg_type(m.get_msg_type()), None);
@@ -1159,6 +1163,7 @@ impl<T: Storage> Raft<T> {
Ok(())
}

// Up for election.
fn hup(&mut self, transfer_leader: bool) {
if self.state == StateRole::Leader {
debug!(
@@ -1328,7 +1333,7 @@ impl<T: Storage> Raft<T> {
prs: &mut ProgressSet,
ctx: &mut HandleResponseContext,
) {
// Update the node. Drop the value explicitly since we'll check the qourum after.
// Update the node. Drop the value explicitly since we'll check the quorum after.
{
let pr = prs.get_mut(m.from).unwrap();
pr.recent_active = true;
@@ -1418,8 +1423,8 @@ impl<T: Storage> Raft<T> {
term = self.term,
lead_transferee = lead_transferee;
);
// Transfer leadership should be finished in one electionTimeout
// so reset r.electionElapsed.
// Transfer leadership should be finished in one election_timeout
// so reset r.election_elapsed.
self.election_elapsed = 0;
self.lead_transferee = Some(lead_transferee);
let pr = prs.get_mut(from).unwrap();
@@ -1531,7 +1536,7 @@ impl<T: Storage> Raft<T> {
fatal!(self.logger, "stepped empty MsgProp");
}
if !self.prs().voter_ids().contains(&self.id) {
// If we are not currently a member of the range (i.e. this node
// If we are not currently a voter (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return Err(Error::ProposalDropped);
@@ -1578,7 +1583,7 @@ impl<T: Storage> Raft<T> {
let mut self_set = HashSet::default();
self_set.insert(self.id);
if !self.prs().has_quorum(&self_set, self.quorum_fn) {
// thinking: use an interally defined context instead of the user given context.
// thinking: use an internally defined context instead of the user given context.
// We can express this in terms of the term and index instead of
// a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
@@ -1669,7 +1674,7 @@ impl<T: Storage> Raft<T> {
Ok(())
}

/// Check if it can become leader.
/// Check if this raft can become leader.
fn check_votes(&mut self) -> Option<bool> {
match self.prs().candidacy_status(&self.votes, self.quorum_fn) {
CandidacyStatus::Elected => {
@@ -1893,23 +1898,19 @@ impl<T: Storage> Raft<T> {
self.send_request_snapshot();
return;
}
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgAppendResponse);
to_send.to = m.from;
if m.index < self.raft_log.committed {
debug!(
self.logger,
"got message with lower index than committed.";
);
let mut to_send = Message::default();
to_send.set_msg_type(MessageType::MsgAppendResponse);
to_send.to = m.from;
to_send.index = self.raft_log.committed;
self.send(to_send);
return;
}

let mut to_send = Message::default();
to_send.to = m.from;
to_send.set_msg_type(MessageType::MsgAppendResponse);

if let Some((_, last_idx)) = self
.raft_log
.maybe_append(m.index, m.log_term, m.commit, &m.entries)
@@ -1935,7 +1936,7 @@ impl<T: Storage> Raft<T> {
}

// TODO: revoke pub when there is a better way to test.
/// For a message, commit and send out heartbeat.
/// Handles a MsgHeartbeat message: updates the log commit index and sends out a heartbeat response.
pub fn handle_heartbeat(&mut self, mut m: Message) {
self.raft_log.commit_to(m.commit);
if self.pending_request_snapshot != INVALID_INDEX {
@@ -2193,7 +2194,7 @@ impl<T: Storage> Raft<T> {
}

// TODO: revoke pub when there is a better way to test.
/// For a given hardstate, load the state into self.
/// Loads a given hardstate into self.
pub fn load_state(&mut self, hs: &HardState) {
if hs.commit < self.raft_log.committed || hs.commit > self.raft_log.last_index() {
fatal!(
@@ -2209,8 +2210,7 @@ impl<T: Storage> Raft<T> {
self.vote = hs.vote;
}

/// `pass_election_timeout` returns true iff `election_elapsed` is greater
/// than or equal to the randomized election timeout in
/// Returns whether `election_elapsed` has passed the randomized election timeout in
/// [`election_timeout`, 2 * `election_timeout` - 1].
pub fn pass_election_timeout(&self) -> bool {
self.election_elapsed >= self.randomized_election_timeout
@@ -2231,18 +2231,17 @@ impl<T: Storage> Raft<T> {
self.randomized_election_timeout = timeout;
}

// check_quorum_active returns true if the quorum is active from
// the view of the local raft state machine. Otherwise, it returns
// false.
// Returns whether the quorum is active from the view of the local raft state machine.
// check_quorum_active also resets all recent_active to false.
// check_quorum_active can only called by leader.
// check_quorum_active can only be called by leader.
fn check_quorum_active(&mut self) -> bool {
let self_id = self.id;
let quorum_fn = self.quorum_fn;
self.mut_prs().quorum_recently_active(self_id, quorum_fn)
}

/// Issues a message to timeout immediately.
/// Issues a message to make a node timeout immediately. If the node is promotable,
/// it will start a campaign soon.
pub fn send_timeout_now(&mut self, to: u64) {
let msg = new_message(to, MessageType::MsgTimeoutNow, None);
self.send(msg);
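As the updated `pass_election_timeout` doc above states, the randomized timeout lies in [`election_timeout`, 2 * `election_timeout` - 1]. One way such a value could be drawn, sketched with the `rand` crate (illustration only; the crate's own reset logic may differ):

```rust
use rand::Rng;

// Draws a timeout uniformly from [election_timeout, 2 * election_timeout - 1].
// Assumes election_timeout >= 1 (rand 0.8-style API).
fn randomized_election_timeout(election_timeout: usize) -> usize {
    election_timeout + rand::thread_rng().gen_range(0..election_timeout)
}
```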
29 changes: 13 additions & 16 deletions src/raft_log.rs
@@ -106,7 +106,7 @@ impl<T: Storage> RaftLog<T> {

/// For a given index, finds the term associated with it.
pub fn term(&self, idx: u64) -> Result<u64> {
// the valid term range is [index of dummy entry, last index]
// the valid index range is [index of dummy entry, last index]
let dummy_idx = self.first_index() - 1;
if idx < dummy_idx || idx > self.last_index() {
return Ok(0u64);
@@ -149,21 +149,17 @@ impl<T: Storage> RaftLog<T> {
}
}

/// Finds the index of the conflict.
/// Finds the index of the conflicting entry.
///
/// It returns the first index of conflicting entries between the existing
/// entries and the given entries, if there are any.
/// It returns the index of the first conflicting entry between the existing
/// log and the given entry list, if any.
///
/// If there are no conflicting entries, and the existing entries contain
/// all the given entries, zero will be returned.
/// When there is no conflict, it returns the index of the first new entry.
/// If there is no new entry, zero will be returned.
///
/// If there are no conflicting entries, but the given entries contains new
/// entries, the index of the first new entry will be returned.
/// An entry is considered conflicting if it has the same index as an
/// existing entry but a different term.
///
/// An entry is considered to be conflicting if it has the same index but
/// a different term.
///
/// The first entry MUST have an index equal to the argument 'from'.
/// The index of the given entries MUST be continuously increasing.
pub fn find_conflict(&self, ents: &[Entry]) -> u64 {
for e in ents {
@@ -288,6 +284,7 @@ impl<T: Storage> RaftLog<T> {
}

/// Appends a set of entries to the unstable list.
/// Returns the last index after append.
pub fn append(&mut self, ents: &[Entry]) -> u64 {
trace!(
self.unstable.logger,
@@ -344,7 +341,7 @@ impl<T: Storage> RaftLog<T> {
}
}

/// Determines if the given (lastIndex,term) log is more up-to-date
/// Determines if the given (last_index, term) log is more up-to-date
/// by comparing the index and term of the last entry in the existing logs.
/// If the logs have last entry with different terms, then the log with the
/// later term is more up-to-date. If the logs end with the same term, then
@@ -374,18 +371,18 @@ impl<T: Storage> RaftLog<T> {
self.next_entries_since(self.applied)
}

/// Returns whether there are entries that can be applied between `since_idx` and the comitted index.
/// Returns whether there are entries that can be applied between `since_idx` and the committed index.
pub fn has_next_entries_since(&self, since_idx: u64) -> bool {
let offset = cmp::max(since_idx + 1, self.first_index());
self.committed + 1 > offset
}

/// Returns whether there are new entries.
/// Returns whether there are unapplied entries.
pub fn has_next_entries(&self) -> bool {
self.has_next_entries_since(self.applied)
}

/// Returns the current snapshot
/// Returns the current snapshot.
pub fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
if let Some(snap) = self.unstable.snapshot.as_ref() {
if snap.get_metadata().index >= request_index {
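The rewritten `find_conflict` doc comment above compresses the rule into three cases. A self-contained sketch of those semantics over `(index, term)` pairs (an illustration of the rule, not the crate's implementation):

```rust
/// Returns the index of the first incoming entry whose term disagrees with an
/// existing entry at the same index, or the index of the first entry that is
/// not in the existing log yet; returns 0 when nothing conflicts and nothing
/// is new.
fn find_conflict(existing: &[(u64, u64)], incoming: &[(u64, u64)]) -> u64 {
    for &(idx, term) in incoming {
        match existing.iter().find(|&&(i, _)| i == idx) {
            Some(&(_, existing_term)) if existing_term != term => return idx,
            None => return idx, // past the end of the existing log: first new entry
            _ => {} // same index and same term: already present, keep scanning
        }
    }
    0
}
```

For example, with an existing log of [(1, 1), (2, 2)], incoming [(2, 2), (3, 3)] yields 3 (first new entry), while incoming [(2, 3)] yields 2 (same index, different term).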