From 6386f8b70ce83fbdf656780089e4edc6c399f1c6 Mon Sep 17 00:00:00 2001
From: Jose Bolina
Date: Sun, 9 Jun 2024 16:48:48 -0300
Subject: [PATCH] Create clustered replication benchmark
* Create a harness to develop benchmarks based on the CounterPerf;
* Created a replication benchmark to verify the performance for
replication an arbitrary byte array in a cluster.
* Created the script for running the new benchmark.
---
bin/replication-perf.sh | 6 +
.../org/jgroups/perf/CommandLineOptions.java | 94 ++++
tests/benchmark/org/jgroups/perf/Main.java | 48 ++
.../perf/harness/AbstractRaftBenchmark.java | 451 ++++++++++++++++++
.../jgroups/perf/harness/RaftBenchmark.java | 39 ++
.../AsyncReplicationBenchmark.java | 121 +++++
.../perf/replication/ReplicationPerf.java | 129 +++++
.../replication/SyncReplicationBenchmark.java | 159 ++++++
tests/resources/raft-benchmark.xml | 3 +-
9 files changed, 1049 insertions(+), 1 deletion(-)
create mode 100755 bin/replication-perf.sh
create mode 100644 tests/benchmark/org/jgroups/perf/CommandLineOptions.java
create mode 100644 tests/benchmark/org/jgroups/perf/Main.java
create mode 100644 tests/benchmark/org/jgroups/perf/harness/AbstractRaftBenchmark.java
create mode 100644 tests/benchmark/org/jgroups/perf/harness/RaftBenchmark.java
create mode 100644 tests/benchmark/org/jgroups/perf/replication/AsyncReplicationBenchmark.java
create mode 100644 tests/benchmark/org/jgroups/perf/replication/ReplicationPerf.java
create mode 100644 tests/benchmark/org/jgroups/perf/replication/SyncReplicationBenchmark.java
diff --git a/bin/replication-perf.sh b/bin/replication-perf.sh
new file mode 100755
index 0000000..3da7dfd
--- /dev/null
+++ b/bin/replication-perf.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+BASEDIR=$(dirname "$0")
+
+# shellcheck disable=SC2086,SC2048
+"$BASEDIR"/test-run.sh -ea org.jgroups.perf.Main org.jgroups.perf.replication.ReplicationPerf -props raft-benchmark.xml $*
diff --git a/tests/benchmark/org/jgroups/perf/CommandLineOptions.java b/tests/benchmark/org/jgroups/perf/CommandLineOptions.java
new file mode 100644
index 0000000..1ea4ff5
--- /dev/null
+++ b/tests/benchmark/org/jgroups/perf/CommandLineOptions.java
@@ -0,0 +1,94 @@
+package org.jgroups.perf;
+
+/**
+ * Base command line arguments for Raft benchmarks.
+ *
+ * The first argument must be the FQN of the benchmark class. The remaining arguments can be provided in any order:
+ *
+ * -props
: The XML file to configure the protocol stack;
+ * -name
: The raft-id of the current node;
+ * -nohup
: Disable the event loop;
+ * -histogram
: Path to write the HdrHistogram file with the collected metrics for this node.
+ *
+ *
+ */
+public final class CommandLineOptions {
+
+ private final String benchmark;
+ private final String name;
+ private final String props;
+ private final String histogramPath;
+ private final boolean runEventLoop;
+
+ private CommandLineOptions(String benchmark, String name, String props, String histogramPath, boolean runEventLoop) {
+ this.benchmark = benchmark;
+ this.name = name;
+ this.props = props;
+ this.histogramPath = histogramPath;
+ this.runEventLoop = runEventLoop;
+ }
+
+ public String getBenchmark() {
+ return benchmark;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getProps() {
+ return props;
+ }
+
+ public String getHistogramPath() {
+ return histogramPath;
+ }
+
+ public boolean shouldRunEventLoop() {
+ return runEventLoop;
+ }
+
+ public static CommandLineOptions parse(String[] args) {
+ String props = null;
+ String name = null;
+ String histogramPath = null;
+ boolean runEventLoop = true;
+
+ if (args.length == 0)
+ throw new IllegalArgumentException("Arguments not provided");
+
+ // The first position contains the benchmark class to run.
+ String benchmark = args[0];
+
+ for (int i = 1; i < args.length; i++) {
+ switch (args[i]) {
+ case "-props":
+ props = args[++i];
+ break;
+
+ case "-name":
+ name = args[++i];
+ break;
+
+ case "-nohup":
+ runEventLoop = false;
+ break;
+
+ case "-histogram":
+ histogramPath = args[++i];
+ break;
+
+ default:
+ System.out.printf("Unknown option: %s%n", args[i]);
+ help(benchmark);
+ break;
+ }
+ }
+
+ return new CommandLineOptions(benchmark, name, props, histogramPath, runEventLoop);
+ }
+
+ private static void help(String benchmark) {
+ System.out.printf("%s [-props ] [-name ] [-nohup] [-histogram /path/to/write]", benchmark);
+ }
+}
diff --git a/tests/benchmark/org/jgroups/perf/Main.java b/tests/benchmark/org/jgroups/perf/Main.java
new file mode 100644
index 0000000..0ba441e
--- /dev/null
+++ b/tests/benchmark/org/jgroups/perf/Main.java
@@ -0,0 +1,48 @@
+package org.jgroups.perf;
+
+import org.jgroups.perf.harness.AbstractRaftBenchmark;
+import org.jgroups.util.Util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Entry-point class to run Raft benchmarks.
+ *
+ * The command line arguments are parsed and the benchmark class is instantiated. The arguments must be provided in the
+ * correct order. The very first argument is the benchmark class to run, followed by the arguments.
+ *
+ */
+public class Main {
+
+ public static void main(String[] args) throws Throwable {
+ CommandLineOptions cmd = CommandLineOptions.parse(args);
+ AbstractRaftBenchmark benchmark = instantiate(cmd);
+
+ // Initializes the benchmark.
+ // Causes the nodes to retrieve the benchmark configuration from the coordinator.
+ benchmark.init();
+
+ if (cmd.shouldRunEventLoop()) {
+ benchmark.eventLoop();
+ } else {
+ for (;;) Util.sleep(60_000);
+ }
+
+ benchmark.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static AbstractRaftBenchmark instantiate(CommandLineOptions cmd)
+ throws InvocationTargetException, InstantiationException, IllegalAccessException, ClassNotFoundException {
+
+ Class extends AbstractRaftBenchmark> clazz = (Class extends AbstractRaftBenchmark>) Class.forName(cmd.getBenchmark());
+ Constructor>[] constructors = clazz.getConstructors();
+
+ if (constructors.length > 1)
+ throw new IllegalStateException("Multiple constructors declared!");
+
+ Constructor extends AbstractRaftBenchmark> c = (Constructor extends AbstractRaftBenchmark>) constructors[0];
+ return c.newInstance(cmd);
+ }
+}
diff --git a/tests/benchmark/org/jgroups/perf/harness/AbstractRaftBenchmark.java b/tests/benchmark/org/jgroups/perf/harness/AbstractRaftBenchmark.java
new file mode 100644
index 0000000..3b263bf
--- /dev/null
+++ b/tests/benchmark/org/jgroups/perf/harness/AbstractRaftBenchmark.java
@@ -0,0 +1,451 @@
+package org.jgroups.perf.harness;
+
+import org.jgroups.Address;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.Receiver;
+import org.jgroups.Version;
+import org.jgroups.View;
+import org.jgroups.annotations.Property;
+import org.jgroups.blocks.MethodCall;
+import org.jgroups.blocks.RequestOptions;
+import org.jgroups.blocks.ResponseMode;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.conf.ClassConfigurator;
+import org.jgroups.perf.CommandLineOptions;
+import org.jgroups.perf.counter.HistogramUtil;
+import org.jgroups.protocols.TP;
+import org.jgroups.protocols.raft.RAFT;
+import org.jgroups.tests.perf.PerfUtil;
+import org.jgroups.util.Bits;
+import org.jgroups.util.DefaultThreadFactory;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+import org.jgroups.util.Streamable;
+import org.jgroups.util.ThreadFactory;
+import org.jgroups.util.Util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.HdrHistogram.AbstractHistogram;
+import org.HdrHistogram.Histogram;
+
+/**
+ * Default harness for a benchmark.
+ *
+ * Bootstrap the benchmark with the default configuration and methods for creating the cluster. Provides the functionalities
+ * for updating configuration, retrieving the current configuration from the coordinator, and starting the benchmark in the cluster.
+ *
+ *
+ * Classes extending the harness have a few methods available to extend the configuration:
+ *
+ * - {@link #syncBenchmark(ThreadFactory)} and {@link #asyncBenchmark(ThreadFactory)}: Create the synchronous and
+ * asynchronous instances for running the benchmark, respectively;
+ * - {@link #extendedEventLoop(int)}: Extend the event loop to parse additional options;
+ * - {@link #extendedEventLoopHeader()}: Extend the event loop options message;
+ * - {@link #clear()}: Clear resources created by the benchmark at exit.
+ *
+ *
+ *
+ * @author José Bolina
+ */
+public abstract class AbstractRaftBenchmark implements Receiver {
+
+ protected static final Field NUM_THREADS, TIME, TIMEOUT, PRINT_INVOKERS, PRINT_DETAILS, BENCHMARK;
+ private static final Method[] METHODS = new Method[4];
+ private static final short START = 0;
+ private static final short GET_CONFIG = 1;
+ private static final short SET = 2;
+ private static final short QUIT_ALL = 3;
+ private static final String CLUSTER_NAME;
+ private static final String BASE_EVENT_LOOP =
+ "[1] Start test [2] View [4] Threads (%d) [6] Time (%s)" +
+ "\n[t] incr timeout (%s) [d] details (%b) [i] print updaters (%b)" +
+ "\n[b] benchmark mode (%s) [v] Version" +
+ "\n%s" +
+ "\n[x] Exit [X] Exit all";
+
+ static {
+ try {
+ METHODS[START] = AbstractRaftBenchmark.class.getMethod("startTest");
+ METHODS[GET_CONFIG] = AbstractRaftBenchmark.class.getMethod("getConfig");
+ METHODS[SET] = AbstractRaftBenchmark.class.getMethod("set", String.class, Object.class);
+ METHODS[QUIT_ALL] = AbstractRaftBenchmark.class.getMethod("quitAll");
+
+ NUM_THREADS = Util.getField(AbstractRaftBenchmark.class, "num_threads", true);
+ TIME = Util.getField(AbstractRaftBenchmark.class, "time", true);
+ TIMEOUT = Util.getField(AbstractRaftBenchmark.class, "timeout", true);
+ PRINT_INVOKERS = Util.getField(AbstractRaftBenchmark.class, "print_updaters", true);
+ PRINT_DETAILS = Util.getField(AbstractRaftBenchmark.class, "print_details", true);
+ BENCHMARK = Util.getField(AbstractRaftBenchmark.class, "benchmark", true);
+
+ PerfUtil.init();
+ ClassConfigurator.addIfAbsent((short) 1050, UpdateResult.class);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ CLUSTER_NAME = MethodHandles.lookup().lookupClass().getSimpleName();
+ }
+
+ protected final JChannel channel;
+ private final String histogramPath;
+ private final ThreadFactory threadFactory;
+ private final RpcDispatcher disp;
+
+ @Property
+ protected int num_threads = 100;
+
+ @Property
+ protected int time = 30; // in seconds
+
+ @Property
+ protected boolean print_updaters;
+
+ @Property
+ protected boolean print_details;
+
+ @Property
+ protected long timeout = 60_000; // ms
+
+ @Property
+ protected String benchmark = "sync";
+
+ private volatile boolean looping = true;
+
+ public AbstractRaftBenchmark(CommandLineOptions cmd) throws Throwable {
+ this.histogramPath = cmd.getHistogramPath();
+ if (histogramPath != null)
+ System.out.printf("Histogram enabled! Storing in '%s'%n", histogramPath);
+
+ this.channel = new JChannel(cmd.getProps()).name(cmd.getName());
+
+ TP transport = channel.getProtocolStack().getTransport();
+ boolean useVirtualThreads = transport.useVirtualThreads();
+
+ threadFactory = new DefaultThreadFactory("replication-updater", false, true)
+ .useVirtualThreads(useVirtualThreads);
+ if (useVirtualThreads && Util.virtualThreadsAvailable())
+ System.out.println("Utilizing virtual threads for benchmark!");
+
+ RAFT raft = channel.getProtocolStack().findProtocol(RAFT.class);
+ raft.raftId(cmd.getName());
+ raft.addRoleListener(role -> System.out.printf("%s: role is '%s'%n", channel.getAddress(), role));
+
+ disp = new RpcDispatcher(channel, this).setReceiver(this).setMethodLookup(id -> METHODS[id]);
+
+ System.out.printf("Connecting benchmark node to cluster: '%s'%n", CLUSTER_NAME);
+ channel.connect(CLUSTER_NAME);
+ }
+
+ public final void init() throws Throwable {
+ if (channel.getView().getMembers().size() < 2)
+ return;
+
+ Address coord = channel.getView().getCoord();
+ PerfUtil.Config config = disp.callRemoteMethod(coord, new MethodCall(GET_CONFIG), new RequestOptions(ResponseMode.GET_ALL, 5_000));
+ if (config != null) {
+ System.out.printf("Fetch config from '%s': %s%n", coord, config);
+ for (Map.Entry entry : config.values().entrySet()) {
+ Field field = Util.getField(getClass(), entry.getKey());
+ Util.setField(field, this, entry.getValue());
+ }
+ } else {
+ System.err.println("Failed to fetch config from " + coord);
+ }
+ }
+
+ @Override
+ public void viewAccepted(View new_view) {
+ System.out.printf("Received view: %s%n", new_view);
+ }
+
+ public final void eventLoop() throws Throwable {
+ while (looping) {
+ String message = String.format(BASE_EVENT_LOOP, num_threads, Util.printTime(time, TimeUnit.MILLISECONDS),
+ Util.printTime(timeout, TimeUnit.MILLISECONDS), print_details, print_updaters, benchmark, extendedEventLoopHeader());
+ int c = Util.keyPress(message);
+
+ switch (c) {
+ case '1':
+ startBenchmark();
+ break;
+ case '2':
+ System.out.printf("\n-- local: %s, view: %s\n", channel.getAddress(), channel.getView());
+ break;
+ case '4':
+ changeFieldAcrossCluster(NUM_THREADS, Util.readIntFromStdin("Number of updater threads: "));
+ break;
+ case '6':
+ changeFieldAcrossCluster(TIME, Util.readIntFromStdin("Time (secs): "));
+ break;
+ case 'd':
+ changeFieldAcrossCluster(PRINT_DETAILS, !print_details);
+ break;
+ case 'i':
+ changeFieldAcrossCluster(PRINT_INVOKERS, !print_updaters);
+ break;
+ case 't':
+ changeFieldAcrossCluster(TIMEOUT, Util.readIntFromStdin("update timeout (ms): "));
+ break;
+ case 'b':
+ changeFieldAcrossCluster(BENCHMARK, Util.readStringFromStdin("benchmark mode: "));
+ break;
+ case 'v':
+ System.out.printf("Version: %s, Java version: %s\n", Version.printVersion(),
+ System.getProperty("java.vm.version", "n/a"));
+ break;
+ case 'x':
+ case -1:
+ looping = false;
+ break;
+ case 'X':
+ try {
+ RequestOptions options = new RequestOptions(ResponseMode.GET_NONE, 0)
+ .flags(Message.Flag.OOB, Message.Flag.DONT_BUNDLE, Message.Flag.NO_FC);
+ disp.callRemoteMethods(null, new MethodCall(QUIT_ALL), options);
+ break;
+ } catch (Throwable t) {
+ System.err.println("Calling quitAll() failed: " + t);
+ }
+ break;
+ default:
+ extendedEventLoop(c);
+ break;
+ }
+ }
+ }
+
+ public final void stop() {
+ Util.close(disp, channel);
+ clear();
+ }
+
+ public final UpdateResult startTest() throws Throwable {
+ System.out.printf("running for %d seconds\n", time);
+
+ try (RaftBenchmark rb = getBenchmark(benchmark)) {
+ long start = System.currentTimeMillis();
+
+ rb.start();
+
+ long interval = (long) ((time * 1000.0) / 10.0);
+ for (int i = 1; i <= 10; i++) {
+ Util.sleep(interval);
+ System.out.printf("%d: %s%n", i, printAverage(start, rb));
+ }
+
+ rb.stop();
+ rb.join();
+
+ long totalTime = System.currentTimeMillis() - start;
+
+ Histogram avgIncrs = rb.getResults(print_updaters, avgMinMax -> print(avgMinMax, print_details));
+ if (print_updaters)
+ System.out.printf("\navg over all updaters: %s%n", print(avgIncrs, print_details));
+
+ System.out.printf("\ndone (in %s ms)%n", totalTime);
+
+ if (histogramPath != null) {
+ String fileName = String.format("histogram_%s_%s.hgrm", channel.getName(), benchmark);
+ Path filePath = Path.of(histogramPath, fileName);
+
+ System.out.printf("Storing histogram to '%s'%n", filePath.toAbsolutePath());
+ try {
+ HistogramUtil.writeTo(avgIncrs, filePath.toFile());
+ } catch (IOException e) {
+ System.err.printf("Failed writing histogram: %s", e);
+ }
+ }
+
+ return new UpdateResult(rb.getTotalUpdates(), totalTime, avgIncrs);
+ }
+ }
+
+ public final void quitAll() {
+ System.out.println("Received quit all; shutting down");
+ stopEventLoop();
+ clear();
+ System.exit(0);
+ }
+
+ public final PerfUtil.Config getConfig() {
+ PerfUtil.Config config = new PerfUtil.Config();
+ Class> clazz = getClass();
+ while (clazz != null) {
+ for (Field field : Util.getAllDeclaredFieldsWithAnnotations(clazz, Property.class)) {
+ if (field.isAnnotationPresent(Property.class)) {
+ config.add(field.getName(), Util.getField(field, this));
+ }
+ }
+ clazz = clazz.getSuperclass();
+ }
+ return config;
+ }
+
+ public final void set(String field_name, Object value) {
+ Field field = Util.getField(this.getClass(), field_name);
+ if (field == null)
+ System.err.println("Field " + field_name + " not found");
+ else {
+ Util.setField(field, this, value);
+ System.out.println(field.getName() + "=" + value);
+ }
+ }
+
+ private void stopEventLoop() {
+ looping = false;
+ Util.close(channel);
+ }
+
+ private RaftBenchmark getBenchmark(String type) {
+ if (type.equals("sync"))
+ return syncBenchmark(threadFactory);
+
+ if (type.equals("async"))
+ return asyncBenchmark(threadFactory);
+
+ throw new IllegalArgumentException(String.format("Benchmark %s not found!", benchmark));
+ }
+
+ /**
+ * Creates a new instance of the {@link RaftBenchmark} with synchronous APIs.
+ *
+ * @param tf: Factory to create threads.
+ * @return A new benchmark instance.
+ */
+ public abstract RaftBenchmark syncBenchmark(ThreadFactory tf);
+
+ /**
+ * Creates a new instance of the {@link RaftBenchmark} with asynchronous APIs.
+ *
+ * @param tf: Factory to create threads.
+ * @return A new benchmark instance.
+ */
+ public abstract RaftBenchmark asyncBenchmark(ThreadFactory tf);
+
+ /**
+ * Expand the event loop with new arguments.
+ *
+ * @param c: Key pressed.
+ * @throws Throwable: If an error occurs while handling the key.
+ */
+ public void extendedEventLoop(int c) throws Throwable { }
+
+ /**
+ * Expand the event loop message.
+ *
+ * @return Additional properties to add to the event loop message.
+ */
+ public String extendedEventLoopHeader() {
+ return "";
+ }
+
+ /**
+ * Clear resources created by the benchmark.
+ *
+ * This method is only invoked when exiting or finishing the benchmark.
+ *
+ */
+ public void clear() { }
+
+ /**
+ * Kicks off the benchmark on all cluster nodes
+ */
+ private void startBenchmark() {
+ RspList responses;
+ try {
+ RequestOptions options = new RequestOptions(ResponseMode.GET_ALL, 0)
+ .flags(Message.Flag.OOB, Message.Flag.DONT_BUNDLE, Message.Flag.NO_FC);
+ responses = disp.callRemoteMethods(null, new MethodCall(START), options);
+ } catch (Throwable t) {
+ System.err.println("starting the benchmark failed: " + t);
+ return;
+ }
+
+ long total_incrs = 0;
+ long total_time = 0;
+ Histogram globalHistogram = null;
+
+ System.out.println("\n======================= Results: ===========================");
+ for (Map.Entry> entry : responses.entrySet()) {
+ Address mbr = entry.getKey();
+ Rsp rsp = entry.getValue();
+ UpdateResult result = rsp.getValue();
+ if (result != null) {
+ total_incrs += result.num_updates;
+ total_time += result.total_time;
+ if (globalHistogram == null)
+ globalHistogram = result.histogram;
+ else
+ globalHistogram.add(result.histogram);
+ }
+ System.out.println(mbr + ": " + result);
+ }
+ double total_reqs_sec = total_incrs / (total_time / 1000.0);
+ System.out.println("\n");
+ System.out.println(Util.bold(String.format("Throughput: %,.2f updates/sec/node\n" +
+ "Time: %s / update\n",
+ total_reqs_sec, print(globalHistogram, print_details))));
+ System.out.println("\n\n");
+ }
+
+ protected final void changeFieldAcrossCluster(Field field, Object value) throws Exception {
+ disp.callRemoteMethods(null, new MethodCall(SET, field.getName(), value), RequestOptions.SYNC());
+ }
+
+ private static String printAverage(long start_time, RaftBenchmark benchmark) {
+ long tmp_time = System.currentTimeMillis() - start_time;
+ long incrs = benchmark.getTotalUpdates();
+ double incrs_sec = incrs / (tmp_time / 1000.0);
+ return String.format("%,.0f updates/sec (%,d updates)", incrs_sec, incrs);
+ }
+
+ private static String print(AbstractHistogram histogram, boolean details) {
+ double avg = histogram.getMean();
+ return details ? String.format("min/avg/max = %d/%f/%s", histogram.getMinValue(), avg, histogram.getMaxValue()) :
+ String.format("%s", Util.printTime(avg, TimeUnit.NANOSECONDS));
+ }
+
+ public static class UpdateResult implements Streamable {
+ protected long num_updates;
+ protected long total_time; // in ms
+ protected Histogram histogram; // in ns
+
+ public UpdateResult() { }
+
+ public UpdateResult(long num_updates, long total_time, Histogram histogram) {
+ this.num_updates = num_updates;
+ this.total_time = total_time;
+ this.histogram = histogram;
+ }
+
+ @Override
+ public void writeTo(DataOutput out) throws IOException {
+ Bits.writeLongCompressed(num_updates, out);
+ Bits.writeLongCompressed(total_time, out);
+ Util.objectToStream(histogram, out);
+ }
+
+ @Override
+ public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
+ num_updates = Bits.readLongCompressed(in);
+ total_time = Bits.readLongCompressed(in);
+ histogram = Util.objectFromStream(in);
+ }
+
+ public String toString() {
+ double totalReqsPerSec = num_updates / (total_time / 1000.0);
+ return String.format("%,.2f updates/sec (%,d updates, %s / update)", totalReqsPerSec, num_updates,
+ Util.printTime(histogram.getMean(), TimeUnit.NANOSECONDS));
+ }
+ }
+}
diff --git a/tests/benchmark/org/jgroups/perf/harness/RaftBenchmark.java b/tests/benchmark/org/jgroups/perf/harness/RaftBenchmark.java
new file mode 100644
index 0000000..dcedcc8
--- /dev/null
+++ b/tests/benchmark/org/jgroups/perf/harness/RaftBenchmark.java
@@ -0,0 +1,39 @@
+package org.jgroups.perf.harness;
+
+import java.util.function.Function;
+
+import org.HdrHistogram.AbstractHistogram;
+import org.HdrHistogram.Histogram;
+
+public interface RaftBenchmark extends AutoCloseable {
+ /**
+ * Signals the test start.
+ */
+ void start();
+
+ /**
+ * Signals the test end.
+ */
+ void stop();
+
+ /**
+ * Wait until all updaters finish their work.
+ *
+ * @throws InterruptedException If interrupted.
+ */
+ void join() throws InterruptedException;
+
+ /**
+ * @return The total number of "add" operation invoked.
+ */
+ long getTotalUpdates();
+
+ /**
+ * Returns the results of the run.
+ *
+ * @param printUpdaters If supported and if {@code true}, print to {@link System#out} each updater result.
+ * @param timePrinter {@link Function} to use to print each updater {@link AbstractHistogram} result.
+ * @return The {@link Histogram} with the results of all updaters.
+ */
+ Histogram getResults(boolean printUpdaters, Function timePrinter);
+}
diff --git a/tests/benchmark/org/jgroups/perf/replication/AsyncReplicationBenchmark.java b/tests/benchmark/org/jgroups/perf/replication/AsyncReplicationBenchmark.java
new file mode 100644
index 0000000..f0c0278
--- /dev/null
+++ b/tests/benchmark/org/jgroups/perf/replication/AsyncReplicationBenchmark.java
@@ -0,0 +1,121 @@
+package org.jgroups.perf.replication;
+
+import org.jgroups.perf.harness.RaftBenchmark;
+import org.jgroups.perf.counter.HistogramUtil;
+import org.jgroups.raft.Settable;
+import org.jgroups.util.CompletableFutures;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
+
+import org.HdrHistogram.AbstractHistogram;
+import org.HdrHistogram.AtomicHistogram;
+import org.HdrHistogram.Histogram;
+
+/**
+ * A benchmark for asynchronous APIs.
+ *
+ * Run multiple asynchronous requests in parallel. A new request initiates as soon as a request finishes. Utilize the
+ * {@link org.jgroups.raft.RaftHandle#setAsync(byte[], int, int)} API.
+ *
+ *
+ */
+class AsyncReplicationBenchmark implements RaftBenchmark {
+
+ private final int concurrency;
+ private final byte[] payload;
+ private final Settable settable;
+ private final ThreadFactory tf;
+ private final List> requests;
+ private final AtomicBoolean stop = new AtomicBoolean(false);
+ private final LongAdder updates = new LongAdder();
+ private final AtomicHistogram histogram = HistogramUtil.createAtomic();
+
+
+ public AsyncReplicationBenchmark(int concurrency, Settable settable, byte[] payload, ThreadFactory tf) {
+ this.concurrency = concurrency;
+ this.payload = payload;
+ this.settable = settable;
+ this.tf = tf;
+ this.requests = new ArrayList<>(concurrency);
+ }
+
+ @Override
+ public void start() {
+ stop.set(false);
+ for (int i = 0; i < concurrency; i++) {
+ requests.add(perform());
+ }
+ }
+
+ @Override
+ public void stop() {
+ stop.set(true);
+ }
+
+ @Override
+ public void join() throws InterruptedException {
+ for (CompletionStage request : requests) {
+ request.toCompletableFuture().join();
+ }
+ }
+
+ @Override
+ public long getTotalUpdates() {
+ return updates.sum();
+ }
+
+ @Override
+ public Histogram getResults(boolean printUpdaters, Function timePrinter) {
+ return histogram;
+ }
+
+ @Override
+ public void close() throws Exception {
+ stop.set(true);
+ requests.clear();
+ }
+
+ private void updateTime(long timeNanos) {
+ updates.increment();
+ histogram.recordValue(timeNanos);
+ }
+
+ private CompletionStage perform() {
+ if (stop.get())
+ return CompletableFutures.completedNull();
+
+ CompletableFuture cf = new CompletableFuture<>();
+ perform(cf, execute(), System.nanoTime());
+ return cf;
+ }
+
+ private void perform(CompletableFuture cf, CompletionStage> prev, long start) {
+ if (stop.get()) {
+ cf.complete(null);
+ return;
+ }
+
+ prev.whenComplete((ignoreV, ignoreT) -> {
+ long currentTime = System.nanoTime();
+ updateTime(currentTime - start);
+ perform(cf, execute(), System.nanoTime());
+ });
+ }
+
+ private CompletableFuture> execute() {
+ try {
+ return settable.setAsync(payload, 0, payload.length);
+ } catch (Exception e) {
+ if (!stop.get())
+ e.printStackTrace(System.err);
+ }
+ return CompletableFutures.completedNull();
+ }
+}
diff --git a/tests/benchmark/org/jgroups/perf/replication/ReplicationPerf.java b/tests/benchmark/org/jgroups/perf/replication/ReplicationPerf.java
new file mode 100644
index 0000000..16979a5
--- /dev/null
+++ b/tests/benchmark/org/jgroups/perf/replication/ReplicationPerf.java
@@ -0,0 +1,129 @@
+package org.jgroups.perf.replication;
+
+import org.jgroups.annotations.Property;
+import org.jgroups.perf.harness.AbstractRaftBenchmark;
+import org.jgroups.perf.CommandLineOptions;
+import org.jgroups.perf.harness.RaftBenchmark;
+import org.jgroups.protocols.raft.RAFT;
+import org.jgroups.raft.RaftHandle;
+import org.jgroups.raft.StateMachine;
+import org.jgroups.raft.testfwk.RaftTestUtils;
+import org.jgroups.util.ThreadFactory;
+import org.jgroups.util.Util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.lang.reflect.Field;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Test performance of replicating data.
+ *
+ * This benchmark utilizes the base {@link RaftHandle} to verify the replication performance of a configurable-sized
+ * byte array. The test verifies only the replication part, where the state machine does not interpret the bytes.
+ * Since the {@link StateMachine} implementation is application-specific, we don't measure it in our tests.
+ *
+ *
+ * The benchmark accepts configuration for the payload size, whether fsync the log.
+ *
+ *
+ * @author José Bolina
+ */
+public class ReplicationPerf extends AbstractRaftBenchmark {
+
+ private static final Field DATA_SIZE, USE_FSYNC;
+
+ static {
+ try {
+ DATA_SIZE = Util.getField(ReplicationPerf.class, "data_size", true);
+ USE_FSYNC = Util.getField(ReplicationPerf.class, "use_fsync", true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private final RaftHandle raft;
+ private final CounterStateMachine csm;
+
+ @Property
+ protected int data_size = 526;
+
+ @Property
+ protected boolean use_fsync;
+
+ public ReplicationPerf(CommandLineOptions cmd) throws Throwable {
+ super(cmd);
+ this.csm = new CounterStateMachine();
+ this.raft = new RaftHandle(channel, csm);
+ }
+
+ @Override
+ public RaftBenchmark syncBenchmark(ThreadFactory tf) {
+ visitRaftBeforeBenchmark();
+ return new SyncReplicationBenchmark(num_threads, raft, createTestPayload(), tf);
+ }
+
+ @Override
+ public RaftBenchmark asyncBenchmark(ThreadFactory tf) {
+ visitRaftBeforeBenchmark();
+ return new AsyncReplicationBenchmark(num_threads, raft, createTestPayload(), tf);
+ }
+
+ @Override
+ public String extendedEventLoopHeader() {
+ return String.format("[s] Data size (%d bytes) [f] Use fsync (%b)", data_size, use_fsync);
+ }
+
+ @Override
+ public void extendedEventLoop(int c) throws Throwable {
+ switch (c) {
+ case 's':
+ changeFieldAcrossCluster(DATA_SIZE, Util.readIntFromStdin("new data size: "));
+ break;
+
+ case 'f':
+ changeFieldAcrossCluster(USE_FSYNC, !use_fsync);
+ break;
+
+ default:
+ System.out.printf("Unknown option: %c%n", c);
+ break;
+ }
+ }
+
+ @Override
+ public void clear() {
+ try {
+ RaftTestUtils.deleteRaftLog(channel.getProtocolStack().findProtocol(RAFT.class));
+ } catch (Exception e) {
+ System.err.printf("Failed deleting log file: %s", e);
+ }
+ }
+
+ private void visitRaftBeforeBenchmark() {
+ raft.raft().logUseFsync(use_fsync);
+ }
+
+ private byte[] createTestPayload() {
+ byte[] payload = new byte[data_size];
+ ThreadLocalRandom.current().nextBytes(payload);
+ return payload;
+ }
+
+ private static class CounterStateMachine implements StateMachine {
+ private long updates;
+
+
+ @Override
+ public byte[] apply(byte[] data, int offset, int length, boolean serialize_response) throws Exception {
+ updates++;
+ return null;
+ }
+
+ @Override
+ public void readContentFrom(DataInput in) throws Exception { }
+
+ @Override
+ public void writeContentTo(DataOutput out) throws Exception { }
+ }
+}
diff --git a/tests/benchmark/org/jgroups/perf/replication/SyncReplicationBenchmark.java b/tests/benchmark/org/jgroups/perf/replication/SyncReplicationBenchmark.java
new file mode 100644
index 0000000..ad244d7
--- /dev/null
+++ b/tests/benchmark/org/jgroups/perf/replication/SyncReplicationBenchmark.java
@@ -0,0 +1,159 @@
+package org.jgroups.perf.replication;
+
+import org.jgroups.perf.harness.RaftBenchmark;
+import org.jgroups.perf.counter.HistogramUtil;
+import org.jgroups.raft.Settable;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.Function;
+
+import org.HdrHistogram.AbstractHistogram;
+import org.HdrHistogram.Histogram;
+
+/**
+ * Benchmark for synchronous APIs.
+ *
+ * Initialize multiple threads simultaneously and execute the synchronous API of {@link org.jgroups.raft.RaftHandle}
+ * for replicating the payload.
+ *
+ *
+ * @author José Bolina
+ */
+class SyncReplicationBenchmark implements RaftBenchmark {
+
+ private final BenchmarkRunner runner;
+
+ public SyncReplicationBenchmark(int numberThreads, Settable settable, byte[] payload, ThreadFactory tf) {
+ this.runner = new BenchmarkRunner(numberThreads, settable, payload, tf);
+ }
+
+ @Override
+ public void start() {
+ this.runner.start();
+ }
+
+ @Override
+ public void stop() {
+ this.runner.stop();
+ }
+
+ @Override
+ public void join() throws InterruptedException {
+ this.runner.join();
+ }
+
+ @Override
+ public long getTotalUpdates() {
+ return Arrays.stream(runner.updaters)
+ .filter(Objects::nonNull)
+ .mapToLong(BenchmarkUpdater::numUpdates)
+ .sum();
+ }
+
+ @Override
+ public Histogram getResults(boolean printUpdaters, Function timePrinter) {
+ Histogram global = HistogramUtil.create();
+ Arrays.stream(runner.updaters)
+ .filter(Objects::nonNull)
+ .map(updater -> {
+ if (printUpdaters)
+ System.out.printf("updater %s: updates %s%n", updater.thread.getId(), timePrinter.apply(updater.histogram));
+ return updater.histogram;
+ })
+ .forEach(global::add);
+ return global;
+ }
+
+ @Override
+ public void close() throws Exception {
+ stop();
+ Arrays.stream(runner.updaters)
+ .map(updater -> updater.thread)
+ .forEach(Thread::interrupt);
+ }
+
+ private static class BenchmarkRunner {
+ private final CountDownLatch latch;
+ private final BenchmarkUpdater[] updaters;
+
+ public BenchmarkRunner(int numberThreads, Settable settable, byte[] payload, ThreadFactory tf) {
+ this.latch = new CountDownLatch(1);
+ this.updaters = new BenchmarkUpdater[numberThreads];
+ for (int i = 0; i < numberThreads; i++) {
+ updaters[i] = new BenchmarkUpdater(latch, settable, payload, tf);
+ updaters[i].thread.setName("updater-" + i);
+ updaters[i].thread.start();
+ }
+ }
+
+ public void start() {
+ latch.countDown();
+ }
+
+ public void stop() {
+ for (BenchmarkUpdater updater : updaters) {
+ updater.stop();
+ }
+ }
+
+ public void join() throws InterruptedException {
+ for (BenchmarkUpdater updater : updaters) {
+ updater.thread.join();
+ }
+ }
+ }
+
+ private static final class BenchmarkUpdater implements Runnable {
+ private final Histogram histogram = HistogramUtil.create();
+ private final CountDownLatch latch;
+ private final Settable settable;
+ private final byte[] payload;
+ private final Thread thread;
+
+ private long updates;
+
+ private volatile boolean running = true;
+
+ public BenchmarkUpdater(CountDownLatch latch, Settable settable, byte[] payload, ThreadFactory tf) {
+ this.latch = latch;
+ this.settable = settable;
+ this.payload = payload;
+ this.thread = tf.newThread(this);
+ }
+
+ public long numUpdates() {
+ return updates;
+ }
+
+ public void stop() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ try {
+ latch.await();
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ System.out.println("Benchmark updater interrupted before starting");
+ return;
+ }
+
+ while (running) {
+ try {
+ long start = System.nanoTime();
+ settable.set(payload, 0, payload.length);
+ long duration = System.nanoTime() - start;
+ histogram.recordValue(duration);
+ updates++;
+ } catch (Exception e) {
+ if (running)
+ e.printStackTrace(System.err);
+ }
+ }
+ }
+ }
+}
diff --git a/tests/resources/raft-benchmark.xml b/tests/resources/raft-benchmark.xml
index c009ded..d2417e0 100644
--- a/tests/resources/raft-benchmark.xml
+++ b/tests/resources/raft-benchmark.xml
@@ -58,6 +58,7 @@
+ log_class="org.jgroups.protocols.raft.FileBasedLog"
+ log_dir="./target/benchmark" />