Skip to content

Commit

Permalink
Added related tests
Browse files Browse the repository at this point in the history
Signed-off-by: wego1236 <[email protected]>
  • Loading branch information
wego1236 committed Nov 16, 2024
1 parent 7a90f2e commit cdecedc
Showing 1 changed file with 80 additions and 25 deletions.
105 changes: 80 additions & 25 deletions harness/tests/integration_cases/test_raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::panic::{self, AssertUnwindSafe};
use harness::*;
use protobuf::Message as PbMessage;
use raft::eraftpb::*;
use raft::storage::MemStorage;
use raft::storage::{GetEntriesContext, MemStorage};
use raft::*;
use raft_proto::*;
use slog::Logger;
Expand Down Expand Up @@ -917,7 +917,7 @@ fn test_dueling_candidates() {
// enough log.
nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

let tests = [
let tests = vec![
// role, term, committed, applied, last index.
(StateRole::Follower, 2, (1, 0, 1)),
(StateRole::Follower, 2, (1, 0, 1)),
Expand Down Expand Up @@ -968,7 +968,7 @@ fn test_dueling_pre_candidates() {
// With pre-vote, it does not disrupt the leader.
nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

let tests = [
let tests = vec![
// role, term, committed, applied, last index.
(1, StateRole::Leader, 1, (1, 0, 1)),
(2, StateRole::Follower, 1, (1, 0, 1)),
Expand Down Expand Up @@ -1242,7 +1242,7 @@ fn test_commit() {
#[test]
fn test_pass_election_timeout() {
let l = default_logger();
let tests = [
let tests = vec![
(5, 0f64, false),
(10, 0.1, true),
(13, 0.4, true),
Expand Down Expand Up @@ -1728,7 +1728,7 @@ fn test_all_server_stepdown() {
(StateRole::Leader, StateRole::Follower, 3, 1, 1),
];

let tmsg_types = [MessageType::MsgRequestVote, MessageType::MsgAppend];
let tmsg_types = vec![MessageType::MsgRequestVote, MessageType::MsgAppend];
let tterm = 3u64;

for (i, (state, wstate, wterm, windex, entries)) in tests.drain(..).enumerate() {
Expand Down Expand Up @@ -3162,7 +3162,7 @@ fn test_add_node() -> Result<()> {
let mut r = new_test_raft(1, vec![1], 10, 1, new_storage(), &l);
r.apply_conf_change(&add_node(2))?;
assert_iter_eq!(o r.prs().conf().voters().ids(),
[1, 2]
vec![1, 2]
);

Ok(())
Expand Down Expand Up @@ -3208,11 +3208,11 @@ fn test_remove_node() -> Result<()> {
let l = default_logger();
let mut r = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l);
r.apply_conf_change(&remove_node(2))?;
assert_iter_eq!(o r.prs().conf().voters().ids(), [1]);
assert_iter_eq!(o r.prs().conf().voters().ids(), vec![1]);

// Removing all voters is not allowed.
assert!(r.apply_conf_change(&remove_node(1)).is_err());
assert_iter_eq!(o r.prs().conf().voters().ids(), [1]);
assert_iter_eq!(o r.prs().conf().voters().ids(), vec![1]);

Ok(())
}
Expand All @@ -3223,8 +3223,8 @@ fn test_remove_node_itself() {
let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l);

assert!(n1.apply_conf_change(&remove_node(1)).is_err());
assert_iter_eq!(n1.prs().conf().learners(), [2]);
assert_iter_eq!(o n1.prs().conf().voters().ids(), [1]);
assert_iter_eq!(n1.prs().conf().learners(), vec![2]);
assert_iter_eq!(o n1.prs().conf().voters().ids(), vec![1]);
}

#[test]
Expand Down Expand Up @@ -3956,8 +3956,8 @@ fn test_restore_with_learner() {
assert!(sm.restore(s.clone()));
assert_eq!(sm.raft_log.last_index(), 11);
assert_eq!(sm.raft_log.term(11).unwrap(), 11);
assert_iter_eq!(o sm.prs().conf().voters().ids(), [1, 2]);
assert_iter_eq!(sm.prs().conf().learners(), [3]);
assert_iter_eq!(o sm.prs().conf().voters().ids(), vec![1, 2]);
assert_iter_eq!(sm.prs().conf().learners(), vec![3]);

let conf_state = s.get_metadata().get_conf_state();
for node in &conf_state.voters {
Expand Down Expand Up @@ -3990,7 +3990,7 @@ fn test_restore_with_voters_outgoing() {
);
assert_iter_eq!(
o sm.prs().conf().voters().ids(),
[1, 2, 3, 4]
vec![1, 2, 3, 4]
);
assert!(!sm.restore(s));
}
Expand Down Expand Up @@ -4078,7 +4078,7 @@ fn test_add_learner() -> Result<()> {
let mut n1 = new_test_raft(1, vec![1], 10, 1, new_storage(), &l);
n1.apply_conf_change(&add_learner(2))?;

assert_iter_eq!(n1.prs().conf().learners(), [2]);
assert_iter_eq!(n1.prs().conf().learners(), vec![2]);
assert!(n1.prs().conf().learners().contains(&2));

Ok(())
Expand All @@ -4091,12 +4091,12 @@ fn test_remove_learner() -> Result<()> {
let l = default_logger();
let mut n1 = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage(), &l);
n1.apply_conf_change(&remove_node(2))?;
assert_iter_eq!(o n1.prs().conf().voters().ids(), [1]);
assert_iter_eq!(o n1.prs().conf().voters().ids(), vec![1]);
assert!(n1.prs().conf().learners().is_empty());

// Remove all voters are not allowed.
assert!(n1.apply_conf_change(&remove_node(1)).is_err());
assert_iter_eq!(o n1.prs().conf().voters().ids(), [1]);
assert_iter_eq!(o n1.prs().conf().voters().ids(), vec![1]);
assert!(n1.prs().conf().learners().is_empty());

Ok(())
Expand Down Expand Up @@ -5286,7 +5286,7 @@ fn test_group_commit_consistent() {
/// of the election with both priority and log.
#[test]
fn test_election_with_priority_log() {
let tests = [
let tests = vec![
// log is up to date or not 1..3, priority 1..3, id, state
(true, false, false, 3, 1, 1, 1, StateRole::Leader),
(true, false, false, 2, 2, 2, 1, StateRole::Leader),
Expand All @@ -5301,7 +5301,7 @@ fn test_election_with_priority_log() {
(false, false, true, 1, 1, 3, 1, StateRole::Leader),
];

for (l1, l2, l3, p1, p2, p3, id, state) in tests {
for (_i, &(l1, l2, l3, p1, p2, p3, id, state)) in tests.iter().enumerate() {
let l = default_logger();
let mut n1 = new_test_raft(1, vec![1, 2, 3], 10, 1, new_storage(), &l);
let mut n2 = new_test_raft(2, vec![1, 2, 3], 10, 1, new_storage(), &l);
Expand Down Expand Up @@ -5349,7 +5349,7 @@ fn test_election_after_change_priority() {
// check state
assert_eq!(network.peers[&1].state, StateRole::Follower, "peer 1 state");

let tests = [
let tests = vec![
(1, 1, StateRole::Follower), //id, priority, state
(1, 2, StateRole::Leader),
(1, 3, StateRole::Leader),
Expand Down Expand Up @@ -5427,16 +5427,16 @@ fn test_uncommitted_entries_size_limit() {
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

// should return ok
nt.dispatch([msg.clone()]).unwrap();
nt.dispatch(vec![msg.clone()].to_vec()).unwrap();

// then next proposal should be dropped
let result = nt.dispatch([msg]);
let result = nt.dispatch(vec![msg].to_vec());
assert_eq!(result.unwrap_err(), raft::Error::ProposalDropped);

// but entry with empty size should be accepted
let entry = Entry::default();
let empty_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
nt.dispatch([empty_msg]).unwrap();
nt.dispatch(vec![empty_msg].to_vec()).unwrap();

// after reduce, new proposal should be accepted
let mut entry = Entry::default();
Expand All @@ -5453,18 +5453,18 @@ fn test_uncommitted_entries_size_limit() {
let mut entry = Entry::default();
entry.data = (b"hello world and raft" as &'static [u8]).into();
let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
nt.dispatch([long_msg]).unwrap();
nt.dispatch(vec![long_msg].to_vec()).unwrap();

// but another huge one will be dropped
let mut entry = Entry::default();
entry.data = (b"hello world and raft" as &'static [u8]).into();
let long_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
nt.dispatch([long_msg]).unwrap_err();
nt.dispatch(vec![long_msg].to_vec()).unwrap_err();

// entry with empty size should still be accepted
let entry = Entry::default();
let empty_msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![entry]);
nt.dispatch([empty_msg]).unwrap();
nt.dispatch(vec![empty_msg].to_vec()).unwrap();
}

#[test]
Expand Down Expand Up @@ -5851,3 +5851,58 @@ fn test_switching_check_quorum() {
}
assert_eq!(sm.state, StateRole::Leader);
}

fn expect_one_message(r: &mut Interface) -> Message {
let msgs = r.read_messages();
assert_eq!(msgs.len(), 1, "expect one message");
msgs[0].clone()
}

#[test]
fn test_log_replication_with_reordered_message() {
let l = default_logger();
let mut r1 = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l);
r1.become_candidate();
r1.become_leader();
r1.read_messages();
r1.mut_prs().get_mut(2).unwrap().become_replicate();


let mut r2 = new_test_raft(2, vec![1, 2], 10, 1, new_storage(), &l);

// r1 sends 2 MsgApp messages to r2.
let _ = r1.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r1.send_append(2);
let req1 = expect_one_message(&mut r1);
let _ = r1.append_entry(&mut [new_entry(0, 0, SOME_DATA)]);
r1.send_append(2);
let req2 = expect_one_message(&mut r1);

// r2 receives the second MsgApp first due to reordering.
let _ = r2.step(req2);
let resp2 = expect_one_message(&mut r2);
// r2 rejects req2
assert_eq!(resp2.reject, true);
assert_eq!(resp2.reject_hint, 0);
assert_eq!(resp2.index, 2);

// r2 handles the first MsgApp and responses to r1.
// And r1 updates match index accordingly.
let _ = r2.step(req1);
let m = expect_one_message(&mut r2);
assert_eq!(m.reject, false);
assert_eq!(m.index, 2);
let _ = r1.step(m);
let _ = expect_one_message(&mut r1);
assert_eq!(r1.prs().get(2).unwrap().matched, 2);

// r1 observes a transient network issue to r2, hence transits to probe state.
let _ = r1.step(new_message(2, 1, MessageType::MsgUnreachable, 0));
assert_eq!(r1.prs().get(2).unwrap().state, ProgressState::Probe);

// now r1 receives the delayed resp2.
let _ = r1.step(resp2);
let m = expect_one_message(&mut r1);
// r1 shall re-send MsgApp from match index even if resp2's reject hint is less than matching index.
assert_eq!(r1.prs().get(2).unwrap().matched, m.index)
}

0 comments on commit cdecedc

Please sign in to comment.