diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java index e107c75c1..a11685a22 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java @@ -4,6 +4,7 @@ import com.ctrip.xpipe.redis.core.entity.ClusterMeta; import com.ctrip.xpipe.redis.core.entity.KeeperMeta; import com.ctrip.xpipe.redis.core.entity.RedisMeta; +import com.ctrip.xpipe.redis.core.entity.XpipeMeta; import java.util.List; @@ -52,4 +53,6 @@ public interface HealthCheckInstanceManager { List getAllClusterInstance(); + boolean checkInstancesMiss(XpipeMeta xpipeMeta); + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java index 26a2ec168..1795d1bbd 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java @@ -1,10 +1,11 @@ package com.ctrip.xpipe.redis.checker.healthcheck.impl; +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.api.monitor.EventMonitor; +import com.ctrip.xpipe.cluster.ClusterType; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.healthcheck.*; -import com.ctrip.xpipe.redis.core.entity.ClusterMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperMeta; -import com.ctrip.xpipe.redis.core.entity.RedisMeta; +import com.ctrip.xpipe.redis.core.entity.*; import com.ctrip.xpipe.utils.MapUtils; import com.ctrip.xpipe.utils.StringUtil; import com.google.common.collect.Lists; @@ -14,7 +15,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.List; +import java.util.*; import java.util.concurrent.ConcurrentMap; /** @@ -37,6 +38,8 @@ public class DefaultHealthCheckInstanceManager implements HealthCheckInstanceMan private ConcurrentMap redisInstanceForPingAction = Maps.newConcurrentMap(); + private static final String ALERT_TYPE = "HealthCheckInstance"; + @Autowired private HealthCheckInstanceFactory instanceFactory; @@ -179,4 +182,88 @@ public List getAllClusterInstance() { return Lists.newLinkedList(clusterHealthCheckerInstances.values()); } + public boolean checkInstancesMiss(XpipeMeta xpipeMeta) { + if (null == xpipeMeta) return true; + + logger.debug("[checkInstancesMiss][begin]"); + Set currentClusters = clusterHealthCheckerInstances.keySet(); + Set currentInstances = instances.keySet(); + Set currentPingInstances = redisInstanceForPingAction.keySet(); + + Set expectClusters = new HashSet<>(); + Set expectInstances = new HashSet<>(); + Set expectPingInstances = new HashSet<>(); + + String currentDc = FoundationService.DEFAULT.getDataCenter(); + String currentZone = xpipeMeta.getDcs().get(currentDc).getZone(); + for (Map.Entry entry: xpipeMeta.getDcs().entrySet()) { + String dcId = entry.getKey(); + DcMeta dcMeta = entry.getValue(); + for (ClusterMeta clusterMeta: dcMeta.getClusters().values()) { + ClusterType clusterType = ClusterType.lookup(clusterMeta.getType()); + String clusterId = clusterMeta.getId().toLowerCase(); + boolean addInstanceForPing = false; + if (clusterType.equals(ClusterType.ONE_WAY)) { + String activeDc = clusterMeta.getActiveDc(); + if (activeDc.equalsIgnoreCase(currentDc)) { + expectClusters.add(clusterId); + } else if (dcId.equalsIgnoreCase(currentDc) && !currentZone.equalsIgnoreCase(xpipeMeta.getDcs().get(activeDc).getZone())) { + addInstanceForPing = true; + } else { + continue; + } + } else if (clusterType.supportSingleActiveDC() || clusterType.equals(ClusterType.CROSS_DC)) { + String activeDc = clusterMeta.getActiveDc(); + if (activeDc.equalsIgnoreCase(currentDc)) { + expectClusters.add(clusterId); + } else { + continue; + } + } else if (!expectClusters.contains(clusterId)) { + String[] dcs = clusterMeta.getDcs().split("\\s*,\\s*"); + for (String dc : dcs) { + if (dc.equalsIgnoreCase(currentDc)) expectClusters.add(clusterId); + } + + if (!expectClusters.contains(clusterId)) continue; + } + + for (ShardMeta shardMeta: clusterMeta.getShards().values()) { + for (RedisMeta redisMeta: shardMeta.getRedises()) { + if (addInstanceForPing) { + expectPingInstances.add(new HostPort(redisMeta.getIp(), redisMeta.getPort())); + } else { + expectInstances.add(new HostPort(redisMeta.getIp(), redisMeta.getPort())); + } + } + } + } + } + + boolean noMissing = true; + if (!currentClusters.equals(expectClusters)) { + noMissing = false; + logger.debug("[checkInstancesMiss][cluster][current] {}", currentClusters); + logger.debug("[checkInstancesMiss][cluster][expect] {}", expectClusters); + EventMonitor.DEFAULT.logEvent(ALERT_TYPE, "clusterMissing"); + } + if (!currentInstances.equals(expectInstances)) { + noMissing = false; + logger.debug("[checkInstancesMiss][instance][current] {}", currentInstances); + logger.debug("[checkInstancesMiss][instance][expect] {}", expectInstances); + EventMonitor.DEFAULT.logEvent(ALERT_TYPE, "instanceMissing"); + } + if (!currentPingInstances.equals(expectPingInstances)) { + noMissing = false; + logger.debug("[checkInstancesMiss][CrossRegionInstance][current] {}", currentPingInstances); + logger.debug("[checkInstancesMiss][CrossRegionInstance][expect] {}", expectPingInstances); + EventMonitor.DEFAULT.logEvent(ALERT_TYPE, "CrossRegionInstanceMissing"); + } + if (noMissing) { + EventMonitor.DEFAULT.logEvent(ALERT_TYPE, "noMissing"); + } + + return noMissing; + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java index 80216b653..55c66c2a1 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java @@ -14,6 +14,7 @@ import com.ctrip.xpipe.redis.core.meta.KeeperContainerDetailInfo; import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.utils.StringUtil; +import com.ctrip.xpipe.utils.VisibleForTesting; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -116,8 +117,13 @@ protected void doStop() throws Exception { super.doStop(); } + @VisibleForTesting + protected void setInstanceManager(HealthCheckInstanceManager instanceManager) { + this.instanceManager = instanceManager; + } - void generateHealthCheckInstances() { + @VisibleForTesting + protected void generateHealthCheckInstances() { XpipeMeta meta = metaCache.getXpipeMeta(); for(DcMeta dcMeta : meta.getDcs().values()) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java index 82655e604..3300ad064 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java @@ -68,6 +68,7 @@ public void start() { @Override protected void doRun() throws Exception { checkDcMetaChange(); + instanceManager.checkInstancesMiss(metaCache.getXpipeMeta()); } }, interval * 2, interval, TimeUnit.MILLISECONDS); } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AllTests.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AllTests.java index 82a325494..ca57b01cd 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AllTests.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AllTests.java @@ -84,6 +84,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.factory.DefaultHealthCheckEndpointFactoryTest; import com.ctrip.xpipe.redis.checker.healthcheck.factory.DefaultHealthCheckInstanceFactoryTest; import com.ctrip.xpipe.redis.checker.healthcheck.factory.HealthCheckEndpointFactoryTest; +import com.ctrip.xpipe.redis.checker.healthcheck.impl.DefaultHealthCheckInstanceManagerTest; import com.ctrip.xpipe.redis.checker.healthcheck.impl.DefaultHealthCheckerMockTest; import com.ctrip.xpipe.redis.checker.healthcheck.impl.DefaultRouteChooserTest; import com.ctrip.xpipe.redis.checker.healthcheck.meta.DefaultDcMetaChangeManagerTest; @@ -217,6 +218,8 @@ DefaultHealthCheckConfigTest.class, DefaultRouteChooserTest.class, + + DefaultHealthCheckInstanceManagerTest.class }) public class AllTests { } \ No newline at end of file diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManagerTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManagerTest.java new file mode 100644 index 000000000..9a4d1049a --- /dev/null +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManagerTest.java @@ -0,0 +1,102 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.impl; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.redis.checker.AbstractCheckerTest; +import com.ctrip.xpipe.redis.checker.config.CheckerConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.ClusterHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.KeeperHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.core.entity.*; +import com.ctrip.xpipe.redis.core.meta.CurrentDcAllMeta; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.Collections; + +/** + * @author lishanglin + * date 2024/9/14 + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class DefaultHealthCheckInstanceManagerTest extends AbstractCheckerTest { + + @InjectMocks + private DefaultHealthChecker healthChecker; + + @InjectMocks + private DefaultHealthCheckInstanceManager healthCheckInstanceManager; + + @Mock + private MetaCache metaCache; + + @Mock + private CheckerConfig checkerConfig; + + @Mock + private CurrentDcAllMeta currentDcAllMeta; + + @Mock + private HealthCheckInstanceFactory instanceFactory; + + @Mock + private RedisHealthCheckInstance mockCheckInstance; + + @Mock + private KeeperHealthCheckInstance mockKeeperInstance; + + @Mock + private ClusterHealthCheckInstance mockClusterInstance; + + @Before + public void setupDefaultHealthCheckInstanceManagerTest() { + Mockito.when(instanceFactory.create(Mockito.any(RedisMeta.class))).thenReturn(mockCheckInstance); + Mockito.when(instanceFactory.create(Mockito.any(KeeperMeta.class))).thenReturn(mockKeeperInstance); + Mockito.when(instanceFactory.create(Mockito.any(ClusterMeta.class))).thenReturn(mockClusterInstance); + Mockito.when(instanceFactory.createRedisInstanceForAssignedAction(Mockito.any())).thenReturn(mockCheckInstance); + Mockito.when(instanceFactory.getOrCreateRedisInstanceForPsubPingAction(Mockito.any())).thenReturn(mockCheckInstance); + + Mockito.when(checkerConfig.getIgnoredHealthCheckDc()).thenReturn(Collections.emptySet()); + + Mockito.when(currentDcAllMeta.getCurrentDcAllMeta()).thenReturn(getXpipeMeta().findDc(FoundationService.DEFAULT.getDataCenter())); + + Mockito.when(metaCache.getXpipeMeta()).thenReturn(getXpipeMeta()); + Mockito.doAnswer(inv -> { + String currentDc = inv.getArgument(0, String.class); + String otherDc = inv.getArgument(1, String.class); + DcMeta currentDcMeta = getXpipeMeta().findDc(currentDc); + DcMeta otherDcMeta = getXpipeMeta().findDc(otherDc); + if (null == currentDcMeta || null == otherDcMeta) return false; + return !currentDcMeta.getZone().equalsIgnoreCase(otherDcMeta.getZone()); + }).when(metaCache).isCrossRegion(Mockito.anyString(), Mockito.anyString()); + + healthChecker.setInstanceManager(healthCheckInstanceManager); + } + + @Test + public void testInstanceMatch() { + healthChecker.generateHealthCheckInstances(); + Assert.assertTrue(healthCheckInstanceManager.checkInstancesMiss(getXpipeMeta())); + } + + @Test + public void testInstanceDisMatch() { + healthChecker.generateHealthCheckInstances(); + XpipeMeta xpipeMeta = getXpipeMeta(); + xpipeMeta.getDcs().get("jq").getClusters().remove("bbz_qmq_idempotent_fra_default"); + Assert.assertFalse(healthCheckInstanceManager.checkInstancesMiss(xpipeMeta)); + } + + @Override + protected String getXpipeMetaConfigFile() { + return "multi-type-health-instances.xml"; + } + + +} diff --git a/redis/redis-checker/src/test/resources/multi-type-health-instances.xml b/redis/redis-checker/src/test/resources/multi-type-health-instances.xml new file mode 100644 index 000000000..cf0af769a --- /dev/null +++ b/redis/redis-checker/src/test/resources/multi-type-health-instances.xml @@ -0,0 +1,89 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/nonredis/keepercontainer/KeeperContainerDiskIOLimitDispatcher.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/nonredis/keepercontainer/KeeperContainerDiskIOLimitDispatcher.java index 13d03f0f5..06013f664 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/nonredis/keepercontainer/KeeperContainerDiskIOLimitDispatcher.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/nonredis/keepercontainer/KeeperContainerDiskIOLimitDispatcher.java @@ -113,9 +113,14 @@ protected List alertTypes() { return Collections.emptyList(); } + @Override + protected long getIntervalMilli() { + return Math.max(60000, super.getIntervalMilli()); + } + @Override protected boolean shouldDoAction() { - return config.autoSetKeeperSyncLimit(); + return config.autoSetKeeperSyncLimit() && super.shouldDoAction(); } }