diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java index 173b1707b..acf6b2e29 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java @@ -8,8 +8,11 @@ import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.AbstractRedisConfigRuleAction; import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder; +import com.ctrip.xpipe.redis.checker.model.DcClusterShard; import com.google.common.collect.Lists; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -18,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentMap; /** * @author lishanglin @@ -31,6 +35,12 @@ public class CheckerHealthController { @Autowired private DefaultDelayPingActionCollector defaultDelayPingActionCollector; + @Autowired + private RedisUsedMemoryCollector redisUsedMemoryCollector; + + @Autowired + private KeeperFlowCollector keeperFlowCollector; + @Autowired private HealthCheckInstanceManager instanceManager; @@ -66,6 +76,26 @@ public String getClusterHealthCheckInstance(@PathVariable String clusterId) { return Codec.DEFAULT.encode(model); } + @RequestMapping(value = "/health/check/keeper/{ip}/{port}", method = RequestMethod.GET) + public String getHealthCheckKeeper(@PathVariable String ip, @PathVariable int port) { + KeeperHealthCheckInstance instance = instanceManager.findKeeperHealthCheckInstance(new HostPort(ip, port)); + if(instance == null) { + return "Not found"; + } + HealthCheckInstanceModel model = buildHealthCheckInfo(instance); + return Codec.DEFAULT.encode(model); + } + + @RequestMapping(value = "/health/check/redis-for-assigned-action/{ip}/{port}", method = RequestMethod.GET) + public String getHealthCheckRedisInstanceForAssignedAction(@PathVariable String ip, @PathVariable int port) { + RedisHealthCheckInstance instance = instanceManager.findRedisInstanceForAssignedAction(new HostPort(ip, port)); + if(instance == null) { + return "Not found"; + } + HealthCheckInstanceModel model = buildHealthCheckInfo(instance); + return Codec.DEFAULT.encode(model); + } + @RequestMapping(value = "/health/redis/info/{ip}/{port}", method = RequestMethod.GET) public ActionContextRetMessage> getRedisInfo(@PathVariable String ip, @PathVariable int port) { return ActionContextRetMessage.from(redisInfoManager.getInfoByHostPort(new HostPort(ip, port))); @@ -82,6 +112,16 @@ public Map getAllHealthStatusDesc() { else return Collections.emptyMap(); } + @GetMapping("/health/keeper/status/all") + public ConcurrentMap> getAllKeeperFlows() { + return keeperFlowCollector.getHostPort2InputFlow(); + } + + @GetMapping("/health/redis/used-memory/all") + public ConcurrentMap getAllDclusterShardUsedMemory() { + return redisUsedMemoryCollector.getDcClusterShardUsedMemory(); + } + private HealthCheckInstanceModel buildHealthCheckInfo(HealthCheckInstance instance) { HealthCheckInstanceModel model = new HealthCheckInstanceModel(instance.toString()); for(HealthCheckAction action : instance.getHealthCheckActions()) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java index c30af9689..30f1e0aa8 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java @@ -24,6 +24,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance findRedisHealthCheckInstance(HostPort hostPort); + RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort); + KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort); ClusterHealthCheckInstance findClusterHealthCheckInstance(String clusterId); @@ -32,7 +34,7 @@ public interface HealthCheckInstanceManager { KeeperHealthCheckInstance removeKeeper(HostPort hostPort); - RedisHealthCheckInstance removeRedisOnlyForUsedMemory(HostPort hostPort); + RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort); ClusterHealthCheckInstance remove(String cluster); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java index 56fc78fa8..390ecbc75 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java @@ -88,6 +88,11 @@ public RedisHealthCheckInstance findRedisHealthCheckInstance(HostPort hostPort) return instances.get(hostPort); } + @Override + public RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort) { + return redisInstanceForAssignedAction.get(hostPort); + } + @Override public KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort) { return keeperInstances.get(hostPort); @@ -114,7 +119,7 @@ public KeeperHealthCheckInstance removeKeeper(HostPort hostPort) { } @Override - public RedisHealthCheckInstance removeRedisOnlyForUsedMemory(HostPort hostPort) { + public RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort) { RedisHealthCheckInstance instance = redisInstanceForAssignedAction.remove(hostPort); if (null != instance) instanceFactory.remove(instance); return instance; diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java index 1313525a1..5f311b8f3 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java @@ -4,22 +4,30 @@ import com.ctrip.xpipe.cluster.ClusterType; import com.ctrip.xpipe.lifecycle.AbstractLifecycle; import com.ctrip.xpipe.lifecycle.LifecycleHelper; +import com.ctrip.xpipe.redis.checker.CheckerConsoleService; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckInstanceManager; import com.ctrip.xpipe.redis.checker.healthcheck.HealthChecker; import com.ctrip.xpipe.redis.checker.healthcheck.meta.MetaChangeManager; import com.ctrip.xpipe.redis.core.entity.*; +import com.ctrip.xpipe.redis.core.meta.KeeperContainerDetailInfo; import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.utils.StringUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import org.xml.sax.SAXException; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.annotation.Resource; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; @@ -44,6 +52,9 @@ public class DefaultHealthChecker extends AbstractLifecycle implements HealthChe @Autowired private CheckerConfig checkerConfig; + @Autowired + private CheckerConsoleService checkerConsoleService; + @Resource(name = SCHEDULED_EXECUTOR) private ScheduledExecutorService scheduled; @@ -113,6 +124,16 @@ void generateHealthCheckInstances() { if(checkerConfig.getIgnoredHealthCheckDc().contains(dcMeta.getId())) { continue; } + + if (currentDcId.equalsIgnoreCase(dcMeta.getId())) { + Map keeperContainerDetailInfoMap + = getAllKeeperContainerDetailInfoFromDcMeta(dcMeta, getCurrentDcMeta(dcMeta.getId())); + + keeperContainerDetailInfoMap.values().forEach(keeperContainerDetailInfo -> { + generateHealthCheckInstances(keeperContainerDetailInfo); + }); + } + for(ClusterMeta cluster : dcMeta.getClusters().values()) { ClusterType clusterType = ClusterType.lookup(cluster.getType()); @@ -131,6 +152,15 @@ void generateHealthCheckInstances() { } } + private void generateHealthCheckInstances(KeeperContainerDetailInfo keeperContainerDetailInfo) { + for (KeeperMeta keeperMeta : keeperContainerDetailInfo.getKeeperInstances()) { + instanceManager.getOrCreate(keeperMeta); + } + for (RedisMeta redisMeta : keeperContainerDetailInfo.getRedisInstances()) { + instanceManager.getOrCreateRedisInstanceForAssignedAction(redisMeta); + } + } + void generateHealthCheckInstances(ClusterMeta clusterMeta){ for(ShardMeta shard : clusterMeta.getShards().values()) { for(RedisMeta redis : shard.getRedises()) { @@ -156,6 +186,44 @@ private boolean isClusterInCurrentIdc(ClusterMeta cluster) { return false; } + private DcMeta getCurrentDcMeta(String dcId) { + try { + return checkerConsoleService.getXpipeAllDCMeta(checkerConfig.getConsoleAddress(), dcId) + .getDcs().get(dcId); + } catch (SAXException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + private Map getAllKeeperContainerDetailInfoFromDcMeta(DcMeta dcMeta, DcMeta allDcMeta) { + Map map = dcMeta.getKeeperContainers().stream() + .collect(Collectors.toMap(KeeperContainerMeta::getId, + keeperContainerMeta -> new KeeperContainerDetailInfo(keeperContainerMeta, new ArrayList<>(), new ArrayList<>()))); + allDcMeta.getClusters().values().forEach(clusterMeta -> { + for (ShardMeta shardMeta : clusterMeta.getAllShards().values()){ + if (shardMeta.getKeepers() == null || shardMeta.getKeepers().isEmpty()) continue; + RedisMeta monitorRedis = getMonitorRedisMeta(shardMeta.getRedises()); + shardMeta.getKeepers().forEach(keeperMeta -> { + if (map.containsKey(keeperMeta.getKeeperContainerId())) { + map.get(keeperMeta.getKeeperContainerId()).getKeeperInstances().add(keeperMeta); + map.get(keeperMeta.getKeeperContainerId()).getRedisInstances().add(monitorRedis); + } + }); + } + }); + + return map; + } + + private RedisMeta getMonitorRedisMeta(List redisMetas) { + if (redisMetas == null || redisMetas.isEmpty()) return null; + return redisMetas.stream().sorted((r1, r2) -> (r1.getIp().hashCode() - r2.getIp().hashCode())) + .collect(Collectors.toList()).get(0); + } + private boolean clusterDcIsCurrentDc(ClusterMeta clusterMeta) { return clusterMeta.parent().getId().equalsIgnoreCase(currentDcId); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java index ceec1de0f..3e6fa6a68 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java @@ -279,7 +279,7 @@ private void addKeeper(KeeperMeta added) { } private void removeRedisOnlyForUsedMemory(RedisMeta removed) { - if (null != instanceManager.removeRedisOnlyForUsedMemory(new HostPort(removed.getIp(), removed.getPort()))) { + if (null != instanceManager.removeRedisInstanceForAssignedAction(new HostPort(removed.getIp(), removed.getPort()))) { logger.info("[removeRedisOnlyForUsedMemory][{}:{}] {}", removed.getIp(), removed.getPort(), removed); } } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java index c0c3e728d..e499503d6 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java @@ -505,7 +505,7 @@ public void testKeeperChange() throws Exception { // only change keeper reload Mockito.verify(instanceManager, times(2)).removeKeeper(any(HostPort.class)); Mockito.verify(instanceManager, times(1)).getOrCreate(any(KeeperMeta.class)); - Mockito.verify(instanceManager, times(0)).removeRedisOnlyForUsedMemory(any(HostPort.class)); + Mockito.verify(instanceManager, times(0)).removeRedisInstanceForAssignedAction(any(HostPort.class)); Mockito.verify(instanceManager, times(0)).getOrCreateRedisInstanceForAssignedAction(any(RedisMeta.class)); } @@ -529,7 +529,7 @@ public void testRedisChange() throws Exception { // only change redis changed Mockito.verify(instanceManager, times(0)).removeKeeper(any(HostPort.class)); Mockito.verify(instanceManager, times(0)).getOrCreate(any(KeeperMeta.class)); - Mockito.verify(instanceManager, times(1)).removeRedisOnlyForUsedMemory(any(HostPort.class)); + Mockito.verify(instanceManager, times(1)).removeRedisInstanceForAssignedAction(any(HostPort.class)); Mockito.verify(instanceManager, times(1)).getOrCreateRedisInstanceForAssignedAction(any(RedisMeta.class)); }