Skip to content

Commit

Permalink
fix proxy doclean npe bug
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Nov 29, 2023
1 parent 52b46e0 commit fcfd12e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,23 @@ public class DefaultKeeperContainerUsedInfoAnalyzerTest {
@Mock
private FoundationService service;

public static final int expireTime = 1000;


@Before
public void before() {
Mockito.when(config.getClusterDividedParts()).thenReturn(2);
Map<String, KeeperContainerOverloadStandardModel> standards = Maps.newHashMap();
standards.put(FoundationService.DEFAULT.getDataCenter(), new KeeperContainerOverloadStandardModel().setFlowOverload(10).setPeerDataOverload(10));
Mockito.when(config.getKeeperContainerOverloadStandards()).thenReturn(standards);
Mockito.when(config.getKeeperCheckerIntervalMilli()).thenReturn(10 * 1000);
Mockito.when(config.getKeeperCheckerIntervalMilli()).thenReturn(expireTime);
Mockito.doNothing().when(executor).execute(Mockito.any());
}

@Test
public void testUpdateKeeperContainerUsedInfo() {
//To prevent a second updateKeeperContainerUsedInfo() data when expired
Mockito.when(config.getKeeperCheckerIntervalMilli()).thenReturn(100000);
List<KeeperContainerUsedInfoModel> models1 = new ArrayList<>();
KeeperContainerUsedInfoModel model1 = new KeeperContainerUsedInfoModel("1.1.1.1", "jq", 14, 14);
Map<DcClusterShard, Pair<Long, Long>> detailInfo1 = Maps.newHashMap();
Expand Down Expand Up @@ -96,7 +100,7 @@ public void testUpdateKeeperContainerUsedInfoExpired() throws InterruptedExcepti
analyzer.updateKeeperContainerUsedInfo(0, models1);
Assert.assertEquals(1, analyzer.getCheckerIndexes().size());

TimeUnit.MILLISECONDS.sleep(11 * 1000);
TimeUnit.MILLISECONDS.sleep(expireTime+100);

List<KeeperContainerUsedInfoModel> models2 = new ArrayList<>();
KeeperContainerUsedInfoModel model3 = new KeeperContainerUsedInfoModel("3.3.3.3", "jq", 5, 5);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.proxy.tunnel;

import com.ctrip.xpipe.api.factory.ObjectFactory;
import com.ctrip.xpipe.api.lifecycle.Startable;
import com.ctrip.xpipe.api.observer.Observable;
import com.ctrip.xpipe.api.proxy.ProxyConnectProtocol;
import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask;
Expand Down Expand Up @@ -82,20 +83,25 @@ public void preDestroy() throws Exception {
protected void doClean() {
Set<Channel> keys = Sets.newHashSet(cache.keySet());
for(Channel channel : keys) {
if (!channel.isActive()) {
Tunnel tunnel = cache.remove(channel);
try {
tunnel.release();
} catch (Exception e) {
logger.error("[cleaner] tunnel release error: ", e);
}
} else {
try {
if (!channel.isActive()) {
Tunnel tunnel = cache.remove(channel);
try {
tunnel.release();
} catch (Exception e) {
logger.error("[cleaner] tunnel release tunnel{} error", tunnel, e);
}
} else {

Tunnel tunnel = cache.get(channel);
logger.info("[doClean] check tunnel, {}", tunnel.getTunnelMeta());
if (tunnel.getState().equals(new TunnelClosed(null))) {
cache.remove(channel);
Tunnel tunnel = cache.get(channel);
if (!tunnel.getLifecycleState().isStarted()) continue;
logger.info("[doClean] check tunnel, {}", tunnel.getTunnelMeta());
if (tunnel.getState().equals(new TunnelClosed(null))) {
cache.remove(channel);
}
}
} catch (Throwable th){
logger.error("[cleaner] tunnel release channel{} error", channel, th);
}
}
}
Expand Down Expand Up @@ -191,6 +197,15 @@ public void update(Object args, Observable observable) {

// Unit Test

@VisibleForTesting
protected void setLifecycleStateStarted() {
Set<Channel> keys = Sets.newHashSet(cache.keySet());
for(Channel channel : keys) {
Tunnel tunnel = cache.get(channel);
tunnel.getLifecycleState().setPhaseName(Startable.PHASE_NAME_END);
}
}

@VisibleForTesting
public DefaultTunnelManager setConfig(ProxyConfig config) {
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.ctrip.xpipe.redis.proxy.session.state.SessionEstablishedTest;
import com.ctrip.xpipe.redis.proxy.session.state.SessionInitTest;
import com.ctrip.xpipe.redis.proxy.tunnel.BothSessionTryWriteTest;
import com.ctrip.xpipe.redis.proxy.tunnel.DefaultTunnelManagerTest;
import com.ctrip.xpipe.redis.proxy.tunnel.DefaultTunnelTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
Expand Down Expand Up @@ -66,6 +67,7 @@
DefaultPingStatsTest.class,
TestTLSWithTwoProxy.class,
TestMassTCPPacketWithOneProxyServer.class,
DefaultTunnelManagerTest.class,

})
public class AllTests {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package com.ctrip.xpipe.redis.proxy.tunnel;

import com.ctrip.xpipe.redis.core.proxy.ProxyResourceManager;
import com.ctrip.xpipe.redis.proxy.AbstractRedisProxyServerTest;
import com.ctrip.xpipe.redis.proxy.Tunnel;
import com.ctrip.xpipe.redis.proxy.monitor.TunnelMonitorManager;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.DefaultChannelPromise;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

import java.net.InetSocketAddress;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

/**
* @author chen.zhu
Expand All @@ -22,11 +24,13 @@
*/
public class DefaultTunnelManagerTest extends AbstractRedisProxyServerTest {

@Mock
private DefaultTunnelManager manager;

@Before
public void beforeDefaultTunnelManagerTest() {
manager = (DefaultTunnelManager) tunnelManager();
manager = spy(manager);
}

@Test
Expand All @@ -36,29 +40,15 @@ public void testDoClean() throws Exception {
Channel frontChannel2 = fakeChannel();
manager.create(frontChannel1, protocol("Proxy Route proxy://127.0.0.1:8009"));
manager.create(frontChannel2, protocol("Proxy Route proxy://127.0.0.1:8009"));

Assert.assertEquals(2, manager.tunnels().size());

when(frontChannel1.isActive()).thenReturn(false);
when(frontChannel2.isActive()).thenReturn(true);
manager.doClean();

Assert.assertEquals(1, manager.tunnels().size());
}

@Test
public void testDoClean2() throws Exception {
startListenServer(8009);
Channel frontChannel1 = fakeChannel();
Channel frontChannel2 = fakeChannel();
when(frontChannel2.isActive()).thenReturn(true);
Tunnel tunnel1 = manager.create(frontChannel1, protocol("Proxy Route proxy://127.0.0.1:8009"));
manager.create(frontChannel2, protocol());

Assert.assertEquals(2, manager.tunnels().size());

tunnel1.release();
Assert.assertEquals(1, manager.tunnels().size());
manager.setLifecycleStateStarted();
manager.doClean();
}

private Channel fakeChannel() {
Expand Down

0 comments on commit fcfd12e

Please sign in to comment.