Skip to content

Commit

Permalink
Fix message delivery in mock cluster
Browse files Browse the repository at this point in the history
* Change the PartitionedRaftCluster to install the view in reverse
  order, leaving the coordinator for last.
* Enqueue messages while a view is installing to send later after the
  cluster is stable.
  • Loading branch information
jabolina committed Jun 21, 2024
1 parent 6ee535f commit 5c88886
Showing 1 changed file with 41 additions and 26 deletions.
67 changes: 41 additions & 26 deletions src/org/jgroups/raft/testfwk/PartitionedRaftCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import org.jgroups.Message;
import org.jgroups.View;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Manipulate the cluster during tests.
Expand All @@ -22,6 +24,9 @@ public class PartitionedRaftCluster extends MockRaftCluster {
protected final Map<Address, List<Address>> partitions = new ConcurrentHashMap<>();
protected final Map<Address, RaftNode> nodes = new ConcurrentHashMap<>();

private final AtomicBoolean viewChanging = new AtomicBoolean(false);
private final BlockingQueue<Message> pending = new ArrayBlockingQueue<>(16);

@Override
public <T extends MockRaftCluster> T clear() {
nodes.clear();
Expand All @@ -36,46 +41,49 @@ public <T extends MockRaftCluster> T add(Address addr, RaftNode node) {

@Override
public void handleView(View view) {
List<Address> members = view.getMembers();
for (Address member : members) {
partitions.put(member, members);
}
viewChanging.set(true);
try {
List<Address> members = view.getMembers();
for (Address member : members) {
partitions.put(member, members);
}

for (Address member : members) {
RaftNode node = nodes.get(member);
node.handleView(view);
// Update the view in the inverse order.
// The coordinator usually has additional work in our implementation, therefore, we install
// the view in the reverse order to make sure all members have the same view.
for (int i = members.size() - 1; i >= 0; i--) {
Address member = members.get(i);
RaftNode node = nodes.get(member);
node.handleView(view);
}
} finally {
viewChanging.set(false);
sendPending();
}
}

@Override
public void send(Message msg) {
// Enqueue messages during a view change, to make sure everything is sent after the view is installed on all members.
if (viewChanging.get()) {
pending.add(msg);
return;
}

Address dest=msg.dest(), src=msg.src();
boolean block = interceptor != null && interceptor.shouldBlock(msg);

if(dest != null) {
// In case the message blocks, we copy the target.
// This sends a message on the view before the blocking happens.
List<Address> connected = block
? new ArrayList<>(partitions.get(src))
: partitions.get(src);

// Blocks the invoking thread.
if (block) interceptor.blockMessage(msg);
// Blocks the invoking thread.
if (block) interceptor.blockMessage(msg);

if(dest != null) {
List<Address> connected = partitions.get(src);
if (connected.contains(dest)) {
RaftNode node = nodes.get(dest);
send(node, msg);
}
} else {
// In case the message blocks, we copy the target.
// This sends a message on the view before the blocking happens.
Collection<Address> targets = block
? new ArrayList<>(partitions.get(src))
: partitions.get(src);

// Blocks the invoking thread.
if (block) interceptor.blockMessage(msg);

Collection<Address> targets = partitions.get(src);
for (Address a : targets) {
RaftNode node = nodes.get(a);
send(node, msg);
Expand Down Expand Up @@ -103,4 +111,11 @@ private void send(RaftNode node, Message msg) {
if (async) deliverAsync(node, msg);
else node.up(msg);
}

private void sendPending() {
Message msg;
while ((msg = pending.poll()) != null) {
send(msg);
}
}
}

0 comments on commit 5c88886

Please sign in to comment.