Skip to content

Commit

Permalink
Provide a configurable class loader for ReplicatedStateMachine
Browse files Browse the repository at this point in the history
* This provides a method to set a custom class loader to utilize when
  parsing state machines commands and responses.
* Closes #221.
  • Loading branch information
jabolina committed Nov 15, 2023
1 parent 54d8cd3 commit 5c7106d
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions src/org/jgroups/raft/blocks/ReplicatedStateMachine.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class ReplicatedStateMachine<K,V> implements StateMachine {
protected long repl_timeout=20000; // timeout (ms) to wait for a majority to ack a write
// If true, reads are served locally without going through RAFT.
protected boolean allow_dirty_reads=false;
protected ClassLoader class_loader=null;
protected final List<Notification<K,V>> listeners=new ArrayList<>();

// Hashmap for the contents
Expand Down Expand Up @@ -60,6 +61,15 @@ public ReplicatedStateMachine(JChannel ch) {
public String raftId() {return raft.raftId();}
public ReplicatedStateMachine<K,V> raftId(String id) {raft.raftId(id); return this;}

public ReplicatedStateMachine<K, V> useClassLoader(ClassLoader classLoader) {
this.class_loader = classLoader;
return this;
}

public ClassLoader classLoaderInUse(ClassLoader classLoader) {
return classLoader;
}

public String dumpLog() {
StringBuilder sb=new StringBuilder();

Expand All @@ -86,16 +96,16 @@ public String dumpLog() {
byte type=in.readByte();
switch(type) {
case PUT:
K key=Util.objectFromStream(in);
V val=Util.objectFromStream(in);
K key=Util.objectFromStream(in, class_loader);
V val=Util.objectFromStream(in, class_loader);
sb.append("put(").append(key).append(", ").append(val).append(")");
break;
case REMOVE:
key=Util.objectFromStream(in);
key=Util.objectFromStream(in, class_loader);
sb.append("remove(").append(key).append(")");
break;
case GET:
key=Util.objectFromStream(in);
key=Util.objectFromStream(in, class_loader);
sb.append("get(").append(key).append(")");
break;
default:
Expand Down Expand Up @@ -191,23 +201,23 @@ public int size() {
byte command=in.readByte();
switch(command) {
case PUT:
K key=Util.objectFromStream(in);
V val=Util.objectFromStream(in);
K key=Util.objectFromStream(in, class_loader);
V val=Util.objectFromStream(in, class_loader);
V old_val;
synchronized(map) {
old_val=map.put(key, val);
}
notifyPut(key, val, old_val);
return old_val == null? null : serialize_response? Util.objectToByteBuffer(old_val) : null;
case REMOVE:
key=Util.objectFromStream(in);
key=Util.objectFromStream(in, class_loader);
synchronized(map) {
old_val=map.remove(key);
}
notifyRemove(key, old_val);
return old_val == null? null : serialize_response? Util.objectToByteBuffer(old_val) : null;
case GET:
key=Util.objectFromStream(in);
key=Util.objectFromStream(in, class_loader);
synchronized(map) {
val=map.get(key);
}
Expand All @@ -222,8 +232,8 @@ public int size() {
int size=Bits.readIntCompressed(in);
Map<K,V> tmp=new HashMap<>(size);
for(int i=0; i < size; i++) {
K key=Util.objectFromStream(in);
V val=Util.objectFromStream(in);
K key=Util.objectFromStream(in, class_loader);
V val=Util.objectFromStream(in, class_loader);
tmp.put(key, val);
}
synchronized(map) {
Expand Down Expand Up @@ -265,7 +275,7 @@ protected V invoke(byte command, K key, V val, boolean ignore_return_value) thro

byte[] buf=out.buffer();
byte[] rsp=raft.set(buf, 0, out.position(), repl_timeout, TimeUnit.MILLISECONDS);
return ignore_return_value? null: (V)Util.objectFromByteBuffer(rsp);
return ignore_return_value || rsp == null ? null: (V)Util.objectFromByteBuffer(rsp, 0, rsp.length, class_loader);
}

protected void notifyPut(K key, V val, V old_val) {
Expand Down

0 comments on commit 5c7106d

Please sign in to comment.