From 4a0e099120203ad112cd2cb763c30c97c28364f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=91=88=E7=86=B9?= Date: Thu, 23 Dec 2021 10:22:42 +0800 Subject: [PATCH] modify the way we set custom header in triple protocol (#1138) --- .../filter/ConsumerCustomHeaderFilter.java | 18 ++++----- .../rpc/filter/CustomHeaderFilterTest.java | 37 +++++++++++++++++-- .../sofa/rpc/common/MetadataHolder.java | 1 + .../filter/PressureMarkTransformFilter.java | 13 ++----- .../sofatracer/TripleTracerAdapter.java | 15 +++++--- .../PressureMarkTransformFilterTest.java | 8 +++- .../rpc/triple/TripleHessianInterface.java | 1 + .../triple/TripleHessianInterfaceImpl.java | 10 +++++ .../rpc/triple/TripleHessianInvokeTest.java | 30 +++++++++++++-- 9 files changed, 100 insertions(+), 33 deletions(-) diff --git a/core-impl/filter/src/main/java/com/alipay/sofa/rpc/filter/ConsumerCustomHeaderFilter.java b/core-impl/filter/src/main/java/com/alipay/sofa/rpc/filter/ConsumerCustomHeaderFilter.java index 43c127272..3212c8682 100644 --- a/core-impl/filter/src/main/java/com/alipay/sofa/rpc/filter/ConsumerCustomHeaderFilter.java +++ b/core-impl/filter/src/main/java/com/alipay/sofa/rpc/filter/ConsumerCustomHeaderFilter.java @@ -38,16 +38,16 @@ public class ConsumerCustomHeaderFilter extends Filter { @Override public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException { - setCustomHeader(request); - return invoker.invoke(request); - } - - private void setCustomHeader(SofaRequest sofaRequest) { RpcInvokeContext context = RpcInvokeContext.getContext(); - Map customHeader = context.getCustomHeader(); - if (CommonUtils.isNotEmpty(customHeader)) { - sofaRequest.addRequestProps(customHeader); + try { + Map customHeader = context.getCustomHeader(); + if (CommonUtils.isNotEmpty(customHeader)) { + request.addRequestProps(customHeader); + } + return invoker.invoke(request); + } finally { + context.clearCustomHeader(); } - context.clearCustomHeader(); + } } \ No newline at end of file diff --git a/core-impl/filter/src/test/java/com/alipay/sofa/rpc/filter/CustomHeaderFilterTest.java b/core-impl/filter/src/test/java/com/alipay/sofa/rpc/filter/CustomHeaderFilterTest.java index e847dc3e9..7784555b9 100644 --- a/core-impl/filter/src/test/java/com/alipay/sofa/rpc/filter/CustomHeaderFilterTest.java +++ b/core-impl/filter/src/test/java/com/alipay/sofa/rpc/filter/CustomHeaderFilterTest.java @@ -24,6 +24,7 @@ import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; import java.util.Map; /** @@ -42,22 +43,52 @@ public void testCustomFilter() { Assert.assertEquals("b", request.getRequestProp("a")); } + @Test + public void testClearAfterInvoke() { + RpcInvokeContext.getContext().addCustomHeader("a", "b"); + ConsumerCustomHeaderFilter filter = new ConsumerCustomHeaderFilter(); + SofaRequest request = new SofaRequest(); + RecordInvoker invoker = new RecordInvoker(null); + filter.invoke(invoker, request); + Assert.assertTrue(RpcInvokeContext.getContext().getCustomHeader().isEmpty()); + Assert.assertEquals("b", invoker.getMyHeader().get("a")); + } + static class EmptyInvoker extends FilterInvoker { + private Map metaHolder; + + protected EmptyInvoker(AbstractInterfaceConfig config) { + super(config); + } + public Map getMetaHolder() { return metaHolder; } - private Map metaHolder; + @Override + public SofaResponse invoke(SofaRequest request) throws SofaRpcException { + return null; + } + } - protected EmptyInvoker(AbstractInterfaceConfig config) { + private class RecordInvoker extends FilterInvoker { + + private Map myHeader; + + protected RecordInvoker(AbstractInterfaceConfig config) { super(config); } + public Map getMyHeader() { + return myHeader; + } + @Override public SofaResponse invoke(SofaRequest request) throws SofaRpcException { + Map customHeader = RpcInvokeContext.getContext().getCustomHeader(); + myHeader = new HashMap<>(customHeader); return null; } } - } \ No newline at end of file diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/common/MetadataHolder.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/common/MetadataHolder.java index b0bd7e124..42350faab 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/common/MetadataHolder.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/common/MetadataHolder.java @@ -24,6 +24,7 @@ * @author zhaowang * @version : MetadataHolder.java, v 0.1 2020年09月09日 4:09 下午 zhaowang Exp $ */ +@Deprecated public class MetadataHolder { static final ThreadLocal> localContext = new ThreadLocal<>(); diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/filter/PressureMarkTransformFilter.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/filter/PressureMarkTransformFilter.java index d680fc974..414a480c5 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/filter/PressureMarkTransformFilter.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/filter/PressureMarkTransformFilter.java @@ -19,7 +19,7 @@ import com.alipay.common.tracer.core.holder.SofaTraceContextHolder; import com.alipay.common.tracer.core.span.SofaTracerSpan; import com.alipay.common.tracer.core.utils.TracerUtils; -import com.alipay.sofa.rpc.common.MetadataHolder; +import com.alipay.sofa.rpc.context.RpcInvokeContext; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; @@ -27,8 +27,6 @@ import com.alipay.sofa.rpc.tracer.sofatracer.TracingContextKey; import io.grpc.Metadata; -import java.util.Map; - import static com.alipay.sofa.rpc.server.triple.TripleHeadKeys.HEAD_KEY_TRAFFIC_TYPE; /** @@ -51,8 +49,7 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan(); boolean loadTest = TracerUtils.isLoadTest(currentSpan); if (loadTest) { - Map metaHolder = MetadataHolder.getMetaHolder(); - metaHolder.put(HEAD_KEY_TRAFFIC_TYPE.name(), PRESSURE); + RpcInvokeContext.getContext().addCustomHeader(HEAD_KEY_TRAFFIC_TYPE.name(), PRESSURE); } // provider side ,if in consumer side, metadata == null @@ -63,10 +60,6 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So currentSpan.getSofaTracerSpanContext().setBizBaggageItem(MARK, T); } } - try { - return invoker.invoke(request); - } finally { - MetadataHolder.clear(); - } + return invoker.invoke(request); } } \ No newline at end of file diff --git a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapter.java b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapter.java index b50c275a2..05e39f0da 100644 --- a/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapter.java +++ b/remoting/remoting-triple/src/main/java/com/alipay/sofa/rpc/tracer/sofatracer/TripleTracerAdapter.java @@ -21,7 +21,6 @@ import com.alipay.common.tracer.core.context.trace.SofaTraceContext; import com.alipay.common.tracer.core.holder.SofaTraceContextHolder; import com.alipay.common.tracer.core.span.SofaTracerSpan; -import com.alipay.sofa.rpc.common.MetadataHolder; import com.alipay.sofa.rpc.common.RemotingConstants; import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.common.TracerCompatibleConstants; @@ -142,12 +141,18 @@ public static void beforeSend(SofaRequest sofaRequest, ConsumerConfig consumerCo } // set custom headers - Set> entries = MetadataHolder.getMetaHolder().entrySet(); - for (Map.Entry entry : entries) { - if (StringUtils.isNotBlank(entry.getValue())) { - requestHeader.put(TripleHeadKeys.getKey(entry.getKey()), entry.getValue()); + try{ + Set> customHeader = RpcInvokeContext.getContext().getCustomHeader().entrySet(); + for (Map.Entry entry : customHeader) { + if (StringUtils.isNotBlank(entry.getValue())) { + requestHeader.put(TripleHeadKeys.getKey(entry.getKey()), entry.getValue()); + } } + }finally { + RpcInvokeContext.getContext().clearCustomHeader(); } + + } /** diff --git a/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/filter/PressureMarkTransformFilterTest.java b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/filter/PressureMarkTransformFilterTest.java index 499abd49c..b76d5f8c0 100644 --- a/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/filter/PressureMarkTransformFilterTest.java +++ b/remoting/remoting-triple/src/test/java/com/alipay/sofa/rpc/filter/PressureMarkTransformFilterTest.java @@ -20,8 +20,8 @@ import com.alipay.common.tracer.core.holder.SofaTraceContextHolder; import com.alipay.common.tracer.core.span.SofaTracerSpan; import com.alipay.common.tracer.core.utils.TracerUtils; -import com.alipay.sofa.rpc.common.MetadataHolder; import com.alipay.sofa.rpc.config.AbstractInterfaceConfig; +import com.alipay.sofa.rpc.context.RpcInvokeContext; import com.alipay.sofa.rpc.core.exception.SofaRpcException; import com.alipay.sofa.rpc.core.request.SofaRequest; import com.alipay.sofa.rpc.core.response.SofaResponse; @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.HashMap; import java.util.Map; import static com.alipay.sofa.rpc.filter.PressureMarkTransformFilter.PRESSURE; @@ -55,6 +56,9 @@ public void before() { @After public void after() { + invoker.getMetaHolder().clear(); + // mock ConsumerCustomHeaderFilter + RpcInvokeContext.getContext().clearCustomHeader(); SofaTraceContextHolder.getSofaTraceContext().clear(); } @@ -124,7 +128,7 @@ protected EmptyInvoker(AbstractInterfaceConfig config) { @Override public SofaResponse invoke(SofaRequest request) throws SofaRpcException { - this.metaHolder = MetadataHolder.getMetaHolder(); + this.metaHolder = new HashMap<>(RpcInvokeContext.getContext().getCustomHeader()); return null; } } diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterface.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterface.java index d865334df..996fed37e 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterface.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterface.java @@ -28,4 +28,5 @@ public interface TripleHessianInterface { Response call2(Request request); + boolean testPressureMark(String name); } \ No newline at end of file diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterfaceImpl.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterfaceImpl.java index 297d7204a..fa9f156d2 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterfaceImpl.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInterfaceImpl.java @@ -16,6 +16,10 @@ */ package com.alipay.sofa.rpc.triple; +import com.alipay.common.tracer.core.holder.SofaTraceContextHolder; +import com.alipay.common.tracer.core.span.SofaTracerSpan; +import com.alipay.common.tracer.core.utils.TracerUtils; + /** * @author zhaowang * @version : TripleHessianInterfaceImpl.java, v 0.1 2020年06月11日 11:29 上午 zhaowang Exp $ @@ -53,4 +57,10 @@ public Response call2(Request request) { public String getFlag() { return flag; } + + @Override + public boolean testPressureMark(String name) { + SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan(); + return TracerUtils.isLoadTest(currentSpan); + } } \ No newline at end of file diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInvokeTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInvokeTest.java index 84fac3a4d..ed25a92a2 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInvokeTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/triple/TripleHessianInvokeTest.java @@ -16,6 +16,10 @@ */ package com.alipay.sofa.rpc.triple; +import com.alipay.common.tracer.core.SofaTracer; +import com.alipay.common.tracer.core.holder.SofaTraceContextHolder; +import com.alipay.common.tracer.core.span.SofaTracerSpan; +import com.alipay.common.tracer.core.utils.TracerUtils; import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.config.ApplicationConfig; import com.alipay.sofa.rpc.config.ConsumerConfig; @@ -27,15 +31,25 @@ import com.alipay.sofa.rpc.log.LoggerFactory; import org.apache.commons.lang.math.RandomUtils; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import java.util.Map; + /** * @author zhaowang * @version : TripleHessianInvokeTest.java, v 0.1 2020年06月11日 11:16 上午 zhaowang Exp $ */ public class TripleHessianInvokeTest { - private static final Logger LOGGER = LoggerFactory.getLogger(TripleHessianInvokeTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TripleHessianInvokeTest.class); + public static final SofaTracer tracer = new SofaTracer.Builder("TEST").build(); + + @Before + public void before() { + SofaTracerSpan span = (SofaTracerSpan) tracer.buildSpan("test").start(); + SofaTraceContextHolder.getSofaTraceContext().push(span); + } @Test public void testInvoke() throws InterruptedException { @@ -71,11 +85,21 @@ public void testInvoke() throws InterruptedException { TripleHessianInterface helloService = consumerConfig.refer(); - Thread.sleep(10 * 1000); LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName()); helloService.call(); Assert.assertEquals("call", ref.getFlag()); + // test Pressure Mark + boolean isLoadTest = helloService.testPressureMark("name"); + Assert.assertFalse(isLoadTest); + + SofaTracerSpan currentSpan = SofaTraceContextHolder.getSofaTraceContext().getCurrentSpan(); + Map bizBaggage = currentSpan.getSofaTracerSpanContext().getBizBaggage(); + bizBaggage.put("mark", "T"); + Assert.assertTrue(TracerUtils.isLoadTest(currentSpan)); + isLoadTest = helloService.testPressureMark("name"); + Assert.assertTrue(isLoadTest); + String s = helloService.call1(); Assert.assertEquals("call1", ref.getFlag()); Assert.assertEquals("call1", s); @@ -136,7 +160,6 @@ public void testInvokeWithUniqueId() throws InterruptedException { TripleHessianInterface helloService = consumerConfig.refer(); - Thread.sleep(10 * 1000); LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName()); helloService.call(); Assert.assertEquals("call", ref.getFlag()); @@ -167,7 +190,6 @@ public void testInvokeWithUniqueId() throws InterruptedException { helloService = consumerConfig.refer(); - Thread.sleep(10 * 1000); LOGGER.info("Grpc stub bean successful: {}", helloService.getClass().getName()); helloService.call(); Assert.assertEquals("call", ref.getFlag());