Skip to content

Commit

Permalink
modify the way we set custom header in triple protocol (#1138)
Browse files Browse the repository at this point in the history
  • Loading branch information
OrezzerO authored Dec 23, 2021
1 parent ebae921 commit 4a0e099
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ public class ConsumerCustomHeaderFilter extends Filter {

@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
setCustomHeader(request);
return invoker.invoke(request);
}

private void setCustomHeader(SofaRequest sofaRequest) {
RpcInvokeContext context = RpcInvokeContext.getContext();
Map customHeader = context.getCustomHeader();
if (CommonUtils.isNotEmpty(customHeader)) {
sofaRequest.addRequestProps(customHeader);
try {
Map customHeader = context.getCustomHeader();
if (CommonUtils.isNotEmpty(customHeader)) {
request.addRequestProps(customHeader);
}
return invoker.invoke(request);
} finally {
context.clearCustomHeader();
}
context.clearCustomHeader();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -42,22 +43,52 @@ public void testCustomFilter() {
Assert.assertEquals("b", request.getRequestProp("a"));
}

@Test
public void testClearAfterInvoke() {
RpcInvokeContext.getContext().addCustomHeader("a", "b");
ConsumerCustomHeaderFilter filter = new ConsumerCustomHeaderFilter();
SofaRequest request = new SofaRequest();
RecordInvoker invoker = new RecordInvoker(null);
filter.invoke(invoker, request);
Assert.assertTrue(RpcInvokeContext.getContext().getCustomHeader().isEmpty());
Assert.assertEquals("b", invoker.getMyHeader().get("a"));
}

static class EmptyInvoker extends FilterInvoker {

private Map<String, String> metaHolder;

protected EmptyInvoker(AbstractInterfaceConfig config) {
super(config);
}

public Map<String, String> getMetaHolder() {
return metaHolder;
}

private Map<String, String> metaHolder;
@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
return null;
}
}

protected EmptyInvoker(AbstractInterfaceConfig config) {
private class RecordInvoker extends FilterInvoker {

private Map<String, String> myHeader;

protected RecordInvoker(AbstractInterfaceConfig config) {
super(config);
}

public Map<String, String> getMyHeader() {
return myHeader;
}

@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
Map<String, String> customHeader = RpcInvokeContext.getContext().getCustomHeader();
myHeader = new HashMap<>(customHeader);
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* @author zhaowang
* @version : MetadataHolder.java, v 0.1 2020年09月09日 4:09 下午 zhaowang Exp $
*/
@Deprecated
public class MetadataHolder {
static final ThreadLocal<Map<String,String>> localContext = new ThreadLocal<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@
import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.TracerUtils;
import com.alipay.sofa.rpc.common.MetadataHolder;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.tracer.sofatracer.TracingContextKey;
import io.grpc.Metadata;

import java.util.Map;

import static com.alipay.sofa.rpc.server.triple.TripleHeadKeys.HEAD_KEY_TRAFFIC_TYPE;

/**
Expand All @@ -51,8 +49,7 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So
SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan();
boolean loadTest = TracerUtils.isLoadTest(currentSpan);
if (loadTest) {
Map<String, String> metaHolder = MetadataHolder.getMetaHolder();
metaHolder.put(HEAD_KEY_TRAFFIC_TYPE.name(), PRESSURE);
RpcInvokeContext.getContext().addCustomHeader(HEAD_KEY_TRAFFIC_TYPE.name(), PRESSURE);
}

// provider side ,if in consumer side, metadata == null
Expand All @@ -63,10 +60,6 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So
currentSpan.getSofaTracerSpanContext().setBizBaggageItem(MARK, T);
}
}
try {
return invoker.invoke(request);
} finally {
MetadataHolder.clear();
}
return invoker.invoke(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.alipay.common.tracer.core.context.trace.SofaTraceContext;
import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.sofa.rpc.common.MetadataHolder;
import com.alipay.sofa.rpc.common.RemotingConstants;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.TracerCompatibleConstants;
Expand Down Expand Up @@ -142,12 +141,18 @@ public static void beforeSend(SofaRequest sofaRequest, ConsumerConfig consumerCo
}

// set custom headers
Set<Map.Entry<String, String>> entries = MetadataHolder.getMetaHolder().entrySet();
for (Map.Entry<String, String> entry : entries) {
if (StringUtils.isNotBlank(entry.getValue())) {
requestHeader.put(TripleHeadKeys.getKey(entry.getKey()), entry.getValue());
try{
Set<Map.Entry<String, String>> customHeader = RpcInvokeContext.getContext().getCustomHeader().entrySet();
for (Map.Entry<String, String> entry : customHeader) {
if (StringUtils.isNotBlank(entry.getValue())) {
requestHeader.put(TripleHeadKeys.getKey(entry.getKey()), entry.getValue());
}
}
}finally {
RpcInvokeContext.getContext().clearCustomHeader();
}


}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.TracerUtils;
import com.alipay.sofa.rpc.common.MetadataHolder;
import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
Expand All @@ -33,6 +33,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

import static com.alipay.sofa.rpc.filter.PressureMarkTransformFilter.PRESSURE;
Expand All @@ -55,6 +56,9 @@ public void before() {

@After
public void after() {
invoker.getMetaHolder().clear();
// mock ConsumerCustomHeaderFilter
RpcInvokeContext.getContext().clearCustomHeader();
SofaTraceContextHolder.getSofaTraceContext().clear();
}

Expand Down Expand Up @@ -124,7 +128,7 @@ protected EmptyInvoker(AbstractInterfaceConfig config) {

@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
this.metaHolder = MetadataHolder.getMetaHolder();
this.metaHolder = new HashMap<>(RpcInvokeContext.getContext().getCustomHeader());
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ public interface TripleHessianInterface {

Response call2(Request request);

boolean testPressureMark(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package com.alipay.sofa.rpc.triple;

import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.TracerUtils;

/**
* @author zhaowang
* @version : TripleHessianInterfaceImpl.java, v 0.1 2020年06月11日 11:29 上午 zhaowang Exp $
Expand Down Expand Up @@ -53,4 +57,10 @@ public Response call2(Request request) {
public String getFlag() {
return flag;
}

@Override
public boolean testPressureMark(String name) {
SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan();
return TracerUtils.isLoadTest(currentSpan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package com.alipay.sofa.rpc.triple;

import com.alipay.common.tracer.core.SofaTracer;
import com.alipay.common.tracer.core.holder.SofaTraceContextHolder;
import com.alipay.common.tracer.core.span.SofaTracerSpan;
import com.alipay.common.tracer.core.utils.TracerUtils;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.config.ApplicationConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
Expand All @@ -27,15 +31,25 @@
import com.alipay.sofa.rpc.log.LoggerFactory;
import org.apache.commons.lang.math.RandomUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.Map;

/**
* @author zhaowang
* @version : TripleHessianInvokeTest.java, v 0.1 2020年06月11日 11:16 上午 zhaowang Exp $
*/
public class TripleHessianInvokeTest {

private static final Logger LOGGER = LoggerFactory.getLogger(TripleHessianInvokeTest.class);
private static final Logger LOGGER = LoggerFactory.getLogger(TripleHessianInvokeTest.class);
public static final SofaTracer tracer = new SofaTracer.Builder("TEST").build();

@Before
public void before() {
SofaTracerSpan span = (SofaTracerSpan) tracer.buildSpan("test").start();
SofaTraceContextHolder.getSofaTraceContext().push(span);
}

@Test
public void testInvoke() throws InterruptedException {
Expand Down Expand Up @@ -71,11 +85,21 @@ public void testInvoke() throws InterruptedException {

TripleHessianInterface helloService = consumerConfig.refer();

Thread.sleep(10 * 1000);
LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName());
helloService.call();
Assert.assertEquals("call", ref.getFlag());

// test Pressure Mark
boolean isLoadTest = helloService.testPressureMark("name");
Assert.assertFalse(isLoadTest);

SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan();
Map<String, String> bizBaggage = currentSpan.getSofaTracerSpanContext().getBizBaggage();
bizBaggage.put("mark", "T");
Assert.assertTrue(TracerUtils.isLoadTest(currentSpan));
isLoadTest = helloService.testPressureMark("name");
Assert.assertTrue(isLoadTest);

String s = helloService.call1();
Assert.assertEquals("call1", ref.getFlag());
Assert.assertEquals("call1", s);
Expand Down Expand Up @@ -136,7 +160,6 @@ public void testInvokeWithUniqueId() throws InterruptedException {

TripleHessianInterface helloService = consumerConfig.refer();

Thread.sleep(10 * 1000);
LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName());
helloService.call();
Assert.assertEquals("call", ref.getFlag());
Expand Down Expand Up @@ -167,7 +190,6 @@ public void testInvokeWithUniqueId() throws InterruptedException {

helloService = consumerConfig.refer();

Thread.sleep(10 * 1000);
LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName());
helloService.call();
Assert.assertEquals("call", ref.getFlag());
Expand Down

0 comments on commit 4a0e099

Please sign in to comment.