From fcfd12e9621c9dda893c7f84713a72dd5aff330e Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Wed, 29 Nov 2023 11:12:00 +0800 Subject: [PATCH] fix proxy doclean npe bug --- ...ltKeeperContainerUsedInfoAnalyzerTest.java | 8 +++- .../proxy/tunnel/DefaultTunnelManager.java | 39 +++++++++++++------ .../com/ctrip/xpipe/redis/proxy/AllTests.java | 2 + .../tunnel/DefaultTunnelManagerTest.java | 26 ++++--------- 4 files changed, 43 insertions(+), 32 deletions(-) diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzerTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzerTest.java index fa6fb8988..b20a87f1f 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzerTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerUsedInfoAnalyzerTest.java @@ -42,6 +42,8 @@ public class DefaultKeeperContainerUsedInfoAnalyzerTest { @Mock private FoundationService service; + public static final int expireTime = 1000; + @Before public void before() { @@ -49,12 +51,14 @@ public void before() { Map standards = Maps.newHashMap(); standards.put(FoundationService.DEFAULT.getDataCenter(), new KeeperContainerOverloadStandardModel().setFlowOverload(10).setPeerDataOverload(10)); Mockito.when(config.getKeeperContainerOverloadStandards()).thenReturn(standards); - Mockito.when(config.getKeeperCheckerIntervalMilli()).thenReturn(10 * 1000); + Mockito.when(config.getKeeperCheckerIntervalMilli()).thenReturn(expireTime); Mockito.doNothing().when(executor).execute(Mockito.any()); } @Test public void testUpdateKeeperContainerUsedInfo() { + //To prevent a second updateKeeperContainerUsedInfo() data when expired + Mockito.when(config.getKeeperCheckerIntervalMilli()).thenReturn(100000); List models1 = new ArrayList<>(); KeeperContainerUsedInfoModel model1 = new KeeperContainerUsedInfoModel("1.1.1.1", "jq", 14, 14); Map> detailInfo1 = Maps.newHashMap(); @@ -96,7 +100,7 @@ public void testUpdateKeeperContainerUsedInfoExpired() throws InterruptedExcepti analyzer.updateKeeperContainerUsedInfo(0, models1); Assert.assertEquals(1, analyzer.getCheckerIndexes().size()); - TimeUnit.MILLISECONDS.sleep(11 * 1000); + TimeUnit.MILLISECONDS.sleep(expireTime+100); List models2 = new ArrayList<>(); KeeperContainerUsedInfoModel model3 = new KeeperContainerUsedInfoModel("3.3.3.3", "jq", 5, 5); diff --git a/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnelManager.java b/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnelManager.java index 3d08530c9..ab44d62fb 100644 --- a/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnelManager.java +++ b/redis/redis-proxy/src/main/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnelManager.java @@ -1,6 +1,7 @@ package com.ctrip.xpipe.redis.proxy.tunnel; import com.ctrip.xpipe.api.factory.ObjectFactory; +import com.ctrip.xpipe.api.lifecycle.Startable; import com.ctrip.xpipe.api.observer.Observable; import com.ctrip.xpipe.api.proxy.ProxyConnectProtocol; import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; @@ -82,20 +83,25 @@ public void preDestroy() throws Exception { protected void doClean() { Set keys = Sets.newHashSet(cache.keySet()); for(Channel channel : keys) { - if (!channel.isActive()) { - Tunnel tunnel = cache.remove(channel); - try { - tunnel.release(); - } catch (Exception e) { - logger.error("[cleaner] tunnel release error: ", e); - } - } else { + try { + if (!channel.isActive()) { + Tunnel tunnel = cache.remove(channel); + try { + tunnel.release(); + } catch (Exception e) { + logger.error("[cleaner] tunnel release tunnel{} error", tunnel, e); + } + } else { - Tunnel tunnel = cache.get(channel); - logger.info("[doClean] check tunnel, {}", tunnel.getTunnelMeta()); - if (tunnel.getState().equals(new TunnelClosed(null))) { - cache.remove(channel); + Tunnel tunnel = cache.get(channel); + if (!tunnel.getLifecycleState().isStarted()) continue; + logger.info("[doClean] check tunnel, {}", tunnel.getTunnelMeta()); + if (tunnel.getState().equals(new TunnelClosed(null))) { + cache.remove(channel); + } } + } catch (Throwable th){ + logger.error("[cleaner] tunnel release channel{} error", channel, th); } } } @@ -191,6 +197,15 @@ public void update(Object args, Observable observable) { // Unit Test + @VisibleForTesting + protected void setLifecycleStateStarted() { + Set keys = Sets.newHashSet(cache.keySet()); + for(Channel channel : keys) { + Tunnel tunnel = cache.get(channel); + tunnel.getLifecycleState().setPhaseName(Startable.PHASE_NAME_END); + } + } + @VisibleForTesting public DefaultTunnelManager setConfig(ProxyConfig config) { this.config = config; diff --git a/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/AllTests.java b/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/AllTests.java index 543a1d988..e834a41ac 100644 --- a/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/AllTests.java +++ b/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/AllTests.java @@ -21,6 +21,7 @@ import com.ctrip.xpipe.redis.proxy.session.state.SessionEstablishedTest; import com.ctrip.xpipe.redis.proxy.session.state.SessionInitTest; import com.ctrip.xpipe.redis.proxy.tunnel.BothSessionTryWriteTest; +import com.ctrip.xpipe.redis.proxy.tunnel.DefaultTunnelManagerTest; import com.ctrip.xpipe.redis.proxy.tunnel.DefaultTunnelTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -66,6 +67,7 @@ DefaultPingStatsTest.class, TestTLSWithTwoProxy.class, TestMassTCPPacketWithOneProxyServer.class, + DefaultTunnelManagerTest.class, }) public class AllTests { diff --git a/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnelManagerTest.java b/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnelManagerTest.java index b6c8e6f61..041d3eaa3 100644 --- a/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnelManagerTest.java +++ b/redis/redis-proxy/src/test/java/com/ctrip/xpipe/redis/proxy/tunnel/DefaultTunnelManagerTest.java @@ -1,19 +1,21 @@ package com.ctrip.xpipe.redis.proxy.tunnel; +import com.ctrip.xpipe.redis.core.proxy.ProxyResourceManager; import com.ctrip.xpipe.redis.proxy.AbstractRedisProxyServerTest; import com.ctrip.xpipe.redis.proxy.Tunnel; +import com.ctrip.xpipe.redis.proxy.monitor.TunnelMonitorManager; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.DefaultChannelPromise; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; import java.net.InetSocketAddress; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; /** * @author chen.zhu @@ -22,11 +24,13 @@ */ public class DefaultTunnelManagerTest extends AbstractRedisProxyServerTest { + @Mock private DefaultTunnelManager manager; @Before public void beforeDefaultTunnelManagerTest() { manager = (DefaultTunnelManager) tunnelManager(); + manager = spy(manager); } @Test @@ -36,29 +40,15 @@ public void testDoClean() throws Exception { Channel frontChannel2 = fakeChannel(); manager.create(frontChannel1, protocol("Proxy Route proxy://127.0.0.1:8009")); manager.create(frontChannel2, protocol("Proxy Route proxy://127.0.0.1:8009")); - Assert.assertEquals(2, manager.tunnels().size()); when(frontChannel1.isActive()).thenReturn(false); when(frontChannel2.isActive()).thenReturn(true); manager.doClean(); - Assert.assertEquals(1, manager.tunnels().size()); - } - @Test - public void testDoClean2() throws Exception { - startListenServer(8009); - Channel frontChannel1 = fakeChannel(); - Channel frontChannel2 = fakeChannel(); - when(frontChannel2.isActive()).thenReturn(true); - Tunnel tunnel1 = manager.create(frontChannel1, protocol("Proxy Route proxy://127.0.0.1:8009")); - manager.create(frontChannel2, protocol()); - - Assert.assertEquals(2, manager.tunnels().size()); - - tunnel1.release(); - Assert.assertEquals(1, manager.tunnels().size()); + manager.setLifecycleStateStarted(); + manager.doClean(); } private Channel fakeChannel() {