Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test case for #306 #314

Merged
merged 8 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/org/jgroups/protocols/raft/RAFT.java
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,10 @@ public RAFT setLeaderAndTerm(Address new_leader, long new_term) {
return this;
}

/**
 * Attempts to install the given leader and term by delegating to
 * {@code raft_state.tryAdvanceTermAndLeader(newTerm, newLeader)}.
 * <p>
 * Unlike {@code setLeaderAndTerm(Address, long)}, which returns this protocol instance, this method hands the
 * integer outcome of the attempt back to the caller so that it can react to a rejected installation.
 *
 * @param newLeader the address of the leader to install
 * @param newTerm   the term associated with {@code newLeader}
 * @return the value returned by {@code tryAdvanceTermAndLeader}.
 *         NOTE(review): {@code BaseElection.handleLeaderElected} only stops its voting thread when this value
 *         is {@code >= 0}, so a negative value presumably means "not installed" -- confirm against the
 *         documentation of {@code tryAdvanceTermAndLeader}.
 */
public int trySetLeaderAndTerm(Address newLeader, long newTerm) {
return raft_state.tryAdvanceTermAndLeader(newTerm, newLeader);
}

private void leaderUpdated(Address new_leader) {
if(Objects.equals(local_addr, new_leader)) {
if(!isLeader())
Expand Down
55 changes: 43 additions & 12 deletions src/org/jgroups/protocols/raft/election/BaseElection.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public abstract class BaseElection extends Protocol {

private final Runner voting_thread=new Runner("voting-thread", this::runVotingProcess, null);
private final ResponseCollector<VoteResponse> votes=new ResponseCollector<>();
private volatile boolean stopVoting;

protected volatile View view;

Expand Down Expand Up @@ -187,9 +188,15 @@ private void handleLeaderElected(Message msg, LeaderElected hdr) {
// Otherwise, we need to make sure the leader is in the current view and there's still a majority.
// The view could change between the moment the leader is decided and the moment the message arrives.
if (v == null || (isLeaderInView(leader, v) && isMajorityAvailable(v, raft))) {
log.trace("%s <- %s: %s", local_addr, msg.src(), hdr);
stopVotingThread(); // only on the coord
raft.setLeaderAndTerm(leader, term); // possibly changes the role
// Tries to install the new leader.
// We only ever stop the voting thread in case the installation goes through.
// This is necessary to handle partitioning cases with coordinator changes in the view.
// See: https://github.com/jgroups-extras/jgroups-raft/issues/306
int res = raft.trySetLeaderAndTerm(leader, term); // possibly changes the role
log.trace("%s <- %s: %s (%d)", local_addr, msg.src(), hdr, res);
if (res >= 0) {
stopVotingThread(); // only on the coord
}
} else {
log.trace("%s <- %s: %s after leader left (%s)", local_addr, msg.src(), hdr, v);
}
Expand Down Expand Up @@ -301,7 +308,20 @@ private boolean isHigher(VoteResponse one, VoteResponse other) {
protected void runVotingProcess() {
// If the thread is interrupted, it means the voting thread was already stopped.
// We place this check here just as a shortcut to avoid increasing the term in RAFT.
if (Thread.interrupted()) return;
if (Thread.interrupted()) {
stopVotingThreadInternal();
return;
}

// If a stop was requested externally, verify whether it is actually possible to stop.
if (stopVoting) {
// Only stop if the majority is lost or a leader is already installed (non-null).
// Otherwise, there is still a majority without a leader, so keep running.
if (!isMajorityAvailable() || raft.leader() != null) {
stopVotingThreadInternal();
return;
}
}

View electionView = this.view;
long new_term=raft.createNewTerm();
Expand Down Expand Up @@ -333,21 +353,23 @@ protected void runVotingProcess() {
// We must stop the voting thread and set the leader as null.
if (!isMajorityAvailable()) {
log.trace("%s: majority lost (%s) before elected (%s)", local_addr, view, leader);
stopVotingThread();
stopVotingThreadInternal();
raft.setLeaderAndTerm(null);
return;
}

// At this point, the majority is still in place, so we confirm the elected leader is still present in the view.
// If the leader is not in the view anymore, we keep the voting thread running.
if (isLeaderInView(leader, view)) {
stopVotingThread();
stopVotingThreadInternal();
return;
}

if (log.isTraceEnabled())
log.trace("%s: leader (%s) not in view anymore, retrying", local_addr, leader);
}

if (log.isTraceEnabled())
log.trace("%s: leader (%s) not in view anymore, retrying", local_addr, leader);

stopVoting = false;
} else if (log.isTraceEnabled())
log.trace("%s: collected votes from %s in %d ms (majority=%d); starting another voting round",
local_addr, votes.getValidResults(), time, majority);
Expand All @@ -368,6 +390,7 @@ private static boolean isMajorityAvailable(View view, RAFT raft) {
public synchronized BaseElection startVotingThread() {
if(!isVotingThreadRunning()) {
log.debug("%s: starting the voting thread", local_addr);
stopVoting = false;
voting_thread.start();
}
return this;
Expand Down Expand Up @@ -397,10 +420,18 @@ protected void sendVoteResponse(Address dest, long term, long last_log_term, lon

/**
 * Requests the voting thread to stop.
 * <p>
 * When the voting thread is running, it is only <em>marked</em> to stop by raising {@code stopVoting}. The
 * voting process inspects that flag at the start of a round and decides on its own whether stopping is
 * acceptable, so an election that is still needed is not cut short. The runner is stopped right away only
 * when the majority is no longer available.
 * <p>
 * Calling this method while the voting thread is not running has no effect.
 *
 * @return this instance
 */
public synchronized BaseElection stopVotingThread() {
    if(isVotingThreadRunning()) {
        // Do not stop the runner or reset the votes unconditionally here: doing so would make the
        // deferred-stop flag below meaningless, because the voting process could never reach its own check.
        log.debug("%s: mark the voting thread to stop", local_addr);
        stopVoting = true;

        // Interrupt voting thread if majority lost.
        if (!isMajorityAvailable()) stopVotingThreadInternal();
    }
    return this;
}

/**
 * Stops the voting thread unconditionally: logs the stop at debug level, stops the {@code voting_thread}
 * runner and resets the votes collected so far.
 * <p>
 * This method does not check whether the voting thread is running and does not touch {@code stopVoting}.
 */
private void stopVotingThreadInternal() {
log.debug("%s: stopping the voting thread", local_addr);
voting_thread.stop();
votes.reset();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testQuorumLostWhileDeterminingLeader(Class<?> ignore) throws Excepti
cluster.handleView(v2);

assertThat(raft(0).leader()).isNull();
assertThat(election(0).isVotingThreadRunning()).isFalse();
assertThat(eventually(() -> !election(0).isVotingThreadRunning(), 5, TimeUnit.SECONDS)).isTrue();

// Let the method return and install A as leader.
checkPoint.trigger("A_DETERMINE_OUT_CONTINUE");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package org.jgroups.tests.election;

import org.jgroups.*;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.raft.ELECTION2;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.election.VoteResponse;
import org.jgroups.stack.Protocol;
import org.jgroups.tests.harness.AbstractRaftTest;
import org.jgroups.tests.harness.BaseRaftElectionTest;
import org.jgroups.util.Util;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.IntStream;

import org.testng.annotations.Test;

import static java.util.Arrays.stream;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.jgroups.raft.testfwk.RaftTestUtils.eventually;
import static org.jgroups.tests.harness.BaseRaftElectionTest.ALL_ELECTION_CLASSES_PROVIDER;
import static org.testng.Assert.assertEquals;

/**
* @author Zhang Yifei
* @see <a href="https://github.com/jgroups-extras/jgroups-raft/issues/306">Issue</a>
*/
@Test(groups = Global.FUNCTIONAL, singleThreaded = true, dataProvider = ALL_ELECTION_CLASSES_PROVIDER)
public class NetworkPartitionChannelTest extends BaseRaftElectionTest.ChannelBased {
private final int[] indexes;
private volatile Semaphore newTerm;
private final AtomicInteger slowVoteResponses = new AtomicInteger();

{
clusterSize = 5;
indexes = IntStream.range(0, clusterSize).toArray();
recreatePerMethod = true;
System.setProperty(AbstractRaftTest.ENABLE_TRACE_CLASSES,
"org.jgroups.protocols.raft.ELECTION,org.jgroups.protocols.raft.ELECTION2");
}

public void electionAfterMerge(Class<?> ignore) throws Exception {
int leader, coord;
for (;;) {
waitUntilLeaderElected(3000, indexes);
Address a = leaderAddress();
leader = index(a);
// Find a node that address less than leader's
// DefaultMembershipPolicy will make it to be next coordinator in new membership
OptionalInt o = stream(indexes).filter(t -> channel(t).address().compareTo(a) < 0).findAny();
if (o.isPresent()) {
coord = o.getAsInt();
break;
}
JChannel c = channel(leader);
c.disconnect();
c.connect(clusterName());
}
assertEquals(coordIndex(leader), leader);
System.out.println("before partition: " + view(leader));

partition(stream(indexes).filter(t -> t != coord).toArray(), new int[] {coord});
assertEquals(coordIndex(leader), leader);
assertEquals(coordIndex(coord), coord);
System.out.println("partition1: " + view(leader));
System.out.println("partition2: " + view(coord));

raft(leader).set("cmd".getBytes(), 0, 3);
for (int i : indexes) {
System.out.println(address(i) + " lastAppended: " + raft(i).lastAppended());
}

// block the new coordinator to advance the term in voting thread
newTerm = new Semaphore(0);

merge(leader, coord);
Util.waitUntilAllChannelsHaveSameView(30_000, 1000, channels());
assertEquals(coordIndex(leader), coord);
assertEquals(coordIndex(coord), coord);
System.out.println("after merge: " + view(coord));

// since the term is not advanced yet, new coordinator has accepted the existing leader, and stopping the
// voting runner, but voting thread is just interrupted, because the voting process almost uninterruptible,
// so it's still running, the term will be advanced anyway, and the VoteRequest will be sent, if the voting
// process goes wrong, e.g. waiting response timeout then it won't retry the voting process since the runner
// has been stopped and the thread has been interrupted.
waitUntilLeaderElected(3000, indexes);
System.out.println(dumpLeaderAndTerms());

// slow down the responses, coordinator won't get majority vote responses after waiting timeout
slowVoteResponses.set(3);

// unblock the voting thread
newTerm.release();
newTerm = null;

waitUntilPreVoteThreadStops(3000, coord);
waitUntilVotingThreadStops(3000, coord);

// ELECTION may be timeout, ELECTION2 always pass.
waitUntilLeaderElected(3000, indexes);
System.out.println(dumpLeaderAndTerms());
}

@Override
protected RAFT newRaftInstance() {
return new RAFT() {
{
id = ClassConfigurator.getProtocolId(RAFT.class);
}

@Override
public long createNewTerm() {
Semaphore s = newTerm;
if (s != null) s.acquireUninterruptibly();
return super.createNewTerm();
}
};
}

@Override
protected Protocol[] baseProtocolStackForNode(String name) throws Exception {
Protocol[] protocols = super.baseProtocolStackForNode(name);
protocols[0] = new SHARED_LOOPBACK() {
@Override
public Object down(Message msg) {
if (!addr().equals(msg.dest())) {
Header h = msg.getHeader((short) 520);
if (h == null) h = msg.getHeader((short) 524);
if (h instanceof VoteResponse && slowVoteResponses.getAndDecrement() > 0) park(1000);
}
return super.down(msg);
}
};
return protocols;
}

private void partition(int[]... partitions) throws TimeoutException {
List<List<JChannel>> parts = stream(partitions).map(t -> stream(t).mapToObj(this::channel).collect(toList()))
.collect(toList());
for (List<JChannel> p : parts) {
var s = parts.stream().filter(t -> t != p).flatMap(t -> t.stream().map(JChannel::address)).collect(toList());
p.forEach(t -> t.stack().getBottomProtocol().up(new org.jgroups.Event(org.jgroups.Event.SUSPECT, s)));
Util.waitUntilAllChannelsHaveSameView(30_000, 1000, p.toArray(JChannel[]::new));
}
}

private void merge(int... coordinators) throws TimeoutException {
List<JChannel> coords = stream(coordinators).mapToObj(this::channel).collect(toList());
Map<Address, View> views = coords.stream().collect(toMap(JChannel::address, JChannel::view));
coords.forEach(t -> t.stack().getBottomProtocol().up(new org.jgroups.Event(org.jgroups.Event.MERGE, views)));
for (JChannel ch : coords) {
GMS gms = ch.stack().findProtocol(GMS.class);
Util.waitUntil(30_000, 1000, () -> !gms.isMergeTaskRunning());
}
}

private View view(int index) {
return channel(index).stack().<GMS>findProtocol(GMS.class).view();
}

private int coordIndex(int index) {
return index(view(index).getCoord());
}

private int index(Address addr) {
return stream(indexes).filter(t -> channel(t).address().equals(addr)).findAny().getAsInt();
}

private void park(int ms) {
long deadline = System.currentTimeMillis() + ms;
do {
LockSupport.parkUntil(deadline);
} while (System.currentTimeMillis() < deadline);
}

void waitUntilPreVoteThreadStops(long timeout, int... indexes) {
ELECTION2[] a = stream(indexes).mapToObj(this::channel).filter(Objects::nonNull).map(this::election)
.filter(t -> t instanceof ELECTION2).toArray(ELECTION2[]::new);
if (a.length == 0) return;
eventually(() -> {
for (ELECTION2 e : a) if (e.isPreVoteThreadRunning()) return false;
return true;
}, timeout, TimeUnit.MILLISECONDS);
}
}
Loading