diff --git a/all/pom.xml b/all/pom.xml index 880e138ff..c4ce304bb 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -73,6 +73,7 @@ 1.53.0 32.0.0-jre 2.12.1 + 0.4.1 @@ -434,6 +435,11 @@ asm ${asm.version} + + org.furyio + fury-core + ${fury.version} + diff --git a/codec/codec-sofa-fury/src/main/java/com/alipay/sofa/rpc/codec/fury/FurySerializer.java b/codec/codec-sofa-fury/src/main/java/com/alipay/sofa/rpc/codec/fury/FurySerializer.java index e1a61a6fd..3f8980623 100644 --- a/codec/codec-sofa-fury/src/main/java/com/alipay/sofa/rpc/codec/fury/FurySerializer.java +++ b/codec/codec-sofa-fury/src/main/java/com/alipay/sofa/rpc/codec/fury/FurySerializer.java @@ -32,6 +32,7 @@ import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf; import io.fury.Fury; import io.fury.ThreadLocalFury; +import io.fury.ThreadSafeFury; import io.fury.config.Language; import io.fury.memory.MemoryBuffer; import io.fury.resolver.AllowListChecker; @@ -47,9 +48,9 @@ @Extension(value = "fury2", code = 22) public class FurySerializer extends AbstractSerializer { - private final ThreadLocalFury fury; + protected final ThreadSafeFury fury; - private final String checkerMode = SofaConfigs.getOrDefault(RpcConfigKeys.SERIALIZE_CHECKER_MODE); + private final String checkerMode = SofaConfigs.getOrDefault(RpcConfigKeys.SERIALIZE_CHECKER_MODE); public FurySerializer() { fury = new ThreadLocalFury(classLoader -> { @@ -102,8 +103,8 @@ public FurySerializer() { f.register(SofaRpcException.class); return f; }); - addSerializer(SofaRequest.class, new SofaRequestFurySerializer(fury)); - addSerializer(SofaResponse.class, new SofaResponseFurySerializer(fury)); + addCustomSerializer(SofaRequest.class, new SofaRequestFurySerializer(fury)); + addCustomSerializer(SofaResponse.class, new SofaResponseFurySerializer(fury)); } @Override @@ -139,7 +140,7 @@ public Object decode(final AbstractByteBuf data, final Class clazz, final Map { - private final ThreadLocalFury fury; + private final ThreadSafeFury fury; - public SofaRequestFurySerializer(ThreadLocalFury fury) { + public SofaRequestFurySerializer(ThreadSafeFury fury) { this.fury = fury; } @@ -51,7 +52,7 @@ public AbstractByteBuf encodeObject(SofaRequest object, Map cont isGenericRequest(context.get(RemotingConstants.HEAD_GENERIC_TYPE)); if (genericSerialize) { // TODO support generic call - throw new SofaRpcException("Generic call is not supported for now."); + throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Generic call is not supported for now."); } fury.serialize(writeBuffer, object); final Object[] args = object.getMethodArgs(); @@ -59,7 +60,7 @@ public AbstractByteBuf encodeObject(SofaRequest object, Map cont return new ByteArrayWrapperByteBuf(writeBuffer.getBytes(0, writeBuffer.writerIndex())); } catch (Exception e) { - throw new SofaRpcException(e.getMessage(), e); + throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, e.getMessage(), e); } } @@ -70,7 +71,7 @@ public SofaRequest decodeObject(AbstractByteBuf data, Map contex SofaRequest sofaRequest = (SofaRequest) fury.deserialize(readBuffer); String targetServiceName = sofaRequest.getTargetServiceUniqueName(); if (targetServiceName == null) { - throw new SofaRpcException("Target service name of request is null!"); + throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, "Target service name of request is null!"); } String interfaceName = ConfigUniqueNameGenerator.getInterfaceName(targetServiceName); sofaRequest.setInterfaceName(interfaceName); @@ -78,7 +79,7 @@ public SofaRequest decodeObject(AbstractByteBuf data, Map contex sofaRequest.setMethodArgs(args); return sofaRequest; } catch (Exception e) { - throw new SofaRpcException(e.getMessage(), e); + throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, e.getMessage(), e); } } @@ -86,14 +87,14 @@ public SofaRequest decodeObject(AbstractByteBuf data, Map contex public void decodeObjectByTemplate(AbstractByteBuf data, Map context, SofaRequest template) throws SofaRpcException { if (data.readableBytes() <= 0) { - throw new SofaRpcException("Deserialized array is empty."); + throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, "Deserialized array is empty."); } try { MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array()); SofaRequest tmp = (SofaRequest) fury.deserialize(readBuffer); String targetServiceName = tmp.getTargetServiceUniqueName(); if (targetServiceName == null) { - throw new SofaRpcException("Target service name of request is null!"); + throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, "Target service name of request is null!"); } // copy values to template template.setMethodName(tmp.getMethodName()); @@ -106,7 +107,7 @@ public void decodeObjectByTemplate(AbstractByteBuf data, Map con final Object[] args = (Object[]) fury.deserialize(readBuffer); template.setMethodArgs(args); } catch (Exception e) { - throw new SofaRpcException(e.getMessage(), e); + throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, e.getMessage(), e); } } diff --git a/codec/codec-sofa-fury/src/main/java/com/alipay/sofa/rpc/codec/fury/serialize/SofaResponseFurySerializer.java b/codec/codec-sofa-fury/src/main/java/com/alipay/sofa/rpc/codec/fury/serialize/SofaResponseFurySerializer.java index 9a7402ab1..5983be113 100644 --- a/codec/codec-sofa-fury/src/main/java/com/alipay/sofa/rpc/codec/fury/serialize/SofaResponseFurySerializer.java +++ b/codec/codec-sofa-fury/src/main/java/com/alipay/sofa/rpc/codec/fury/serialize/SofaResponseFurySerializer.java @@ -18,11 +18,12 @@ import com.alipay.sofa.rpc.codec.CustomSerializer; import com.alipay.sofa.rpc.common.RemotingConstants; +import com.alipay.sofa.rpc.core.exception.RpcErrorType; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.response.SofaResponse; import com.alipay.sofa.rpc.transport.AbstractByteBuf; import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf; -import io.fury.ThreadLocalFury; +import io.fury.ThreadSafeFury; import io.fury.memory.MemoryBuffer; import java.util.Map; @@ -33,22 +34,21 @@ */ public class SofaResponseFurySerializer implements CustomSerializer { - private final ThreadLocalFury fury; + private final ThreadSafeFury fury; - public SofaResponseFurySerializer(ThreadLocalFury fury) { + public SofaResponseFurySerializer(ThreadSafeFury fury) { this.fury = fury; } @Override public AbstractByteBuf encodeObject(SofaResponse object, Map context) throws SofaRpcException { try { - fury.setClassLoader(Thread.currentThread().getContextClassLoader()); MemoryBuffer writeBuffer = MemoryBuffer.newHeapBuffer(32); writeBuffer.writerIndex(0); fury.serialize(writeBuffer, object); return new ByteArrayWrapperByteBuf(writeBuffer.getBytes(0, writeBuffer.writerIndex())); } catch (Exception e) { - throw new SofaRpcException(e.getMessage(), e); + throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, e.getMessage(), e); } } @@ -56,16 +56,15 @@ public AbstractByteBuf encodeObject(SofaResponse object, Map con public SofaResponse decodeObject(AbstractByteBuf data, Map context) throws SofaRpcException { MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array()); try { - fury.setClassLoader(Thread.currentThread().getContextClassLoader()); boolean genericSerialize = context != null && isGenericResponse( context.get(RemotingConstants.HEAD_GENERIC_TYPE)); if (genericSerialize) { // TODO support generic call - throw new SofaRpcException("Generic call is not supported for now."); + throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Generic call is not supported for now."); } return (SofaResponse) fury.deserialize(readBuffer); } catch (Exception e) { - throw new SofaRpcException(e.getMessage(), e); + throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, e.getMessage(), e); } } @@ -73,17 +72,16 @@ public SofaResponse decodeObject(AbstractByteBuf data, Map conte public void decodeObjectByTemplate(AbstractByteBuf data, Map context, SofaResponse template) throws SofaRpcException { if (data.readableBytes() <= 0) { - throw new SofaRpcException("Deserialized array is empty."); + throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Deserialized array is empty."); } try { - fury.setClassLoader(Thread.currentThread().getContextClassLoader()); MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array()); // 根据SerializeType信息决定序列化器 boolean genericSerialize = context != null && isGenericResponse( context.get(RemotingConstants.HEAD_GENERIC_TYPE)); if (genericSerialize) { // TODO support generic call - throw new SofaRpcException("Generic call is not supported for now."); + throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Generic call is not supported for now."); } else { SofaResponse tmp = (SofaResponse) fury.deserialize(readBuffer); // copy values to template @@ -92,7 +90,7 @@ public void decodeObjectByTemplate(AbstractByteBuf data, Map con template.setResponseProps(tmp.getResponseProps()); } } catch (Exception e) { - throw new SofaRpcException(e.getMessage(), e); + throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, e.getMessage(), e); } } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/codec/AbstractSerializer.java b/core/api/src/main/java/com/alipay/sofa/rpc/codec/AbstractSerializer.java index efe20c953..3c3a0b378 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/codec/AbstractSerializer.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/codec/AbstractSerializer.java @@ -76,18 +76,18 @@ public static void clear() { genericServiceMap.clear(); } - protected CustomSerializer getObjCustomSerializer(Object obj) { + protected CustomSerializer getObjCustomSerializer(Object obj) { if (obj == null) { return null; } - return getSerializer(obj.getClass()); + return getCustomSerializer(obj.getClass()); } - protected CustomSerializer getSerializer(Class clazz) { + protected CustomSerializer getCustomSerializer(Class clazz) { return customSerializers.get(clazz); } - protected void addSerializer(Class clazz, CustomSerializer serializerManager) { + public void addCustomSerializer(Class clazz, CustomSerializer serializerManager) { customSerializers.put(clazz, serializerManager); } }