diff --git a/core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java b/core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java index 0b8d0d8f9..a638059fa 100644 --- a/core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java +++ b/core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java @@ -33,7 +33,8 @@ public abstract class AbstractLeaderElector extends AbstractLifecycle implements @Autowired private ZkClient zkClient; - private ExecutorService executors = Executors.newCachedThreadPool(XpipeThreadFactory.create(getClass().getSimpleName())); + // single thread make zk events handled sequentially + private ExecutorService executors = Executors.newSingleThreadExecutor(XpipeThreadFactory.create(getClass().getSimpleName())); private ApplicationContext applicationContext; @@ -63,8 +64,8 @@ public void isLeader() { try{ logger.info("[isLeader][notify]{}", entry.getKey()); entry.getValue().isleader(); - }catch (Exception e){ - logger.error("[isLeader]" + entry, e); + }catch (Throwable th){ + logger.error("[isLeader]" + entry, th); } } } @@ -79,8 +80,8 @@ public void notLeader() { try{ logger.info("[notLeader][notify]{}", entry.getKey()); entry.getValue().notLeader(); - }catch (Exception e){ - logger.error("[notLeader]" + entry, e); + }catch (Throwable th){ + logger.error("[notLeader]" + entry, th); } } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java index 44e8280dc..f04151ddb 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java @@ -25,6 +25,7 @@ import org.springframework.web.client.RestOperations; import javax.annotation.Resource; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; @@ -114,16 +115,16 @@ public Map getTunnelClusterShardMap() { @Override protected void doStart() throws Exception { - scheduled.scheduleWithFixedDelay(() -> { + future = scheduled.scheduleWithFixedDelay(() -> { if (!taskTrigger.get()) { return; } logger.debug("proxy chain collector started"); - getAllDcProxyChains(); + fetchAllDcProxyChains(); }, getStartTime(), getPeriodic(), TimeUnit.MILLISECONDS); } - private void getAllDcProxyChains() { + protected void fetchAllDcProxyChains() { ParallelCommandChain commandChain = new ParallelCommandChain(MoreExecutors.directExecutor(), false); consoleConfig.getConsoleDomains().forEach((dc, domain)->{ logger.debug("begin to get proxy chain from dc {} {}", dc, domain); @@ -152,18 +153,21 @@ void updateShardProxyChainMap() { dcProxyChainMap.forEach((dc, proxyChainMap) -> { proxyChainMap.forEach((clusterShard, proxyChain) -> { DefaultTunnelInfo tunnel = proxyChain.getTunnelInfos().get(0); - if (tempShardProxyChain.containsKey(clusterShard)) { - tempShardProxyChain.get(clusterShard).getTunnelInfos().add(tunnel); - } else { - tempShardProxyChain.put(clusterShard, proxyChain); + if (!tempShardProxyChain.containsKey(clusterShard)) { + tempShardProxyChain.put(clusterShard, new DefaultProxyChain(proxyChain.getBackupDcId(), + proxyChain.getClusterId(), proxyChain.getShardId(), proxyChain.getPeerDcId(), new ArrayList<>())); } - + tempShardProxyChain.get(clusterShard).getTunnelInfos().add(tunnel); tempTunnelClusterShardMap.put(tunnel.getTunnelId(), clusterShard); }); }); synchronized (DefaultProxyChainCollector.this) { - tunnelClusterShardMap = tempTunnelClusterShardMap; - shardProxyChainMap = tempShardProxyChain; + if (taskTrigger.get()) { + tunnelClusterShardMap = tempTunnelClusterShardMap; + shardProxyChainMap = tempShardProxyChain; + } else { + clear(); + } } } @@ -173,6 +177,12 @@ DefaultProxyChainCollector setDcProxyChainMap(Map getShardProxyChainMap() { return shardProxyChainMap; @@ -186,15 +196,19 @@ protected int getPeriodic() { return 1000; } + protected void clear() { + shardProxyChainMap.clear(); + tunnelClusterShardMap.clear(); + dcProxyChainMap.clear(); + } + @Override protected void doStop() throws Exception { if(future != null) { future.cancel(true); future = null; } - shardProxyChainMap.clear(); - tunnelClusterShardMap.clear(); - dcProxyChainMap.clear(); + clear(); } @Override diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultMetaCache.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultMetaCache.java index 6125eb6d1..8b4f53338 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultMetaCache.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultMetaCache.java @@ -85,6 +85,7 @@ public void notLeader() { } private synchronized void stopLoadMeta(){ + logger.info("[loadMeta][stop]{}", this); if (future != null) future.cancel(true); future = null; @@ -98,7 +99,7 @@ private synchronized void stopLoadMeta(){ } public void startLoadMeta() { - logger.info("[loadMeta]{}", this); + logger.info("[loadMeta][start]{}", this); refreshIntervalMilli = consoleConfig.getCacheRefreshInterval(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java index ecaa247cc..a5443f958 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java @@ -2,6 +2,8 @@ import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.cluster.ClusterType; +import com.ctrip.xpipe.command.AbstractCommand; +import com.ctrip.xpipe.command.ParallelCommandChain; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.healthcheck.BiDirectionSupport; import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; @@ -25,11 +27,15 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.Resource; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; + /** * @author chen.zhu *

@@ -58,6 +64,9 @@ public class DefaultDelayService extends CheckerRedisDelayManager implements Del @Autowired private FoundationService foundationService; + @Resource(name = GLOBAL_EXECUTOR) + protected ExecutorService executors; + @Override public void updateRedisDelays(Map redisDelays) { hostPort2Delay.putAll(redisDelays); @@ -221,12 +230,24 @@ public UnhealthyInfoModel getAllUnhealthyInstance() { } UnhealthyInfoModel infoAggregation = new UnhealthyInfoModel(); + ParallelCommandChain commandChain = new ParallelCommandChain(executors); for (String dcId : xpipeMeta.getDcs().keySet()) { - UnhealthyInfoModel unhealthyInfo = getDcActiveClusterUnhealthyInstance(dcId); - if (null == unhealthyInfo) infoAggregation.getAttachFailDc().add(dcId); - else infoAggregation.merge(unhealthyInfo); + FetchDcUnhealthyInstanceCmd cmd = new FetchDcUnhealthyInstanceCmd(dcId); + commandChain.add(cmd); + cmd.future().addListener(commandFuture -> { + if (commandFuture.isSuccess() && null != commandFuture.get()) { + infoAggregation.merge(commandFuture.get()); + } else { + infoAggregation.getAttachFailDc().add(dcId); + } + }); } + try { + commandChain.execute().get(); + } catch (Throwable th) { + logger.info("[getAllUnhealthyInstance][fail] {}", th.getMessage()); + } return infoAggregation; } @@ -239,4 +260,29 @@ public UnhealthyInfoModel getAllUnhealthyInstanceFromParallelService() { public void setFoundationService(FoundationService foundationService) { this.foundationService = foundationService; } + + class FetchDcUnhealthyInstanceCmd extends AbstractCommand { + + private String dc; + + public FetchDcUnhealthyInstanceCmd(String dc) { + this.dc = dc; + } + + @Override + protected void doExecute() throws Throwable { + future().setSuccess(getDcActiveClusterUnhealthyInstance(dc)); + } + + @Override + protected void doReset() { + // do nothing + } + + @Override + public String getName() { + return getClass().getSimpleName(); + } + } + } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollectorTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollectorTest.java index 1b1ca116e..c4271ab99 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollectorTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollectorTest.java @@ -1,18 +1,116 @@ package com.ctrip.xpipe.redis.console.proxy.impl; +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.redis.checker.model.DcClusterShardPeer; import com.ctrip.xpipe.redis.console.AbstractConsoleTest; +import com.ctrip.xpipe.redis.console.config.ConsoleConfig; +import com.ctrip.xpipe.redis.console.model.ProxyModel; +import com.ctrip.xpipe.redis.console.proxy.ProxyChain; +import com.ctrip.xpipe.redis.console.proxy.ProxyChainAnalyzer; import com.ctrip.xpipe.redis.console.proxy.ProxyChainCollector; +import com.ctrip.xpipe.redis.console.reporter.DefaultHttpService; +import org.apache.http.HttpException; +import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.client.RestOperations; +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.IntStream; + +import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + + +@RunWith(MockitoJUnitRunner.class) public class DefaultProxyChainCollectorTest extends AbstractConsoleTest { - @Autowired - ProxyChainCollector collector; + @Mock + private ProxyChainAnalyzer proxyChainAnalyzer; + + @Mock + private ConsoleConfig consoleConfig; + + @Mock + private DefaultHttpService httpService; + + @Mock + private RestOperations restTemplate; + + @InjectMocks + DefaultProxyChainCollector collector; + + private Map consoles = new HashMap() {{ + put("dc1","http://dc1"); + put("dc2","http://dc2"); + }}; + + @Before + public void setupDefaultProxyChainCollectorTest() { + when(httpService.getRestTemplate()).thenReturn(restTemplate); + this.collector.setHttpService(httpService); + when(consoleConfig.getConsoleDomains()).thenReturn(consoles); + } + + private Map generateProxyChains(int cnt) { + Map result = new HashMap<>(); + IntStream.range(0, cnt).forEach(i -> { + DcClusterShardPeer dcClusterShardPeer = new DcClusterShardPeer("dc1", "cluster" + i, "shard" + i, "dc2"); + List tunnels = new ArrayList<>(); + tunnels.add(new DefaultTunnelInfo(new ProxyModel(), "tunnel" + i)); + DefaultProxyChain proxyChain = new DefaultProxyChain("dc1", "cluster" + i, "shard" + i, "dc2", tunnels); + result.put(dcClusterShardPeer, proxyChain); + }); + + return result; + } @Test - public void testUpdateProxyChains() { -// collector.updateShardProxyChainMap(); + public void remoteDcDown_noMemLeak() { + ResponseEntity> resp = new ResponseEntity(generateProxyChains(100), HttpStatus.OK); + when(restTemplate.exchange(anyString(), any(), any(), any(ParameterizedTypeReference.class), anyString())) + .thenReturn(resp); + IntStream.range(0, 10).forEach(i -> collector.fetchAllDcProxyChains()); + + Map> dcProxyChainMap = collector.getDcProxyChainMap(); + for (Map proxyChainMap: dcProxyChainMap.values()) { + for (ProxyChain proxyChain: proxyChainMap.values()) { + Assert.assertEquals(2, proxyChain.getTunnelInfos().size()); + } + } + + doAnswer(inov -> { + String uri = inov.getArgument(0); + String host = consoles.values().iterator().next(); + if (uri.startsWith(host)) throw new HttpException("mock"); + else return resp; + }).when(restTemplate).exchange(anyString(), any(), any(), any(ParameterizedTypeReference.class)); + + IntStream.range(0, 10).forEach(i -> collector.fetchAllDcProxyChains()); + dcProxyChainMap = collector.getDcProxyChainMap(); + for (Map proxyChainMap: dcProxyChainMap.values()) { + for (ProxyChain proxyChain: proxyChainMap.values()) { + Assert.assertEquals(2, proxyChain.getTunnelInfos().size()); + } + } } + } \ No newline at end of file diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/RdbOnlyPsync.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/RdbOnlyPsync.java index 0a1a4ddfd..4c3f288b1 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/RdbOnlyPsync.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/RdbOnlyPsync.java @@ -30,14 +30,12 @@ protected void doWhenFullSyncToNonFreshReplicationStore(String masterRunid) { protected void failPsync(Throwable throwable) { super.failPsync(throwable); - if (psyncState == PSYNC_STATE.PSYNC_COMMAND_WAITING_REPONSE) { - try { - getLogger().debug("[failPsync] psync fail before beginReadRdb"); - currentReplicationStore.close(); - currentReplicationStore.destroy(); - } catch (Throwable th) { - getLogger().warn("[failPsync] release rdb file fail", th); - } + try { + getLogger().info("[failPsync][release rdb]"); + currentReplicationStore.close(); + currentReplicationStore.destroy(); + } catch (Throwable th) { + getLogger().warn("[failPsync][release rdb] fail", th); } } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/RedisOpType.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/RedisOpType.java index 107f2875e..2ec37dde7 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/RedisOpType.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/RedisOpType.java @@ -82,6 +82,11 @@ public enum RedisOpType { MSET(true, -3), MSETNX(true, -3), + // ctrip + GTID_LWM(false, 3, true), + CTRIP_MERGE_START(false, -1, true), + CTRIP_MERGE_END(false, -2, true), + // other SELECT(false, 2), PUBLISH(false, 3), @@ -101,9 +106,16 @@ public enum RedisOpType { // Number of arguments, it is possible to use -N to say >= N private int arity; + private boolean swallow; + RedisOpType(boolean multiKey, int arity) { + this(multiKey, arity, false); + } + + RedisOpType(boolean multiKey, int arity, boolean swallow) { this.supportMultiKey = multiKey; this.arity = arity; + this.swallow = swallow; } public boolean supportMultiKey() { @@ -114,6 +126,10 @@ public int getArity() { return arity; } + public boolean isSwallow() { + return swallow; + } + public boolean checkArgcNotStrictly(Object[] args) { return args.length >= Math.abs(arity); } @@ -122,7 +138,7 @@ public static RedisOpType lookup(String name) { if (StringUtil.isEmpty(name)) return UNKNOWN; try { - return valueOf(name.toUpperCase()); + return valueOf(name.replace('.', '_').toUpperCase()); } catch (IllegalArgumentException illegalArgumentException) { return UNKNOWN; } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/parser/RedisOpSingleKeyEnum.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/parser/RedisOpSingleKeyEnum.java index 8dc005bd8..db1d88a52 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/parser/RedisOpSingleKeyEnum.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/redis/operation/parser/RedisOpSingleKeyEnum.java @@ -78,6 +78,11 @@ public enum RedisOpSingleKeyEnum { // Bit SETBIT(RedisOpType.SETBIT, 1, 3), + // ctrip + GTID_LWM(RedisOpType.GTID_LWM, 1, 2), + CTRIP_MERGE_START(RedisOpType.CTRIP_MERGE_START, null, null), + CTRIP_MERGE_END(RedisOpType.CTRIP_MERGE_END, 1, null), + // others PUBLISH(RedisOpType.PUBLISH, 1, 2), MOVE(RedisOpType.MOVE, 1, null); diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/parser/GeneralRedisOpParserTest.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/parser/GeneralRedisOpParserTest.java index 486c6f9fa..8338303b4 100644 --- a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/parser/GeneralRedisOpParserTest.java +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/parser/GeneralRedisOpParserTest.java @@ -1,6 +1,9 @@ package com.ctrip.xpipe.redis.core.redis.parser; import com.ctrip.xpipe.redis.core.redis.operation.*; +import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpLwm; +import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpMergeEnd; +import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpMergeStart; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -17,6 +20,48 @@ @RunWith(SpringJUnit4ClassRunner.class) public class GeneralRedisOpParserTest extends AbstractRedisOpParserTest { + @Test + public void testCtripMergeStartParse() { + RedisOpMergeStart redisOpMergeStart = new RedisOpMergeStart(); + RedisOp redisOp = parser.parse(redisOpMergeStart.buildRawOpArgs()); + Assert.assertEquals(RedisOpType.CTRIP_MERGE_START, redisOp.getOpType()); + Assert.assertNull(redisOp.getOpGtid()); + Assert.assertArrayEquals(redisOpMergeStart.buildRawOpArgs(), redisOp.buildRawOpArgs()); + + RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp; + Assert.assertNull(redisSingleKeyOp.getKey()); + Assert.assertNull(redisSingleKeyOp.getValue()); + Assert.assertTrue(redisOp.getOpType().isSwallow()); + } + + @Test + public void testCtripMergeEndParse() { + RedisOpMergeEnd redisOpMergeEnd = new RedisOpMergeEnd("24d9e2513182d156cbd999df5ebedf24e7634140:1-1494763841"); + RedisOp redisOp = parser.parse(redisOpMergeEnd.buildRawOpArgs()); + Assert.assertEquals(RedisOpType.CTRIP_MERGE_END, redisOp.getOpType()); + Assert.assertNull(redisOp.getOpGtid()); + Assert.assertArrayEquals(redisOpMergeEnd.buildRawOpArgs(), redisOp.buildRawOpArgs()); + + RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp; + Assert.assertArrayEquals("24d9e2513182d156cbd999df5ebedf24e7634140:1-1494763841".getBytes(), redisSingleKeyOp.getKey().get()); + Assert.assertNull(redisSingleKeyOp.getValue()); + Assert.assertTrue(redisOp.getOpType().isSwallow()); + } + + @Test + public void testCtripGtidLwmParse() { + RedisOpLwm redisOpLwm = new RedisOpLwm("24d9e2513182d156cbd999df5ebedf24e7634140", 1494763841L); + RedisOp redisOp = parser.parse(redisOpLwm.buildRawOpArgs()); + Assert.assertEquals(RedisOpType.GTID_LWM, redisOp.getOpType()); + Assert.assertNull(redisOp.getOpGtid()); + Assert.assertArrayEquals(redisOpLwm.buildRawOpArgs(), redisOp.buildRawOpArgs()); + + RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp; + Assert.assertArrayEquals("24d9e2513182d156cbd999df5ebedf24e7634140".getBytes(), redisSingleKeyOp.getKey().get()); + Assert.assertArrayEquals("1494763841".getBytes(), redisSingleKeyOp.getValue()); + Assert.assertTrue(redisOp.getOpType().isSwallow()); + } + @Test public void testSetParse() { RedisOp redisOp = parser.parse(Arrays.asList("SET", "k1", "v1").toArray()); @@ -27,6 +72,7 @@ public void testSetParse() { RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp; Assert.assertArrayEquals("k1".getBytes(), redisSingleKeyOp.getKey().get()); Assert.assertArrayEquals("v1".getBytes(), redisSingleKeyOp.getValue()); + Assert.assertFalse(redisOp.getOpType().isSwallow()); } @Test @@ -39,6 +85,7 @@ public void testGtidParse() { RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp; Assert.assertArrayEquals("k1".getBytes(), redisSingleKeyOp.getKey().get()); Assert.assertArrayEquals("v1".getBytes(), redisSingleKeyOp.getValue()); + Assert.assertFalse(redisOp.getOpType().isSwallow()); } @Test @@ -54,6 +101,7 @@ public void testMSetParse() { Assert.assertArrayEquals("v1".getBytes(), redisMultiKeyOp.getKeyValue(0).getValue()); Assert.assertEquals(new RedisKey("k2"), redisMultiKeyOp.getKeyValue(1).getKey()); Assert.assertArrayEquals("v2".getBytes(), redisMultiKeyOp.getKeyValue(1).getValue()); + Assert.assertFalse(redisOp.getOpType().isSwallow()); } @Test @@ -69,12 +117,14 @@ public void testGtidMSetParse() { Assert.assertArrayEquals("v1".getBytes(), redisMultiKeyOp.getKeyValue(0).getValue()); Assert.assertEquals(new RedisKey("k2"), redisMultiKeyOp.getKeyValue(1).getKey()); Assert.assertArrayEquals("v2".getBytes(), redisMultiKeyOp.getKeyValue(1).getValue()); + Assert.assertFalse(redisOp.getOpType().isSwallow()); } @Test public void testSelectParse() { RedisOp redisOp = parser.parse(Arrays.asList("SELECT", "0").toArray()); Assert.assertEquals(RedisOpType.SELECT, redisOp.getOpType()); + Assert.assertFalse(redisOp.getOpType().isSwallow()); } @Test @@ -85,6 +135,7 @@ public void testPingParse() { RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp; Assert.assertNull(redisSingleKeyOp.getKey()); Assert.assertNull(redisSingleKeyOp.getValue()); + Assert.assertFalse(redisOp.getOpType().isSwallow()); } @Test(expected = UnsupportedOperationException.class) diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java index e9c4aeba0..3a7b655ac 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java @@ -298,7 +298,7 @@ protected boolean shouldFilter(RedisOp redisOp) { return true; } } - return false; + return redisOp.getOpType().isSwallow(); } private void doOnRedisOp(RedisOp redisOp, long commandOffsetToAccumulate) { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java index 0b327bc21..f6112dc7b 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java @@ -383,11 +383,6 @@ public ChannelFuture onCommand(CommandFile currentFile, long filePosition, Objec return future; } - @VisibleForTesting - protected boolean shouldFilter(RedisOp redisOp) { - return false; - } - @Override public String info() { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlave.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlave.java index bd09c7b74..1d3ac2fb3 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlave.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlave.java @@ -1,8 +1,6 @@ package com.ctrip.xpipe.redis.keeper.impl; import com.ctrip.xpipe.redis.core.protocal.cmd.DefaultXsync; -import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; -import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType; import com.ctrip.xpipe.redis.core.store.GtidSetReplicationProgress; import com.ctrip.xpipe.redis.core.store.ReplicationProgress; import com.ctrip.xpipe.redis.keeper.RedisClient; @@ -33,23 +31,6 @@ protected String buildThreadPrefix(Channel channel) { return "RedisClientXsync-" + getRemoteIpLocalPort; } - @Override - protected boolean shouldFilter(RedisOp redisOp) { - if (RedisOpType.PUBLISH.equals(redisOp.getOpType())) { - int length = redisOp.buildRawOpArgs().length; - if (length < 5) { - logger.warn("publish command length={} < 5, filtered", length); - return true; - } - String channel = new String(redisOp.buildRawOpArgs()[4]); - if (!channel.startsWith("xpipe-asymmetric-")) { - logger.debug("publish channel: [{}] filtered", channel); - return true; - } - } - return false; - } - @Override public boolean supportProgress(Class> clazz) { return clazz.equals(GtidSetReplicationProgress.class); diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AllTests.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AllTests.java index 179595fec..c2af8b77e 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AllTests.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AllTests.java @@ -53,7 +53,6 @@ DefaultRdbStoreEofMarkTest.class, DefaultCommandStoreTest.class, DefaultRedisSlaveTest.class, - XsyncRedisSlaveTest.class, RoleCommandHandlerTest.class, DefaultKeeperConfigTest.class, FakeRedisExceptionTest.class, diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlaveTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlaveTest.java deleted file mode 100644 index c25943aae..000000000 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/XsyncRedisSlaveTest.java +++ /dev/null @@ -1,86 +0,0 @@ -package com.ctrip.xpipe.redis.keeper.impl; - -import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; -import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType; -import com.ctrip.xpipe.redis.core.redis.operation.RedisSingleKeyOp; -import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpSingleKey; -import com.ctrip.xpipe.redis.core.redis.operation.op.RedisSingleKeyOpGtidWrapper; -import com.ctrip.xpipe.redis.keeper.AbstractRedisKeeperTest; -import com.ctrip.xpipe.redis.keeper.KeeperRepl; -import com.ctrip.xpipe.redis.keeper.RedisClient; -import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; -import io.netty.channel.Channel; -import io.netty.channel.DefaultChannelPromise; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import static org.mockito.Mockito.when; - -/** - * @author lishanglin - * date 2023/11/10 - */ -@RunWith(MockitoJUnitRunner.Silent.class) -public class XsyncRedisSlaveTest extends AbstractRedisKeeperTest { - - @Mock - public Channel channel; - - @Mock - public RedisKeeperServer redisKeeperServer; - - @Mock - private KeeperRepl keeperRepl; - - private XsyncRedisSlave redisSlave; - - @Before - public void setupXsyncRedisSlaveTest() { - when(channel.closeFuture()).thenReturn(new DefaultChannelPromise(channel)); - when(channel.remoteAddress()).thenReturn(localhostInetAdress(randomPort())); - when(keeperRepl.replId()).thenReturn("test-repl-id"); - when(redisKeeperServer.getKeeperRepl()).thenReturn(keeperRepl); - - RedisClient redisClient = new DefaultRedisClient(channel, redisKeeperServer); - redisSlave= new XsyncRedisSlave(redisClient); - } - - @Test - public void testFilterPublish() { - RedisSingleKeyOp op1 = new RedisOpSingleKey(RedisOpType.SET, string2Bytes("set a 1"), null, null); - RedisOp gtidOp1 = new RedisSingleKeyOpGtidWrapper(string2Bytes("GTID ggg:1 0"), "ggg", op1); - Assert.assertFalse(redisSlave.shouldFilter(gtidOp1)); - - RedisSingleKeyOp op2 = new RedisOpSingleKey(RedisOpType.PUBLISH, string2Bytes("publish xpipe-asymmetric-ppp 222"), null, null); - RedisOp gtidOp2 = new RedisSingleKeyOpGtidWrapper(string2Bytes("GTID ggg:1 0"), "ggg", op2); - Assert.assertFalse(redisSlave.shouldFilter(gtidOp2)); - - RedisSingleKeyOp op3 = new RedisOpSingleKey(RedisOpType.PUBLISH, string2Bytes("publish ppp 222"), null, null); - RedisOp gtidOp3 = new RedisSingleKeyOpGtidWrapper(string2Bytes("GTID ggg:1 0"), "ggg", op3); - Assert.assertTrue(redisSlave.shouldFilter(gtidOp3)); - Assert.assertTrue(redisSlave.shouldFilter(op3)); - - //test estimated size by the way - Assert.assertEquals(15, gtidOp1.estimatedSize()); - Assert.assertEquals(40, gtidOp2.estimatedSize()); - Assert.assertEquals(23, gtidOp3.estimatedSize()); - } - - private byte[][] string2Bytes(String s) { - - String[] ss = s.split(" "); - int length = ss.length; - byte[][] b = new byte[length][]; - - for (int i = 0; i < length; i++) { - b[i] = ss[i].getBytes(); - } - - return b; - } - -} diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/store/RdbOnlyPsyncTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/store/RdbOnlyPsyncTest.java index ac06445aa..24f0c2790 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/store/RdbOnlyPsyncTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/store/RdbOnlyPsyncTest.java @@ -5,8 +5,10 @@ import com.ctrip.xpipe.endpoint.DefaultEndPoint; import com.ctrip.xpipe.netty.NettyPoolUtil; import com.ctrip.xpipe.redis.core.AbstractRedisTest; +import com.ctrip.xpipe.redis.core.protocal.PsyncObserver; import com.ctrip.xpipe.redis.core.protocal.cmd.RdbOnlyPsync; import com.ctrip.xpipe.redis.core.store.DumpedRdbStore; +import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncMasterRdbOffsetNotContinuousRuntimeException; import com.ctrip.xpipe.simpleserver.Server; import org.junit.Assert; import org.junit.Test; @@ -19,9 +21,9 @@ public class RdbOnlyPsyncTest extends AbstractRedisTest { @Test - public void fsyncFail_rdbFileClosed() throws Exception { + public void psyncFail_rdbFileClosed() throws Exception { Server redisServer = startServer("-ERR: mock err"); - Endpoint redisEndpoint = new DefaultEndPoint("localhost", redisServer.getPort()); + Endpoint redisEndpoint = new DefaultEndPoint("127.0.0.1", redisServer.getPort()); DumpedRdbStore rdbStore = Mockito.mock(DumpedRdbStore.class); RdbOnlyReplicationStore replicationStore = new RdbOnlyReplicationStore(rdbStore); RdbOnlyPsync psync = new RdbOnlyPsync(NettyPoolUtil.createNettyPool(redisEndpoint), replicationStore, scheduled); @@ -29,7 +31,25 @@ public void fsyncFail_rdbFileClosed() throws Exception { CommandFuture future = psync.execute(); waitConditionUntilTimeOut(future::isDone); Assert.assertFalse(future.isSuccess()); - Mockito.verify(rdbStore, Mockito.times(1)).close(); + Mockito.verify(rdbStore, Mockito.timeout(3000).atLeastOnce()).close(); + } + + @Test + public void offsetNotContinue_rdbFileClosed() throws Exception { + Server redisServer = startServer("+FULLRESYNC 2aaecd36e885a0c9079919e2514f4af4f7a5d1bd 999999999999999\r\n"); + Endpoint redisEndpoint = new DefaultEndPoint("127.0.0.1", redisServer.getPort()); + PsyncObserver failObserver = Mockito.mock(PsyncObserver.class); + DumpedRdbStore rdbStore = Mockito.mock(DumpedRdbStore.class); + RdbOnlyReplicationStore replicationStore = new RdbOnlyReplicationStore(rdbStore); + RdbOnlyPsync psync = new RdbOnlyPsync(NettyPoolUtil.createNettyPool(redisEndpoint), replicationStore, scheduled); + psync.addPsyncObserver(failObserver); + Mockito.doThrow(new PsyncMasterRdbOffsetNotContinuousRuntimeException(100, 1)).when(failObserver).onFullSync(Mockito.anyLong()); + + CommandFuture future = psync.execute(); + waitConditionUntilTimeOut(future::isDone); + Assert.assertFalse(future.isSuccess()); + Assert.assertTrue(future.cause() instanceof PsyncMasterRdbOffsetNotContinuousRuntimeException); + Mockito.verify(rdbStore, Mockito.timeout(3000).atLeastOnce()).close(); } }