From 5c7106dd6de2de35bc688e8eca5abbe1d693a3a7 Mon Sep 17 00:00:00 2001 From: Jose Bolina Date: Wed, 15 Nov 2023 19:49:53 -0300 Subject: [PATCH] Provide a configurable class loader for `ReplicatedStateMachine` * This provides a method to set a custom class loader to utilize when parsing state machines commands and responses. * Closes #221. --- .../raft/blocks/ReplicatedStateMachine.java | 32 ++++++++++++------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/org/jgroups/raft/blocks/ReplicatedStateMachine.java b/src/org/jgroups/raft/blocks/ReplicatedStateMachine.java index 6d596f6a..1c550917 100644 --- a/src/org/jgroups/raft/blocks/ReplicatedStateMachine.java +++ b/src/org/jgroups/raft/blocks/ReplicatedStateMachine.java @@ -29,6 +29,7 @@ public class ReplicatedStateMachine implements StateMachine { protected long repl_timeout=20000; // timeout (ms) to wait for a majority to ack a write // If true, reads are served locally without going through RAFT. protected boolean allow_dirty_reads=false; + protected ClassLoader class_loader=null; protected final List> listeners=new ArrayList<>(); // Hashmap for the contents @@ -60,6 +61,15 @@ public ReplicatedStateMachine(JChannel ch) { public String raftId() {return raft.raftId();} public ReplicatedStateMachine raftId(String id) {raft.raftId(id); return this;} + public ReplicatedStateMachine useClassLoader(ClassLoader classLoader) { + this.class_loader = classLoader; + return this; + } + + public ClassLoader classLoaderInUse(ClassLoader classLoader) { + return classLoader; + } + public String dumpLog() { StringBuilder sb=new StringBuilder(); @@ -86,16 +96,16 @@ public String dumpLog() { byte type=in.readByte(); switch(type) { case PUT: - K key=Util.objectFromStream(in); - V val=Util.objectFromStream(in); + K key=Util.objectFromStream(in, class_loader); + V val=Util.objectFromStream(in, class_loader); sb.append("put(").append(key).append(", ").append(val).append(")"); break; case REMOVE: - key=Util.objectFromStream(in); + key=Util.objectFromStream(in, class_loader); sb.append("remove(").append(key).append(")"); break; case GET: - key=Util.objectFromStream(in); + key=Util.objectFromStream(in, class_loader); sb.append("get(").append(key).append(")"); break; default: @@ -191,8 +201,8 @@ public int size() { byte command=in.readByte(); switch(command) { case PUT: - K key=Util.objectFromStream(in); - V val=Util.objectFromStream(in); + K key=Util.objectFromStream(in, class_loader); + V val=Util.objectFromStream(in, class_loader); V old_val; synchronized(map) { old_val=map.put(key, val); @@ -200,14 +210,14 @@ public int size() { notifyPut(key, val, old_val); return old_val == null? null : serialize_response? Util.objectToByteBuffer(old_val) : null; case REMOVE: - key=Util.objectFromStream(in); + key=Util.objectFromStream(in, class_loader); synchronized(map) { old_val=map.remove(key); } notifyRemove(key, old_val); return old_val == null? null : serialize_response? Util.objectToByteBuffer(old_val) : null; case GET: - key=Util.objectFromStream(in); + key=Util.objectFromStream(in, class_loader); synchronized(map) { val=map.get(key); } @@ -222,8 +232,8 @@ public int size() { int size=Bits.readIntCompressed(in); Map tmp=new HashMap<>(size); for(int i=0; i < size; i++) { - K key=Util.objectFromStream(in); - V val=Util.objectFromStream(in); + K key=Util.objectFromStream(in, class_loader); + V val=Util.objectFromStream(in, class_loader); tmp.put(key, val); } synchronized(map) { @@ -265,7 +275,7 @@ protected V invoke(byte command, K key, V val, boolean ignore_return_value) thro byte[] buf=out.buffer(); byte[] rsp=raft.set(buf, 0, out.position(), repl_timeout, TimeUnit.MILLISECONDS); - return ignore_return_value? null: (V)Util.objectFromByteBuffer(rsp); + return ignore_return_value || rsp == null ? null: (V)Util.objectFromByteBuffer(rsp, 0, rsp.length, class_loader); } protected void notifyPut(K key, V val, V old_val) {