Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add dep fury #1387

Merged
merged 2 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
<grpc.version>1.53.0</grpc.version>
<guava.version>32.0.0-jre</guava.version>
<transmittable.version>2.12.1</transmittable.version>
<fury.version>0.4.1</fury.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -434,6 +435,11 @@
<artifactId>asm</artifactId>
<version>${asm.version}</version>
</dependency>
<dependency>
<groupId>org.furyio</groupId>
<artifactId>fury-core</artifactId>
<version>${fury.version}</version>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
import io.fury.Fury;
import io.fury.ThreadLocalFury;
import io.fury.ThreadSafeFury;
import io.fury.config.Language;
import io.fury.memory.MemoryBuffer;
import io.fury.resolver.AllowListChecker;
Expand All @@ -47,9 +48,9 @@
@Extension(value = "fury2", code = 22)
public class FurySerializer extends AbstractSerializer {

private final ThreadLocalFury fury;
protected final ThreadSafeFury fury;

private final String checkerMode = SofaConfigs.getOrDefault(RpcConfigKeys.SERIALIZE_CHECKER_MODE);
private final String checkerMode = SofaConfigs.getOrDefault(RpcConfigKeys.SERIALIZE_CHECKER_MODE);

public FurySerializer() {
fury = new ThreadLocalFury(classLoader -> {
Expand Down Expand Up @@ -102,8 +103,8 @@ public FurySerializer() {
f.register(SofaRpcException.class);
return f;
});
addSerializer(SofaRequest.class, new SofaRequestFurySerializer(fury));
addSerializer(SofaResponse.class, new SofaResponseFurySerializer(fury));
addCustomSerializer(SofaRequest.class, new SofaRequestFurySerializer(fury));
addCustomSerializer(SofaResponse.class, new SofaResponseFurySerializer(fury));
}

@Override
Expand Down Expand Up @@ -139,7 +140,7 @@ public Object decode(final AbstractByteBuf data, final Class clazz, final Map<St
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
fury.setClassLoader(contextClassLoader);
CustomSerializer customSerializer = getSerializer(clazz);
CustomSerializer customSerializer = getCustomSerializer(clazz);
if (customSerializer != null) {
return customSerializer.decodeObject(data, context);
} else {
Expand Down Expand Up @@ -175,8 +176,4 @@ public void decode(final AbstractByteBuf data, final Object template, final Map<
}
}

public ThreadLocalFury getFury() {
return fury;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
import com.alipay.sofa.rpc.codec.CustomSerializer;
import com.alipay.sofa.rpc.common.RemotingConstants;
import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.transport.AbstractByteBuf;
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
import io.fury.ThreadLocalFury;
import io.fury.ThreadSafeFury;
import io.fury.memory.MemoryBuffer;

import java.util.Map;
Expand All @@ -34,9 +35,9 @@
*/
public class SofaRequestFurySerializer implements CustomSerializer<SofaRequest> {

private final ThreadLocalFury fury;
private final ThreadSafeFury fury;

public SofaRequestFurySerializer(ThreadLocalFury fury) {
public SofaRequestFurySerializer(ThreadSafeFury fury) {
this.fury = fury;
}

Expand All @@ -51,15 +52,15 @@ public AbstractByteBuf encodeObject(SofaRequest object, Map<String, String> cont
isGenericRequest(context.get(RemotingConstants.HEAD_GENERIC_TYPE));
if (genericSerialize) {
// TODO support generic call
throw new SofaRpcException("Generic call is not supported for now.");
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Generic call is not supported for now.");
}
fury.serialize(writeBuffer, object);
final Object[] args = object.getMethodArgs();
fury.serialize(writeBuffer, args);

return new ByteArrayWrapperByteBuf(writeBuffer.getBytes(0, writeBuffer.writerIndex()));
} catch (Exception e) {
throw new SofaRpcException(e.getMessage(), e);
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, e.getMessage(), e);
}
}

Expand All @@ -70,30 +71,30 @@ public SofaRequest decodeObject(AbstractByteBuf data, Map<String, String> contex
SofaRequest sofaRequest = (SofaRequest) fury.deserialize(readBuffer);
String targetServiceName = sofaRequest.getTargetServiceUniqueName();
if (targetServiceName == null) {
throw new SofaRpcException("Target service name of request is null!");
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, "Target service name of request is null!");
}
String interfaceName = ConfigUniqueNameGenerator.getInterfaceName(targetServiceName);
sofaRequest.setInterfaceName(interfaceName);
final Object[] args = (Object[]) fury.deserialize(readBuffer);
sofaRequest.setMethodArgs(args);
return sofaRequest;
} catch (Exception e) {
throw new SofaRpcException(e.getMessage(), e);
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, e.getMessage(), e);
}
}

@Override
public void decodeObjectByTemplate(AbstractByteBuf data, Map<String, String> context, SofaRequest template)
throws SofaRpcException {
if (data.readableBytes() <= 0) {
throw new SofaRpcException("Deserialized array is empty.");
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, "Deserialized array is empty.");
}
try {
MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array());
SofaRequest tmp = (SofaRequest) fury.deserialize(readBuffer);
String targetServiceName = tmp.getTargetServiceUniqueName();
if (targetServiceName == null) {
throw new SofaRpcException("Target service name of request is null!");
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, "Target service name of request is null!");
}
// copy values to template
template.setMethodName(tmp.getMethodName());
Expand All @@ -106,7 +107,7 @@ public void decodeObjectByTemplate(AbstractByteBuf data, Map<String, String> con
final Object[] args = (Object[]) fury.deserialize(readBuffer);
template.setMethodArgs(args);
} catch (Exception e) {
throw new SofaRpcException(e.getMessage(), e);
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

import com.alipay.sofa.rpc.codec.CustomSerializer;
import com.alipay.sofa.rpc.common.RemotingConstants;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.transport.AbstractByteBuf;
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
import io.fury.ThreadLocalFury;
import io.fury.ThreadSafeFury;
import io.fury.memory.MemoryBuffer;

import java.util.Map;
Expand All @@ -33,57 +34,54 @@
*/
public class SofaResponseFurySerializer implements CustomSerializer<SofaResponse> {

private final ThreadLocalFury fury;
private final ThreadSafeFury fury;

public SofaResponseFurySerializer(ThreadLocalFury fury) {
public SofaResponseFurySerializer(ThreadSafeFury fury) {
this.fury = fury;
}

@Override
public AbstractByteBuf encodeObject(SofaResponse object, Map<String, String> context) throws SofaRpcException {
try {
fury.setClassLoader(Thread.currentThread().getContextClassLoader());
MemoryBuffer writeBuffer = MemoryBuffer.newHeapBuffer(32);
writeBuffer.writerIndex(0);
fury.serialize(writeBuffer, object);
return new ByteArrayWrapperByteBuf(writeBuffer.getBytes(0, writeBuffer.writerIndex()));
} catch (Exception e) {
throw new SofaRpcException(e.getMessage(), e);
throw new SofaRpcException(RpcErrorType.SERVER_DESERIALIZE, e.getMessage(), e);
}
}

@Override
public SofaResponse decodeObject(AbstractByteBuf data, Map<String, String> context) throws SofaRpcException {
MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array());
try {
fury.setClassLoader(Thread.currentThread().getContextClassLoader());
boolean genericSerialize = context != null && isGenericResponse(
context.get(RemotingConstants.HEAD_GENERIC_TYPE));
if (genericSerialize) {
// TODO support generic call
throw new SofaRpcException("Generic call is not supported for now.");
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Generic call is not supported for now.");
}
return (SofaResponse) fury.deserialize(readBuffer);
} catch (Exception e) {
throw new SofaRpcException(e.getMessage(), e);
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, e.getMessage(), e);
}
}

@Override
public void decodeObjectByTemplate(AbstractByteBuf data, Map<String, String> context, SofaResponse template)
throws SofaRpcException {
if (data.readableBytes() <= 0) {
throw new SofaRpcException("Deserialized array is empty.");
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Deserialized array is empty.");
}
try {
fury.setClassLoader(Thread.currentThread().getContextClassLoader());
MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array());
// 根据SerializeType信息决定序列化器
boolean genericSerialize = context != null && isGenericResponse(
context.get(RemotingConstants.HEAD_GENERIC_TYPE));
if (genericSerialize) {
// TODO support generic call
throw new SofaRpcException("Generic call is not supported for now.");
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Generic call is not supported for now.");
} else {
SofaResponse tmp = (SofaResponse) fury.deserialize(readBuffer);
// copy values to template
Expand All @@ -92,7 +90,7 @@ public void decodeObjectByTemplate(AbstractByteBuf data, Map<String, String> con
template.setResponseProps(tmp.getResponseProps());
}
} catch (Exception e) {
throw new SofaRpcException(e.getMessage(), e);
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,18 @@ public static void clear() {
genericServiceMap.clear();
}

protected CustomSerializer getObjCustomSerializer(Object obj) {
protected CustomSerializer getObjCustomSerializer(Object obj) {
if (obj == null) {
return null;
}
return getSerializer(obj.getClass());
return getCustomSerializer(obj.getClass());
}

protected CustomSerializer getSerializer(Class clazz) {
protected CustomSerializer getCustomSerializer(Class clazz) {
return customSerializers.get(clazz);
}

protected void addSerializer(Class clazz, CustomSerializer serializerManager) {
public void addCustomSerializer(Class clazz, CustomSerializer serializerManager) {
customSerializers.put(clazz, serializerManager);
}
}
Loading