From 5beb80149330eb0be7cae2e442d1eab5d6377b8a Mon Sep 17 00:00:00 2001
From: lilongjiao
Date: Sat, 11 May 2024 11:02:53 +0800
Subject: [PATCH] check invalid keepers before reset (#795)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* check invalid keepers before reset

* rename method

* not reset if keeper unreachable

---------

Co-authored-by: llj李龙姣
---
 .../collector/command/ResetSentinels.java     | 57 ++++++++++++++++---
 .../collector/command/ResetSentinelsTest.java | 50 +++++++++++++++-
 2 files changed, 95 insertions(+), 12 deletions(-)

diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java
index 148a2f64c..c940bcace 100644
--- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java
+++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinels.java
@@ -18,19 +18,15 @@
 import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand;
 import com.ctrip.xpipe.redis.core.protocal.pojo.Role;
 import com.ctrip.xpipe.redis.core.protocal.pojo.Sentinel;
+import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole;
 import com.ctrip.xpipe.tuple.Pair;
 import com.ctrip.xpipe.utils.ObjectUtils;
 
 import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE;
 
@@ -106,12 +102,55 @@ Pair<Boolean, String> tooManyKeepers(List<HostPort> slaves, String clusterId, St
 			}
 		}
 		slaves.removeAll(keepers);
-		if (keepers.size() > 1)
+		if (keepers.size() > 1 && needReset(keepers))
 			return new Pair<>(true, String.format("%s,%s, has %d keepers:%s", clusterId, shardId, keepers.size(), keepers));
 		else
 			return new Pair<>(false, null);
 	}
 
+	private boolean needReset(Set<HostPort> keepers) {
+
+		Map<HostPort, SlaveRole> keeperRoles = new ConcurrentHashMap<>();
+
+		ParallelCommandChain keeperRoleChain = new ParallelCommandChain();
+		for (HostPort hostPort : keepers) {
+			RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(hostPort.getHost(), hostPort.getPort())), scheduled);
+			roleCommand.future().addListener(future -> {
+				if (future.isSuccess()) {
+					Role role = future.get();
+					if (role instanceof SlaveRole) {
+						keeperRoles.put(hostPort, (SlaveRole) role);
+					}
+				}
+			});
+			keeperRoleChain.add(roleCommand);
+		}
+
+		try {
+			keeperRoleChain.execute().get(1500, TimeUnit.MILLISECONDS);
+		} catch (Throwable th) {
+			logger.warn("[{}-{}+{}]parallel role command to keepers error", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th);
+		}
+
+		if (keeperRoles.isEmpty()) {
+			logger.warn("[{}-{}+{}]get role of all keepers failed, keeper status unknown", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId());
+			return false;
+		}
+
+		if (keeperRoles.size() < keepers.size()) {
+			logger.warn("[{}-{}+{}]get role of keepers:{}, all keepers:{}, some keepers unreachable", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), keeperRoles, keepers);
+			return false;
+		}
+
+		Set<HostPort> keeperMasters = keeperRoles.values().stream().map(slaveRole -> new HostPort(slaveRole.getMasterHost(), slaveRole.getMasterPort())).collect(Collectors.toSet());
+		if (keeperMasters.size() > 1) {
+			logger.warn("[{}-{}+{}]keeper master not unique:{}, need reset", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), keeperRoles);
+			return true;
+		}
+
+		return false;
+	}
+
 	Pair<Boolean, String> inOtherClusterShard(List<HostPort> slaves, String clusterId, String shardId) {
 		for (HostPort currentSlave : slaves) {
 			Pair<String, String> clusterShard = metaCache.findClusterShard(currentSlave);
diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java
index c60bd92f4..d7a6cb5b2 100644
--- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java
+++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/command/ResetSentinelsTest.java
@@ -52,19 +52,63 @@ public void init() throws Exception {
 	}
 
 	@Test
-	public void testOneWayReset() throws Exception {
+	public void testTooManyKeepers() throws Exception{
 		RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort());
 		resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()));
 
 // sentinelManager.slaves
-		when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)));
+		when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002)));
+
+		// 1、command failed
 		Pair<Boolean, String> shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard");
+		Assert.assertFalse(shouldResetAndReason.getKey());
+
+		// 2、some keepers unreachable
+		Server activeKeeper0 = startServer(8000,"*5\r\n" +
+				"$6\r\nkeeper\r\n" +
+				"$9\r\nlocalhost\r\n" +
+				":6379\r\n" +
+				"$9\r\nconnected\r\n" +
+				":477\r\n");
+		shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard");
+		Assert.assertFalse(shouldResetAndReason.getKey());
+
+		// 3、keeper master not unique
+		Server activeKeeper1 = startServer(8001,"*5\r\n" +
+				"$6\r\nkeeper\r\n" +
+				"$10\r\nlocalhost2\r\n" +
+				":6379\r\n" +
+				"$9\r\nconnected\r\n" +
+				":477\r\n");
+		shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard");
 		Assert.assertTrue(shouldResetAndReason.getKey());
 		Assert.assertTrue(shouldResetAndReason.getValue().contains("has 2 keepers"));
 
+		// 4、keeper master unique
+		activeKeeper1.stop();
+		Server activeKeeper2 = startServer(8002,"*5\r\n" +
+				"$6\r\nkeeper\r\n" +
+				"$9\r\nlocalhost\r\n" +
+				":6379\r\n" +
+				"$9\r\nconnected\r\n" +
+				":477\r\n");
+		shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8002)), "cluster", "shard");
+		Assert.assertFalse(shouldResetAndReason.getKey());
+		activeKeeper2.stop();
+		activeKeeper0.stop();
+	}
+
+	@Test
+	public void testOneWayReset() throws Exception {
+		RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort());
+		resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()));
+
+// sentinelManager.slaves
+		when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)));
+
 		HostPort wrongSlave = new HostPort("otherClusterShardSlave", 6379);
 		when(metaCache.findClusterShard(wrongSlave)).thenReturn(new Pair<>("otherCluster", "otherShard"));
-		shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), wrongSlave), "cluster", "shard");
+		Pair<Boolean, String> shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), wrongSlave), "cluster", "shard");
 		Assert.assertTrue(shouldResetAndReason.getKey());
 		Assert.assertTrue(shouldResetAndReason.getValue().contains("but meta:otherCluster:otherShard"));
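
Editor's note: the decision rule that the patch's new needReset method applies can be summarized in a small standalone sketch. This is an illustrative reduction only, not code from the patch; KeeperResetRule, the String-keyed map, and the main method below are made-up stand-ins for the XPipe types (HostPort, SlaveRole, RoleCommand). The rule: skip the reset when no keeper role is known or when any keeper is unreachable, and reset only when all keepers answered and they disagree on their master.

    import java.util.*;

    public class KeeperResetRule {

        // mastersByKeeper maps a keeper address to the master it reports,
        // or to Optional.empty() when the role query failed or timed out.
        static boolean needReset(Map<String, Optional<String>> mastersByKeeper) {
            long reachable = mastersByKeeper.values().stream().filter(Optional::isPresent).count();
            if (reachable == 0) return false;                      // role of every keeper unknown: skip reset
            if (reachable < mastersByKeeper.size()) return false;  // some keepers unreachable: skip reset
            Set<String> masters = new HashSet<>();
            for (Optional<String> m : mastersByKeeper.values()) masters.add(m.get());
            return masters.size() > 1;                             // keepers disagree on master: reset
        }

        public static void main(String[] args) {
            Map<String, Optional<String>> roles = new HashMap<>();
            roles.put("127.0.0.1:8000", Optional.of("localhost:6379"));
            roles.put("127.0.0.1:8001", Optional.of("localhost2:6379"));
            System.out.println(needReset(roles)); // true: two keepers report different masters
        }
    }

This mirrors the four cases exercised by testTooManyKeepers: role command failed, some keepers unreachable, keeper master not unique, and keeper master unique.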