Skip to content

Commit

Permalink
Abstraction classes for tests and fixes
Browse files Browse the repository at this point in the history
This commits includes all the new test abstractions. There are base
classes for creating tests based on channels, the mock cluster, for
election, and for state machines.

We utilize some of TestNG functionalities to facilitate the
configuration and hooks for also configuring the environment with log
level.

This also updates all the tests (which make sense) to use the new
abstractions. This greatly reduced the code duplication across the files
and simplified the creation of utility methods. Running the tests in a
loop also greatly reduced flaky tests because of timeouts. We have
enhanced multiple check through the tests which were flaky.

We also take the change to update some of the tests utilize the assertj
test methods. All the assertions in the abstract class already utilizes
and the smaller tests were updated already.

This commits also expands a little the test framework with some
utilities methods and documentation. Some fixes we found during this
work include:

* Fix for sending commits immediately, as messages can reorder;
* Fix for membership changes never applying after a failure;
* Fix for completing outstanding requests when the leader steps down.

This commit is part of issue #222.
  • Loading branch information
jabolina committed Jan 1, 2024
1 parent 70cbbb8 commit 9a05186
Show file tree
Hide file tree
Showing 33 changed files with 3,647 additions and 2,081 deletions.
5 changes: 3 additions & 2 deletions ivy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
xsi:noNamespaceSchemaLocation="http://ant.apache.org/ivy/schemas/ivy.xsd"
xmlns:e="http://ant.apache.org/ivy/extra">

<info organisation="org.jgroups" module="jgroups-raft" revision="1.0.11.Final-SNAPSHOT"/>
<info organisation="org.jgroups" module="jgroups-raft" revision="1.0.13.Final-SNAPSHOT"/>

<publications>
<artifact name="jgroups-raft" type="pom" />
Expand All @@ -13,11 +13,12 @@


<dependencies>
<dependency org="org.jgroups" name="jgroups" rev="5.2.10.Final"/>
<dependency org="org.jgroups" name="jgroups" rev="5.3.0.Final"/>
<dependency org="org.apache.logging.log4j" name="log4j-api" rev="2.17.1"/>
<dependency org="org.apache.logging.log4j" name="log4j-core" rev="2.17.1"/>
<dependency org="org.fusesource.leveldbjni" name="leveldbjni-all" rev="1.8"/>
<dependency org="org.testng" name="testng" rev="6.14.+"/>
<dependency org="org.assertj" name="assertj-core" rev="3.24.2" />
<dependency org="com.beust" name="jcommander" rev="1.+"/>
<dependency org="net.jcip" name="jcip-annotations" rev="1.0"/>
<dependency org="org.openjdk.jmh" name="jmh-core" rev="1.34"/>
Expand Down
14 changes: 13 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>

<!-- nexus-staging-maven-plugin -->
<autoReleaseAfterClose>true</autoReleaseAfterClose>
<nexus.server.id>jboss-releases-repository</nexus.server.id>
<nexus.server.url>https://repository.jboss.org/nexus</nexus.server.url>
<nexus.snapshot.server.id>jboss-snapshots-repository</nexus.snapshot.server.id>
<nexus.snapshot.server.url>https://repository.jboss.org/nexus/content/repositories/snapshots/</nexus.snapshot.server.url>
<testng.version>7.7.0</testng.version>

<!-- Dependencies versions -->
<testng.version>7.7.1</testng.version>
<assertj.version>3.24.2</assertj.version>
<jgroups.version>5.3.0.Final</jgroups.version>
<leveldbjni.version>1.8</leveldbjni.version>
<log4j.version>2.22.0</log4j.version>
Expand All @@ -42,6 +46,7 @@
<jgroups.udp.ip_ttl>5</jgroups.udp.ip_ttl>
<jgroups.useIPv4>true</jgroups.useIPv4>
<jgroups.useIPv6>false</jgroups.useIPv6>
<trace>false</trace>

<jgroups.tests.jvmargs>
-Djgroups.udp.ip_ttl=0
Expand All @@ -52,6 +57,7 @@
-Dlog4j.configurationFile=${conf.dir}/log4j2.xml
-Djava.net.preferIPv4Stack=${jgroups.useIPv4}
-Djava.net.preferIPv6Addresses=${jgroups.useIPv6}
-Dorg.jgrops.test.trace=${trace}
-Xms400M
-Xmx800M
</jgroups.tests.jvmargs>
Expand Down Expand Up @@ -214,7 +220,13 @@
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src</sourceDirectory>
<resources>
Expand Down
1 change: 1 addition & 0 deletions src/org/jgroups/protocols/raft/ELECTION.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ELECTION extends BaseElection {
protected void handleView(View v) {
Majority result=Utils.computeMajority(view, v, raft().majority(), raft.leader());
log.debug("%s: existing view: %s, new view: %s, result: %s", local_addr, this.view, v, result);
System.out.printf("%s: existing view: %s, new view: %s, result: %s%n", local_addr, this.view, v, result);
List<Address> joiners=View.newMembers(this.view, v);
boolean has_new_members=joiners != null && !joiners.isEmpty();
this.view=v;
Expand Down
15 changes: 13 additions & 2 deletions src/org/jgroups/protocols/raft/Leader.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ public void init() {

public void destroy() {
super.destroy();
RequestTable<String> reqTable = raft.request_table;
raft.request_table=null;
raft.commit_table=null;

if (reqTable != null) reqTable.destroy(raft.notCurrentLeader());
}


Expand All @@ -50,9 +53,17 @@ public void handleAppendEntriesResponse(Address sender, long term, AppendResult
switch(result.result) {
case OK:
raft.commit_table.update(sender, result.index(), result.index() + 1, result.commit_index, false);
if(reqtab.add(result.index, sender_raft_id, this.majority)) {
boolean done = reqtab.add(result.index, sender_raft_id, this.majority);
if(done) {
raft.commitLogTo(result.index, true);
if(raft.send_commits_immediately)
}
// Send commits immediately.
// Note that, an entry is committed by a MAJORITY, this means that some of the nodes doesn't know the entry exist yet.
// This way, send the commit messages any time we handle an append response.
if(raft.send_commits_immediately) {
// Done is only true when reaching a majority threshold, we also need to check is committed to resend
// to slower nodes.
if (done || reqtab.isCommitted(result.index))
sendCommitMessageToFollowers();
}
break;
Expand Down
27 changes: 21 additions & 6 deletions src/org/jgroups/protocols/raft/RAFT.java
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, Op
public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, boolean internal, Options options) {
Address leader = leader();
if(leader == null || (local_addr != null && !leader.equals(local_addr)))
throw new IllegalStateException("I'm not the leader (local_addr=" + local_addr + ", leader=" + leader + ")");
throw notCurrentLeader();
if(buf == null)
throw new IllegalArgumentException("buffer must not be null");
CompletableFuture<byte[]> retval=new CompletableFuture<>();
Expand Down Expand Up @@ -686,7 +686,7 @@ protected void handleDownRequest(CompletableFuture<byte[]> f, byte[] buf, int of
boolean internal, Options opts) {
Address leader = leader();
if(leader == null || !Objects.equals(leader,local_addr))
throw new IllegalStateException("I'm not the leader (local_addr=" + local_addr + ", leader=" + leader + ")");
throw notCurrentLeader();

RequestTable<String> reqtab=request_table;

Expand Down Expand Up @@ -797,7 +797,7 @@ else if(r instanceof UpRequest)
}
}

protected void process (List<Request> q) {
protected void process(List<Request> q) {
RequestTable<String> reqtab=request_table;
LogEntries entries=new LogEntries();
long index=last_appended+1;
Expand All @@ -813,6 +813,14 @@ protected void process (List<Request> q) {
}
else if(r instanceof DownRequest) {
DownRequest dr=(DownRequest)r;
// Complete the request exceptionally.
// The request could either be lost in the reqtab reference or fail with an NPE below.
// It would only complete in case a timeout is associated.
if (!isLeader()) {
dr.f.completeExceptionally(notCurrentLeader());
continue;
}

entries.add(new LogEntry(current_term, dr.buf, dr.offset, dr.length, dr.internal));

// Add the request to the client table, so we can return results to clients when done
Expand All @@ -830,7 +838,7 @@ else if(r instanceof DownRequest) {

// handle down requests
if(leader == null || !Objects.equals(leader,local_addr))
throw new IllegalStateException("I'm not the leader (local_addr=" + local_addr + ", leader=" + leader + ")");
throw notCurrentLeader();

// Append to the log
long prev_index=last_appended;
Expand Down Expand Up @@ -860,6 +868,10 @@ else if(r instanceof DownRequest) {
snapshotIfNeeded(length);
}

IllegalStateException notCurrentLeader() {
return new IllegalStateException("I'm not the leader (local_addr=" + local_addr + ", leader=" + leader() + ")");
}

/** Populate with non-committed entries (from log) (https://github.com/belaban/jgroups-raft/issues/31) */
protected void createRequestTable() {
request_table=new RequestTable<>();
Expand Down Expand Up @@ -948,14 +960,17 @@ protected CompletableFuture<byte[]> changeMembers(String name, InternalCommand.T

Address leader = leader();
if(leader == null || !Objects.equals(leader, local_addr))
throw new IllegalStateException("I'm not the leader (local_addr=" + local_addr + ", leader=" + leader + ")");
throw notCurrentLeader();

InternalCommand cmd=new InternalCommand(type, name);
byte[] buf=Util.streamableToByteBuffer(cmd);

// only add/remove one server at a time (https://github.com/belaban/jgroups-raft/issues/175)
return add_server_future=add_server_future
.thenCompose(s -> setAsync(buf, 0, buf.length, true, null));
// Use handle, so we can execute even if the previous execution failed.
.handle((ignore, t) -> setAsync(buf, 0, buf.length, true, null))
// Chain the new setAsync invocation.
.thenCompose(Function.identity());
}

protected void resend(Address target, long index) {
Expand Down
5 changes: 4 additions & 1 deletion src/org/jgroups/protocols/raft/state/RaftState.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ public synchronized void setLeader(Address newLeader) {
this.leader = newLeader;

// We only should invoke the listener when a new leader is set/step down.
if (updated) onLeaderUpdate.accept(this.leader);
if (updated) {
raft.getLog().trace("%s: change leader from %s -> %s", raft.addr(), leader, newLeader);
onLeaderUpdate.accept(this.leader);
}
}

/**
Expand Down
116 changes: 113 additions & 3 deletions src/org/jgroups/raft/testfwk/MockRaftCluster.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.jgroups.raft.testfwk;

import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.View;

Expand All @@ -8,29 +9,138 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Base class for the cluster implementations in the test framework.
*
* <p>
* The cluster abstraction facilitates the creation of a cluster during the tests. This approach avoids the need to
* create actual {@link org.jgroups.JChannel} and complex configurations. This abstraction provides a simplified and
* controllable way to test the cluster, emitting events and sending messages.
* </p>
*
* <p>
* The cluster abstraction works through {@link View} updates. Members are added and removed manually. When a new view
* is received, the members receive the update accordingly and follow the synchronous configuration. The user utilizing
* this abstraction has more control over when a member joins and which messages it receives.
* </p>
*
* Our test suite contains examples of uses of the cluster abstraction.
*
* @since 1.0.12
*/
public abstract class MockRaftCluster {

protected final Executor thread_pool=createThreadPool(1000);
protected boolean async;

/**
* Emit the view update to all cluster members.
* <p>
* How the view is updated might vary from implementation. The basic idea is to iterate over all members and
* invoke each protocol in the stack to handle the view.
* </p>
*
* @param view: The new {@link View} instance to update the member.
*/
public abstract void handleView(View view);

/**
* Send a message in the cluster.
*
* @param msg: Message to send.
*/
public abstract void send(Message msg);

/**
* The number of member in the cluster.
*
* @return The size of the cluster.
*/
public abstract int size();

/**
* Add a new member to the cluster.
*
* <p>
* The new member is associated with the given address. A member is resolved by the address when sending a message.
* Also, note that a view update is necessary to propagate the member addition.
* </p>
*
* @param addr: The new member's address.
* @param node: The member abstraction wrapped in {@link RaftNode}.
* @return The fluent current class.
* @param <T>: The current instance type.
*/
public abstract <T extends MockRaftCluster> T add(Address addr, RaftNode node);

/**
* Remove a member from the cluster.
* <p>
* A view update is necessary to propagate the member removal.
* </p>
*
* @param addr: The address of the member to remove.
* @return The fluent current class.
* @param <T>: The current instance type.
*/
public abstract <T extends MockRaftCluster> T remove(Address addr);

/**
* Remove all members from the cluster.
*
* @return The fluent current class.
* @param <T>: The current instance type.
*/
public abstract <T extends MockRaftCluster> T clear();

/**
* Utility to create a fluent use.
*
* @return The fluent current class.
* @param <T>: The current instance type.
*/
@SuppressWarnings("unchecked")
protected <T extends MockRaftCluster> T self() {
protected final <T extends MockRaftCluster> T self() {
return (T) this;
}

/**
* Check whether the cluster is in asynchronous mode.
*
* @return <code>true</code> if asynchronous, <code>false</code>, otherwise.
*/
public boolean async() {return async;}
public <T extends MockRaftCluster> T async(boolean b) {async=b; return self();}

protected static Executor createThreadPool(long max_idle_ms) {
/**
* Update the cluster mode between synchronous and asynchronous.
*
* @param b: <code>true</code> to run asynchronous, <code>false</code> to run synchronous.
* @return The fluent current class.
* @param <T>: The current instance type.
*/
public <T extends MockRaftCluster> T async(boolean b) {
async=b;
return self();
}

/**
* Create the {@link Executor} to submit tasks during asynchronous mode.
*
* @param max_idle_ms: Executor configuration parameter.
* @return The {@link Executor} instance to utilize in the cluster abstraction.
*/
protected Executor createThreadPool(long max_idle_ms) {
int max_cores=Runtime.getRuntime().availableProcessors();
return new ThreadPoolExecutor(0, max_cores, max_idle_ms, TimeUnit.MILLISECONDS,
new SynchronousQueue<>());
}

/**
* Asynchronously sends a message up the node stack.
*
* @param node: The node to handle the message.
* @param msg: The message to send.
*/
protected void deliverAsync(RaftNode node, Message msg) {
thread_pool.execute(() -> node.up(msg));
}
Expand Down
18 changes: 17 additions & 1 deletion src/org/jgroups/raft/testfwk/PartitionedRaftCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ public class PartitionedRaftCluster extends MockRaftCluster {
protected final Map<Address, List<Address>> partitions = new ConcurrentHashMap<>();
protected final Map<Address, RaftNode> nodes = new ConcurrentHashMap<>();

public PartitionedRaftCluster clear() {nodes.clear(); return this;}
@Override
public PartitionedRaftCluster clear() {
nodes.clear();
return self();
}

@Override
public PartitionedRaftCluster add(Address addr, RaftNode node) {
nodes.put(addr, node);
return this;
Expand Down Expand Up @@ -62,6 +67,17 @@ public void send(Message msg) {
}
}

@Override
public int size() {
return nodes.size();
}

@Override
public PartitionedRaftCluster remove(Address addr) {
nodes.remove(addr);
return self();
}

private void send(RaftNode node, Message msg) {
if (async) deliverAsync(node, msg);
else node.up(msg);
Expand Down
Loading

0 comments on commit 9a05186

Please sign in to comment.