From 47f23356913b11872acf59321ec3174f2931eab3 Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Thu, 15 Aug 2024 20:11:49 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B7=A8region=E6=8B=89=E5=85=A5=E6=8B=89?= =?UTF-8?q?=E5=87=BA=E6=A3=80=E6=B5=8B=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redis/checker/CheckerConsoleService.java | 4 +- .../xpipe/redis/checker/CheckerService.java | 4 + .../xpipe/redis/checker/OuterClientCache.java | 4 +- .../controller/CheckerHealthController.java | 36 ++++ .../HealthCheckInstanceManager.java | 6 + .../AbstractPsubPingActionCollector.java | 94 ++++++++ .../CrossRegionRedisHealthStatus.java | 100 +++++++++ .../DefaultPsubPingActionCollector.java | 116 ++++++++++ .../actions/interaction/HealthStatus.java | 10 +- ...egionHealthStatusConsistenceInspector.java | 202 ++++++++++++++++++ .../InstanceHealthStatusCollector.java | 53 +++-- ...tanceHealthStatusConsistenceInspector.java | 49 ++--- .../InstanceStatusAdjustCommand.java | 7 +- .../compensator/InstanceStatusAdjuster.java | 4 +- .../handler/AbstractHealthEventHandler.java | 14 +- .../ping/OneWayPingActionController.java | 6 + .../actions/ping/PingActionFactory.java | 34 ++- .../OneWayPsubActionController.java | 24 +++ .../actions/psubscribe/PsubAction.java | 50 +++++ .../actions/psubscribe/PsubActionContext.java | 10 + .../psubscribe/PsubActionController.java | 7 + .../actions/psubscribe/PsubActionFactory.java | 71 ++++++ .../psubscribe/PsubActionListener.java | 15 ++ .../psubscribe/PsubPingActionCollector.java | 14 ++ .../config/CompositeHealthCheckConfig.java | 5 +- .../DefaultHealthCheckInstanceFactory.java | 33 ++- .../DefaultHealthCheckInstanceManager.java | 26 +++ .../impl/DefaultHealthChecker.java | 17 +- .../impl/HealthCheckInstanceFactory.java | 2 + .../meta/DefaultDcMetaChangeManager.java | 65 +++++- .../meta/DefaultMetaChangeManager.java | 2 +- .../healthcheck/session/RedisSession.java | 6 + .../checker/impl/DefaultCheckerService.java | 14 ++ .../checker/impl/HealthCheckReporter.java | 39 +++- .../DefaultCheckerConsoleService.java | 15 +- .../AbstractCheckerIntegrationTest.java | 7 +- .../InstanceHealthStatusCollectorTest.java | 5 +- ...eHealthStatusConsistenceInspectorTest.java | 7 +- .../InstanceStatusAdjustCommandTest.java | 14 +- .../meta/DefaultDcMetaChangeManagerTest.java | 12 +- .../checker/ConsoleCheckerApiService.java | 8 + .../checker/ConsoleCheckerGroupService.java | 4 +- .../InstanceHealthCheckGetCommand.java | 11 +- .../InstanceHealthCheckGetGroupCommand.java | 7 +- .../InstanceHealthStatusGetCommand.java | 11 +- .../InstanceHealthStatusGetGroupCommand.java | 7 +- .../impl/DefaultConsoleCheckerApiService.java | 10 + .../DefaultConsoleCheckerGroupService.java | 8 +- .../impl/DefaultConsoleDcCheckerService.java | 14 +- .../console/impl/DefaultConsoleService.java | 16 ++ .../api/checker/ConsoleCheckerController.java | 9 +- .../model/ShardCheckerHealthCheckModel.java | 12 +- .../resources/CheckerOuterClientCache.java | 17 +- .../resources/DefaultOuterClientCache.java | 104 ++++++++- .../console/spring/CheckerContextConfig.java | 5 +- .../views/index/full_link_health_check.html | 5 +- .../DefaultConsoleDcCheckerServiceTest.java | 4 +- .../core/console/ConsoleCheckerPath.java | 2 + .../cmd/pubsub/AbstractSubscribe.java | 12 +- .../cmd/pubsub/PsubscribeCommand.java | 33 +++ .../protocal/cmd/pubsub/SubscribeCommand.java | 2 +- .../console/TestCheckerContextConfig.java | 5 +- 62 files changed, 1357 insertions(+), 152 deletions(-) create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java create mode 100644 redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java create mode 100644 redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java index 467824242..795083a41 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java @@ -57,7 +57,9 @@ public interface CheckerConsoleService { Map loadAllClusterCreateTime(String console); - Map loadAllActiveDcOneWayClusterInfo(String console, String activeDc); + Map loadAllDcOneWayClusterInfo(String console, String dc); + + Map loadCurrentDcOneWayClusterInfo(String console, String dc); void bindShardSentinel(String console, String dc, String cluster, String shard, SentinelMeta sentinelMeta); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java index 5268f40fd..ee095e5cb 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java @@ -17,6 +17,10 @@ class AllInstanceHealthStatus extends HashMap {} HEALTH_STATE getInstanceStatus(String ip, int port); + HEALTH_STATE getCrossRegionInstanceStatus(String ip, int port); + Map getAllInstanceHealthStatus(); + Map getAllInstanceCrossRegionHealthStatus(); + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java index b55baa24a..b38101fa9 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java @@ -12,6 +12,8 @@ public interface OuterClientCache { OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws Exception; - Map getAllActiveDcClusters(String activeDc); + Map getAllDcClusters(String dc); + + Map getAllCurrentDcClusters(String dc); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java index 2e4593862..c1855fb7f 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java @@ -6,6 +6,7 @@ import com.ctrip.xpipe.redis.checker.controller.result.ActionContextRetMessage; import com.ctrip.xpipe.redis.checker.healthcheck.*; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultPsubPingActionCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector; @@ -36,6 +37,9 @@ public class CheckerHealthController { @Autowired private DefaultDelayPingActionCollector defaultDelayPingActionCollector; + @Autowired + private DefaultPsubPingActionCollector defaultPsubPingActionCollector; + @Autowired private RedisUsedMemoryCollector redisUsedMemoryCollector; @@ -57,6 +61,12 @@ public HEALTH_STATE getHealthState(@PathVariable String ip, @PathVariable int po else return HEALTH_STATE.UNKNOWN; } + @RequestMapping(value = "/health/cross/region/{ip}/{port}", method = RequestMethod.GET) + public HEALTH_STATE getCrossRegionHealthState(@PathVariable String ip, @PathVariable int port) { + if (siteStability.isSiteStable()) return defaultPsubPingActionCollector.getHealthState(new HostPort(ip, port)); + else return HEALTH_STATE.UNKNOWN; + } + @RequestMapping(value = "/health/check/instance/{ip}/{port}", method = RequestMethod.GET) public String getHealthCheckInstance(@PathVariable String ip, @PathVariable int port) { RedisHealthCheckInstance instance = instanceManager.findRedisHealthCheckInstance(new HostPort(ip, port)); @@ -67,6 +77,16 @@ public String getHealthCheckInstance(@PathVariable String ip, @PathVariable int return Codec.DEFAULT.encode(model); } + @RequestMapping(value = "/health/check/cross/region//instance/{ip}/{port}", method = RequestMethod.GET) + public String getCrossRegionHealthCheckInstance(@PathVariable String ip, @PathVariable int port) { + RedisHealthCheckInstance instance = instanceManager.findRedisInstanceForPsubPingAction(new HostPort(ip, port)); + if(instance == null) { + return "Not found"; + } + HealthCheckInstanceModel model = buildHealthCheckInfo(instance); + return Codec.DEFAULT.encode(model); + } + @RequestMapping(value = "/health/check/cluster/{clusterId}", method = RequestMethod.GET) public String getClusterHealthCheckInstance(@PathVariable String clusterId) { ClusterHealthCheckInstance instance = instanceManager.findClusterHealthCheckInstance(clusterId); @@ -97,6 +117,16 @@ public String getHealthCheckRedisInstanceForAssignedAction(@PathVariable String return Codec.DEFAULT.encode(model); } + @RequestMapping(value = "/health/check/redis-for-ping-action/{ip}/{port}", method = RequestMethod.GET) + public String getHealthCheckRedisInstanceForPingAction(@PathVariable String ip, @PathVariable int port) { + RedisHealthCheckInstance instance = instanceManager.findRedisInstanceForPsubPingAction(new HostPort(ip, port)); + if(instance == null) { + return "Not found"; + } + HealthCheckInstanceModel model = buildHealthCheckInfo(instance); + return Codec.DEFAULT.encode(model); + } + @RequestMapping(value = "/health/redis/info/{ip}/{port}", method = RequestMethod.GET) public ActionContextRetMessage> getRedisInfo(@PathVariable String ip, @PathVariable int port) { return ActionContextRetMessage.from(redisInfoManager.getInfoByHostPort(new HostPort(ip, port))); @@ -113,6 +143,12 @@ public Map getAllHealthStatusDesc() { else return Collections.emptyMap(); } + @GetMapping("/health/check/cross/region/status/all") + public Map getAllCrossRegionHealthStatusDesc() { + if (siteStability.isSiteStable()) return defaultPsubPingActionCollector.getAllHealthStatus(); + else return Collections.emptyMap(); + } + @GetMapping("/health/keeper/status/all") public ConcurrentMap> getAllKeeperFlows() { return keeperFlowCollector.getHostPort2InputFlow(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java index 30f1e0aa8..e107c75c1 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java @@ -18,6 +18,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisMeta redis); + RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis); + KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper); ClusterHealthCheckInstance getOrCreate(ClusterMeta cluster); @@ -26,6 +28,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort); + RedisHealthCheckInstance findRedisInstanceForPsubPingAction(HostPort hostPort); + KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort); ClusterHealthCheckInstance findClusterHealthCheckInstance(String clusterId); @@ -36,6 +40,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort); + RedisHealthCheckInstance removeRedisInstanceForPingAction(HostPort hostPort); + ClusterHealthCheckInstance remove(String cluster); List getAllRedisInstance(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java new file mode 100644 index 000000000..33ba4ea6f --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java @@ -0,0 +1,94 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionListener; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionListener; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector; +import com.google.common.collect.Maps; + +import java.util.Map; + +public abstract class AbstractPsubPingActionCollector implements PsubPingActionCollector { + + protected Map allHealthStatus = Maps.newConcurrentMap(); + + protected PingActionListener pingActionListener = new AbstractPsubPingActionCollector.CollectorPingActionListener(); + + protected PsubActionListener psubActionListener = new AbstractPsubPingActionCollector.CollectorPsubActionListener(); + + protected abstract HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance); + + protected void removeHealthStatus(HealthCheckAction action) { + HealthStatus healthStatus = allHealthStatus.remove(action.getActionInstance()); + if(healthStatus != null) { + healthStatus.stop(); + } + } + + @Override + public boolean supportInstance(RedisHealthCheckInstance instance) { + return true; + } + + @Override + public PingActionListener createPingActionListener() { + return pingActionListener; + } + + @Override + public PsubActionListener createPsubActionListener() { + return psubActionListener; + } + + protected class CollectorPingActionListener implements PingActionListener { + + @Override + public void onAction(PingActionContext pingActionContext) { + HealthStatus healthStatus = createOrGetHealthStatus(pingActionContext.instance()); + if (!pingActionContext.isSuccess()) { + if (pingActionContext.getCause().getMessage().contains("LOADING")) { + healthStatus.loading(); + } + return; + } + + if (pingActionContext.getResult()) { + healthStatus.pong(); + } else { + if(healthStatus.getState() == HEALTH_STATE.UNKNOWN) { + healthStatus.pongInit(); + } + } + } + + @Override + public boolean worksfor(ActionContext t) { + return t instanceof PingActionContext; + } + + @Override + public void stopWatch(HealthCheckAction action) { + removeHealthStatus(action); + } + } + + protected class CollectorPsubActionListener implements PsubActionListener { + + @Override + public void onAction(PsubActionContext psubActionContext) { + HealthStatus healthStatus = createOrGetHealthStatus(psubActionContext.instance()); + if (!psubActionContext.getResult().isEmpty()) { + healthStatus.subSuccess(); + } + } + + @Override + public void stopWatch(HealthCheckAction action) { + removeHealthStatus(action); + } + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java new file mode 100644 index 000000000..ece7dd5ff --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java @@ -0,0 +1,100 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceDown; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceLoading; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceUp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * UNKNOWN + * pingSuccess -> INSTANCEUP + start subAction + * pingFail -> DOWN + markDown + * subSuccess -> throw exception + *

+ * INSTANCEUP + * pingSuccess,do nothing + * pingFail -> DOWN + markDown + * subSuccess -> HEALTHY + markUp + stop subAction + *

+ * HEALTHY + * pingSuccess,do nothing + * pingFail -> DOWN + markDown + * subSuccess -> throw exception + *

+ * DOWN + * pingSuccess -> INSTANCEUP + start subAction + * pingFail,do nothing + * subSuccess -> throw exception + */ +public class CrossRegionRedisHealthStatus extends HealthStatus { + + protected static final Logger logger = LoggerFactory.getLogger(CrossRegionRedisHealthStatus.class); + + public CrossRegionRedisHealthStatus(RedisHealthCheckInstance instance, ScheduledExecutorService scheduled) { + super(instance, scheduled); + } + + @Override + protected void loading() { + HEALTH_STATE preState = state.get(); + if(state.compareAndSet(preState, HEALTH_STATE.DOWN)) { + logStateChange(preState, state.get()); + } + if (!preState.equals(HEALTH_STATE.DOWN)) { + logger.info("[setLoading] {}", this); + notifyObservers(new InstanceLoading(instance)); + } + } + + @Override + protected void pong() { + lastPongTime.set(System.currentTimeMillis()); + HEALTH_STATE preState = state.get(); + if (preState.equals(HEALTH_STATE.UNKNOWN) || preState.equals(HEALTH_STATE.DOWN)) { + if(state.compareAndSet(preState, HEALTH_STATE.INSTANCEUP)) { + logStateChange(preState, state.get()); + } + } + } + + @Override + protected void subSuccess() { + HEALTH_STATE preState = state.get(); + if (preState.equals(HEALTH_STATE.INSTANCEUP)) { + if(state.compareAndSet(preState, HEALTH_STATE.HEALTHY)) { + logStateChange(preState, state.get()); + } + logger.info("[setUp] {}", this); + notifyObservers(new InstanceUp(instance)); + } + } + + @Override + protected void healthStatusUpdate() { + long currentTime = System.currentTimeMillis(); + + if(lastPongTime.get() != UNSET_TIME) { + long pingDownTime = currentTime - lastPongTime.get(); + final int pingDownAfter = pingDownAfterMilli.getAsInt(); + if (pingDownTime > pingDownAfter) { + doMarkDown(); + } + } + } + + protected void doMarkDown() { + HEALTH_STATE preState = state.get(); + if(state.compareAndSet(preState, HEALTH_STATE.DOWN)) { + logStateChange(preState, state.get()); + } + if (!preState.equals(HEALTH_STATE.DOWN)) { + logger.info("[setDown] {}", this); + notifyObservers(new InstanceDown(instance)); + } + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java new file mode 100644 index 000000000..ca6307946 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java @@ -0,0 +1,116 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.api.factory.ObjectFactory; +import com.ctrip.xpipe.api.observer.Observable; +import com.ctrip.xpipe.api.observer.Observer; +import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; +import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.AbstractInstanceEvent; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.processor.HealthEventProcessor; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector; +import com.ctrip.xpipe.utils.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; +import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; + +@Component +public class DefaultPsubPingActionCollector extends AbstractPsubPingActionCollector implements PsubPingActionCollector, HealthStateService, OneWaySupport { + + private static final Logger logger = LoggerFactory.getLogger(DefaultPsubPingActionCollector.class); + + @Autowired + private List healthEventProcessors; + + @Resource(name = SCHEDULED_EXECUTOR) + private ScheduledExecutorService scheduled; + + @Resource(name = GLOBAL_EXECUTOR) + private ExecutorService executors; + + @Override + public HEALTH_STATE getHealthState(HostPort hostPort) { + RedisHealthCheckInstance key = allHealthStatus.keySet().stream() + .filter(instance -> instance.getCheckInfo().getHostPort().equals(hostPort)) + .findFirst().orElse(null); + + if (null != key) return allHealthStatus.get(key).getState(); + return HEALTH_STATE.UNKNOWN; + } + + @Override + public Map getAllCachedState() { + Map cachedHealthStatus = new HashMap<>(); + allHealthStatus.forEach(((instance, healthStatus) -> { + RedisInstanceInfo info = instance.getCheckInfo(); + cachedHealthStatus.put(info.getHostPort(), healthStatus.getState()); + })); + + return cachedHealthStatus; + } + + public Map getAllHealthStatus() { + Map cachedHealthStatus = new HashMap<>(); + allHealthStatus.forEach(((instance, healthStatus) -> { + HostPort hostPort = instance.getCheckInfo().getHostPort(); + cachedHealthStatus.put(hostPort, new HealthStatusDesc(hostPort, healthStatus)); + })); + + return cachedHealthStatus; + } + + @Override + public void updateHealthState(Map redisStates) { + throw new UnsupportedOperationException(); + } + + @Override + protected HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance) { + return MapUtils.getOrCreate(allHealthStatus, instance, new ObjectFactory() { + @Override + public HealthStatus create() { + + HealthStatus healthStatus = new CrossRegionRedisHealthStatus(instance, scheduled); + + healthStatus.addObserver(new Observer() { + @Override + public void update(Object args, Observable observable) { + onInstanceStateChange(args); + } + }); + healthStatus.start(); + return healthStatus; + } + }); + } + + private void onInstanceStateChange(Object args) { + + logger.info("[onInstanceStateChange]{}", args); + for (HealthEventProcessor processor : healthEventProcessors) { + + if (processor instanceof OneWaySupport) { + executors.execute(new AbstractExceptionLogTask() { + @Override + protected void doRun() throws Exception { + processor.onEvent((AbstractInstanceEvent) args); + } + }); + } + } + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java index 7ad76d41b..be0421bfa 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java @@ -31,10 +31,10 @@ public class HealthStatus extends AbstractObservable implements Startable, Stopp public static long UNSET_TIME = -1L; - private AtomicLong lastPongTime = new AtomicLong(UNSET_TIME); + protected AtomicLong lastPongTime = new AtomicLong(UNSET_TIME); private AtomicLong lastHealthDelayTime = new AtomicLong(UNSET_TIME); - private AtomicReference state = new AtomicReference<>(HEALTH_STATE.UNKNOWN); + protected AtomicReference state = new AtomicReference<>(HEALTH_STATE.UNKNOWN); protected RedisHealthCheckInstance instance; protected final IntSupplier delayDownAfterMilli; @@ -110,7 +110,7 @@ protected boolean shouldNotRun() { return lastHealthDelayTime.get() < 0 && lastPongTime.get() < 0; } - void loading() { + protected void loading() { HEALTH_STATE preState = state.get(); if(preState.equals(preState.afterPingFail())) { return; @@ -124,7 +124,7 @@ void loading() { } } - void pong(){ + protected void pong(){ lastPongTime.set(System.currentTimeMillis()); setPingUp(); } @@ -135,6 +135,8 @@ void pongInit() { } } + protected void subSuccess(){} + void delay(long delayMilli, long...srcShardDbId){ //first time diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java new file mode 100644 index 000000000..bf78a0415 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java @@ -0,0 +1,202 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.api.lifecycle.Ordered; +import com.ctrip.xpipe.api.lifecycle.TopElement; +import com.ctrip.xpipe.api.monitor.Task; +import com.ctrip.xpipe.api.monitor.TransactionMonitor; +import com.ctrip.xpipe.cluster.ClusterType; +import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; +import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.lifecycle.AbstractLifecycle; +import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; +import com.ctrip.xpipe.redis.checker.config.CheckerConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.OutClientInstanceHealthHolder; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.UpDownInstances; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder; +import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder; +import com.ctrip.xpipe.redis.core.entity.ClusterMeta; +import com.ctrip.xpipe.redis.core.entity.DcMeta; +import com.ctrip.xpipe.redis.core.entity.ShardMeta; +import com.ctrip.xpipe.redis.core.entity.XpipeMeta; +import com.ctrip.xpipe.redis.core.exception.MasterNotFoundException; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import com.ctrip.xpipe.tuple.Pair; +import com.ctrip.xpipe.utils.MapUtils; +import com.ctrip.xpipe.utils.XpipeThreadFactory; +import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.lang.Nullable; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; + +@Service +public class InstanceCrossRegionHealthStatusConsistenceInspector extends AbstractLifecycle implements TopElement { + private InstanceHealthStatusCollector collector; + + private InstanceStatusAdjuster adjuster; + + private StabilityHolder siteStability; + + private CheckerConfig config; + + private MetaCache metaCache; + + private GroupCheckerLeaderElector leaderElector; + + private DynamicDelayPeriodTask task; + + private ScheduledExecutorService scheduled; + + private static final Logger logger = LoggerFactory.getLogger(InstanceCrossRegionHealthStatusConsistenceInspector.class); + + private static final String currentDc = FoundationService.DEFAULT.getDataCenter(); + + private static final String TYPE = "HealthCheck"; + + @Autowired + public InstanceCrossRegionHealthStatusConsistenceInspector(InstanceHealthStatusCollector instanceHealthStatusCollector, + InstanceStatusAdjuster instanceStatusAdjuster, + @Nullable GroupCheckerLeaderElector groupCheckerLeaderElector, + StabilityHolder stabilityHolder, CheckerConfig checkerConfig, + MetaCache metaCache) { + this.collector = instanceHealthStatusCollector; + this.adjuster = instanceStatusAdjuster; + this.leaderElector = groupCheckerLeaderElector; + this.siteStability = stabilityHolder; + this.config = checkerConfig; + this.metaCache = metaCache; + } + + protected void inspectCurrentDc() { + if(!siteStability.isSiteStable()) { + logger.debug("[inspectCrossRegion][skip] unstable"); + return; + } + if (null == leaderElector || !leaderElector.amILeader()) { + logger.debug("[inspectCrossRegion][skip] not leader"); + return; + } + + logger.debug("[inspectCrossRegion] begin"); + final long timeoutMill = System.currentTimeMillis() + config.getPingDownAfterMilli() / 2; + TransactionMonitor.DEFAULT.logTransactionSwallowException(TYPE, "compensator.inspect.crossRegion", new Task() { + @Override + public void go() throws Exception { + Map> interestedCurrentDc = fetchInterestedCurrentDcClusterInstances(); + if (interestedCurrentDc.isEmpty()) { + logger.debug("[inspectCrossRegion][skip] no interested instance"); + return; + } + + Pair instanceHealth = collector.collect(true); + checkTimeout(timeoutMill, "after collect"); + + XPipeInstanceHealthHolder xpipeInstanceHealth = instanceHealth.getKey(); + OutClientInstanceHealthHolder outClientInstanceHealth = instanceHealth.getValue(); + UpDownInstances hostPortNeedAdjustForPingAction = findHostPortNeedAdjust(xpipeInstanceHealth, outClientInstanceHealth, interestedCurrentDc); + + checkTimeout(timeoutMill, "after compare"); + if (!hostPortNeedAdjustForPingAction.getHealthyInstances().isEmpty()) + adjuster.adjustInstances(hostPortNeedAdjustForPingAction.getHealthyInstances(), true, true, timeoutMill); + + checkTimeout(timeoutMill, "after adjust up"); + if (!hostPortNeedAdjustForPingAction.getUnhealthyInstances().isEmpty()) + adjuster.adjustInstances(hostPortNeedAdjustForPingAction.getUnhealthyInstances(), true, false, timeoutMill); + } + + @Override + public Map getData() { + return Collections.singletonMap("timeoutMilli", timeoutMill); + } + }); + } + + private void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { + if (System.currentTimeMillis() > timeoutAtMilli) { + logger.info("[timeout] {}", msg); + throw new TimeoutException(msg); + } + } + + protected Map> fetchInterestedCurrentDcClusterInstances() { + Map> interestedCurrentDcClusterInstances = new HashMap<>(); + XpipeMeta xpipeMeta = metaCache.getXpipeMeta(); + if (null == xpipeMeta) return interestedCurrentDcClusterInstances; + + for (DcMeta dcMeta: xpipeMeta.getDcs().values()) { + if (!dcMeta.getId().equalsIgnoreCase(currentDc)) continue; + + for (ClusterMeta clusterMeta: dcMeta.getClusters().values()) { + if (!ClusterType.isSameClusterType(clusterMeta.getType(), ClusterType.ONE_WAY)) continue; + if (!metaCache.isCrossRegion(dcMeta.getId(), clusterMeta.getActiveDc())) continue; + + Set interestedInstances = MapUtils.getOrCreate(interestedCurrentDcClusterInstances, clusterMeta.getId(), HashSet::new); + for (ShardMeta shardMeta: clusterMeta.getShards().values()) { + shardMeta.getRedises().forEach(redis -> interestedInstances.add(new HostPort(redis.getIp(), redis.getPort()))); + } + } + } + + return interestedCurrentDcClusterInstances; + } + + protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipeInstanceHealthHolder, + OutClientInstanceHealthHolder outClientInstanceHealthHolder, + Map> interested) { + int quorum = config.getQuorum(); + UpDownInstances xpipeInstances = xpipeInstanceHealthHolder.aggregate(interested, quorum); + UpDownInstances outClientInstances = outClientInstanceHealthHolder.extractReadable(interested); + + Set needMarkUpInstances = xpipeInstances.getHealthyInstances(); + Set needMarkDownInstances = xpipeInstances.getUnhealthyInstances(); + needMarkUpInstances.retainAll(outClientInstances.getUnhealthyInstances()); + needMarkDownInstances.retainAll(outClientInstances.getHealthyInstances()); + logger.info("[InstanceCrossRegionHealthStatusConsistenceInspector] needMarkUpInstances:{}", needMarkUpInstances); + logger.info("[InstanceCrossRegionHealthStatusConsistenceInspector] needMarkDownInstances:{}", needMarkDownInstances); + return new UpDownInstances(needMarkUpInstances, needMarkDownInstances); + } + + @Override + protected void doInitialize() throws Exception { + super.doInitialize(); + this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("InstanceCrossRegionHealthStatusConsistenceInspector")); + this.task = new DynamicDelayPeriodTask("inspectCrossRegion", new AbstractExceptionLogTask() { + @Override + protected void doRun() throws Exception { + inspectCurrentDc(); + } + }, config::getHealthMarkCompensateIntervalMill, scheduled); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + this.task.start(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + this.task.stop(); + } + + @Override + protected void doDispose() throws Exception { + super.doDispose(); + this.scheduled.shutdown(); + this.scheduled = null; + this.task = null; + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java index b401c881a..058e87aba 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java @@ -7,6 +7,7 @@ import com.ctrip.xpipe.redis.checker.CheckerService; import com.ctrip.xpipe.redis.checker.OuterClientCache; import com.ctrip.xpipe.redis.checker.RemoteCheckerManager; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.OutClientInstanceHealthHolder; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder; @@ -48,27 +49,31 @@ public InstanceHealthStatusCollector(RemoteCheckerManager remoteCheckerManager, this.executors = executors; } - public Pair collect() + public Pair collect() throws ExecutionException, InterruptedException, TimeoutException { + return collect(false); + } + + public Pair collect(boolean isCrossRegion) throws InterruptedException, ExecutionException, TimeoutException { XPipeInstanceHealthHolder xpipeInstanceHealthHolder = new XPipeInstanceHealthHolder(); OutClientInstanceHealthHolder outClientInstanceHealthHolder = new OutClientInstanceHealthHolder(); ParallelCommandChain commandChain = new ParallelCommandChain(executors); - commandChain.add(new GetAllOutClientInstanceStatusCmd(outClientInstanceHealthHolder)); + commandChain.add(new GetAllOutClientInstanceStatusCmd(outClientInstanceHealthHolder, isCrossRegion)); remoteCheckerManager.getAllCheckerServices().forEach(checkerService -> { - commandChain.add(new GetRemoteCheckResultCmd(checkerService, xpipeInstanceHealthHolder)); + commandChain.add(new GetRemoteCheckResultCmd(checkerService, xpipeInstanceHealthHolder, isCrossRegion)); }); commandChain.execute().get(5, TimeUnit.SECONDS); return new Pair<>(xpipeInstanceHealthHolder, outClientInstanceHealthHolder); } - public XPipeInstanceHealthHolder collectXPipeInstanceHealth(HostPort hostPort) + public XPipeInstanceHealthHolder collectXPipeInstanceHealth(HostPort hostPort, boolean isCrossRegion) throws InterruptedException, ExecutionException, TimeoutException { ParallelCommandChain commandChain = new ParallelCommandChain(executors); XPipeInstanceHealthHolder xpipeInstanceHealthHolder = new XPipeInstanceHealthHolder(); remoteCheckerManager.getAllCheckerServices().forEach(checkerService -> { - commandChain.add(new GetRemoteHealthStateCmd(hostPort, checkerService, xpipeInstanceHealthHolder)); + commandChain.add(new GetRemoteHealthStateCmd(hostPort, checkerService, xpipeInstanceHealthHolder, isCrossRegion)); }); commandChain.execute().get(5, TimeUnit.SECONDS); @@ -81,15 +86,22 @@ private class GetRemoteCheckResultCmd extends AbstractCommand { private XPipeInstanceHealthHolder resultHolder; - public GetRemoteCheckResultCmd(CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder) { + private boolean isCrossRegion; + + public GetRemoteCheckResultCmd(CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder, boolean isCrossRegion) { this.checkerService = checkerService; this.resultHolder = xpipeInstanceHealthHolder; + this.isCrossRegion = isCrossRegion; } @Override protected void doExecute() throws Throwable { try { - resultHolder.add(checkerService.getAllInstanceHealthStatus()); + if (isCrossRegion) { + resultHolder.add(checkerService.getAllInstanceCrossRegionHealthStatus()); + } else { + resultHolder.add(checkerService.getAllInstanceHealthStatus()); + } } catch (RestClientException restException) { logger.info("[doExecute][rest fail] {}", restException.getMessage()); } catch (Throwable th) { @@ -114,15 +126,23 @@ private class GetAllOutClientInstanceStatusCmd extends AbstractCommand { private OutClientInstanceHealthHolder resultHolder; - public GetAllOutClientInstanceStatusCmd(OutClientInstanceHealthHolder outClientInstanceHealthHolder) { + private boolean isCrossRegion; + + public GetAllOutClientInstanceStatusCmd(OutClientInstanceHealthHolder outClientInstanceHealthHolder, boolean isCrossRegion) { this.resultHolder = outClientInstanceHealthHolder; + this.isCrossRegion = isCrossRegion; } @Override protected void doExecute() throws Throwable { try { - resultHolder.addClusters( - outerClientCache.getAllActiveDcClusters(FoundationService.DEFAULT.getDataCenter())); + if (isCrossRegion) { + resultHolder.addClusters( + outerClientCache.getAllCurrentDcClusters(FoundationService.DEFAULT.getDataCenter())); + } else { + resultHolder.addClusters( + outerClientCache.getAllDcClusters(FoundationService.DEFAULT.getDataCenter())); + } } catch (Throwable th) { logger.info("[doExecute][fail]", th); } @@ -149,16 +169,25 @@ private class GetRemoteHealthStateCmd extends AbstractCommand { private XPipeInstanceHealthHolder resultHolder; - public GetRemoteHealthStateCmd(HostPort hostPort, CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder) { + private boolean isCrossRegion; + + public GetRemoteHealthStateCmd(HostPort hostPort, CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder, boolean isCrossRegion) { this.hostPort = hostPort; this.checkerService = checkerService; this.resultHolder = xpipeInstanceHealthHolder; + this.isCrossRegion = isCrossRegion; } @Override protected void doExecute() throws Throwable { try { - resultHolder.add(new HealthStatusDesc(hostPort, checkerService.getInstanceStatus(hostPort.getHost(), hostPort.getPort()))); + HEALTH_STATE status; + if (isCrossRegion) { + status = checkerService.getCrossRegionInstanceStatus(hostPort.getHost(), hostPort.getPort()); + } else { + status = checkerService.getInstanceStatus(hostPort.getHost(), hostPort.getPort()); + } + resultHolder.add(new HealthStatusDesc(hostPort, status)); } catch (RestClientException restException) { logger.info("[doExecute][rest fail] {}", restException.getMessage()); } catch (Throwable th) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java index 1dfa71b06..93c9a072b 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java @@ -49,17 +49,17 @@ @Service public class InstanceHealthStatusConsistenceInspector extends AbstractLifecycle implements TopElement { - private InstanceHealthStatusCollector collector; + protected InstanceHealthStatusCollector collector; - private InstanceStatusAdjuster adjuster; + protected InstanceStatusAdjuster adjuster; - private StabilityHolder siteStability; + protected StabilityHolder siteStability; - private CheckerConfig config; + protected CheckerConfig config; - private MetaCache metaCache; + protected MetaCache metaCache; - private GroupCheckerLeaderElector leaderElector; + protected GroupCheckerLeaderElector leaderElector; private DynamicDelayPeriodTask task; @@ -124,11 +124,11 @@ public void go() throws Exception { checkTimeout(timeoutMill, "after compare"); if (!instanceNeedAdjust.getHealthyInstances().isEmpty()) - adjuster.adjustInstances(instanceNeedAdjust.getHealthyInstances(), true, timeoutMill); + adjuster.adjustInstances(instanceNeedAdjust.getHealthyInstances(), false, true, timeoutMill); checkTimeout(timeoutMill, "after adjust up"); if (!instanceNeedAdjust.getUnhealthyInstances().isEmpty()) - adjuster.adjustInstances(instanceNeedAdjust.getUnhealthyInstances(), false, timeoutMill); + adjuster.adjustInstances(instanceNeedAdjust.getUnhealthyInstances(), false, false, timeoutMill); } @Override @@ -138,7 +138,7 @@ public Map getData() { }); } - private void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { + protected void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { if (System.currentTimeMillis() > timeoutAtMilli) { logger.info("[timeout] {}", msg); throw new TimeoutException(msg); @@ -157,6 +157,7 @@ protected Map> fetchInterestedClusterInstances() { for (ClusterMeta clusterMeta: dcMeta.getClusters().values()) { if (!ClusterType.isSameClusterType(clusterMeta.getType(), ClusterType.ONE_WAY)) continue; + if (metaCache.isCrossRegion(dcMeta.getId(), clusterMeta.getActiveDc())) continue;; if (!clusterMeta.getActiveDc().equalsIgnoreCase(currentDc)) continue; Set interestedInstances = MapUtils.getOrCreate(interestedClusterInstances, clusterMeta.getId(), HashSet::new); @@ -181,7 +182,8 @@ protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipe needMarkUpInstances.retainAll(outClientInstances.getUnhealthyInstances()); needMarkDownInstances.retainAll(outClientInstances.getHealthyInstances()); needMarkDownInstances = filterMasterHealthyInstances(xpipeInstanceHealthHolder, needMarkDownInstances, quorum); - needMarkDownInstances = filterMarkDowUnsupportedInstances(needMarkDownInstances); + logger.info("[InstanceHealthStatusConsistenceInspector] needMarkUpInstances:{}", needMarkUpInstances); + logger.info("[InstanceHealthStatusConsistenceInspector] needMarkDownInstances:{}", needMarkDownInstances); return new UpDownInstances(needMarkUpInstances, needMarkDownInstances); } @@ -212,33 +214,6 @@ protected Set filterMasterHealthyInstances(XPipeInstanceHealthHolder x return masterHealthyInstances; } - protected Set filterMarkDowUnsupportedInstances(Set instances) { - Set wontMarkdownInstances = new HashSet<>(); - for (HostPort instance : instances) { - HEALTH_STATE healthState = defaultDelayPingActionCollector.getState(instance); - if (healthState.equals(HEALTH_STATE.SICK)) { - if (!shouldMarkdownDcClusterSickInstances(healthCheckInstanceManager.findRedisHealthCheckInstance(instance))) - wontMarkdownInstances.add(instance); - } - } - instances.removeAll(wontMarkdownInstances); - return instances; - } - - boolean shouldMarkdownDcClusterSickInstances(RedisHealthCheckInstance healthCheckInstance) { - RedisInstanceInfo info = healthCheckInstance.getCheckInfo(); - if (info.isCrossRegion()) { - logger.info("[markdown][{} is cross region, do not call client service ]", info.getHostPort()); - return false; - } - if (healthCheckInstance.getHealthCheckConfig().getDelayConfig(info.getClusterId(), currentDc, info.getDcId()).getClusterLevelHealthyDelayMilli() < 0) { - logger.info("[markdown][cluster {} dcs {}->{} distance is -1, do not call client service ]", info.getClusterId(), currentDc, info.getDcId()); - return false; - } - return true; - } - - @Override protected void doInitialize() throws Exception { super.doInitialize(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommand.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommand.java index f8eef240b..f064c2067 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommand.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommand.java @@ -22,6 +22,8 @@ public class InstanceStatusAdjustCommand extends AbstractCommand { private ClusterShardHostPort instance; + private boolean isCrossRegion; + private InstanceHealthStatusCollector collector; private OuterClientService outerClientService; @@ -40,11 +42,12 @@ public class InstanceStatusAdjustCommand extends AbstractCommand { private static final Logger logger = LoggerFactory.getLogger(InstanceStatusAdjustCommand.class); - public InstanceStatusAdjustCommand(ClusterShardHostPort instance, InstanceHealthStatusCollector collector, + public InstanceStatusAdjustCommand(ClusterShardHostPort instance, boolean isCrossRegion, InstanceHealthStatusCollector collector, OuterClientService outerClientService, boolean state, long deadlineTimeMilli, StabilityHolder stabilityHolder, CheckerConfig config, MetaCache metaCache, AlertManager alertManager) { this.instance = instance; + this.isCrossRegion = isCrossRegion; this.collector = collector; this.outerClientService = outerClientService; this.state = state; @@ -74,7 +77,7 @@ protected void doExecute() throws Throwable { return; } Boolean xpipeHealthState = - collector.collectXPipeInstanceHealth(instance.getHostPort()).aggregate(instance.getHostPort(), config.getQuorum()); + collector.collectXPipeInstanceHealth(instance.getHostPort(), isCrossRegion).aggregate(instance.getHostPort(), config.getQuorum()); if (null == xpipeHealthState || !xpipeHealthState.equals(state)) { logger.info("[compensate][skip][xpipe state change] {}->{} {}", state, xpipeHealthState, instance); future().setSuccess(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjuster.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjuster.java index 5f242bca6..647e936bb 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjuster.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjuster.java @@ -50,10 +50,10 @@ public InstanceStatusAdjuster(InstanceHealthStatusCollector collector, AlertMana this.executors = executorFactory.createExecutorService(); } - public void adjustInstances(Set instances, boolean state, long timeoutAtMilli) { + public void adjustInstances(Set instances, boolean isCrossRegion, boolean state, long timeoutAtMilli) { for (HostPort instance: instances) { Pair clusterShard = metaCache.findClusterShard(instance); - new InstanceStatusAdjustCommand(new ClusterShardHostPort(clusterShard.getKey(), clusterShard.getValue(), instance), + new InstanceStatusAdjustCommand(new ClusterShardHostPort(clusterShard.getKey(), clusterShard.getValue(), instance), isCrossRegion, collector, OuterClientService.DEFAULT, state, timeoutAtMilli, siteStability, config, metaCache, alertManager).execute(executors); } } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java index 0d6942e4c..b61907c86 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java @@ -1,5 +1,6 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.handler; +import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.concurrent.FinalStateSetterManager; import com.ctrip.xpipe.endpoint.ClusterShardHostPort; import com.ctrip.xpipe.endpoint.HostPort; @@ -48,6 +49,8 @@ public abstract class AbstractHealthEventHandler listeners; @@ -45,17 +51,25 @@ public class PingActionFactory implements RedisHealthCheckActionFactory delayPingCollectors; + @Autowired + private List psubPingActionCollectors; + private Map> controllersByClusterType; private Map> listenerByClusterType; private Map> delayPingCollectorsByClusterType; + private Map> psubPingActionCollectorsByClusterType; + + protected static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); + @PostConstruct public void postConstruct() { controllersByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(controllers); listenerByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(listeners); delayPingCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(delayPingCollectors); + psubPingActionCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(psubPingActionCollectors); } @Override @@ -63,15 +77,23 @@ public PingAction create(RedisHealthCheckInstance instance) { PingAction pingAction = new PingAction(scheduled, instance, executors); ClusterType clusterType = instance.getCheckInfo().getClusterType(); - pingAction.addListeners(listenerByClusterType.get(clusterType)); pingAction.addControllers(controllersByClusterType.get(clusterType)); - if(instance instanceof DefaultRedisHealthCheckInstance) { - pingAction.addListener(((DefaultRedisHealthCheckInstance)instance).createPingListener()); + + if (currentDcId.equalsIgnoreCase(instance.getCheckInfo().getActiveDc())) { + pingAction.addListeners(listenerByClusterType.get(clusterType)); + if(instance instanceof DefaultRedisHealthCheckInstance) { + pingAction.addListener(((DefaultRedisHealthCheckInstance)instance).createPingListener()); + } + delayPingCollectorsByClusterType.get(clusterType).forEach(collector -> { + if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); + }); } - delayPingCollectorsByClusterType.get(clusterType).forEach(collector -> { - if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); - }); + if (metaCache.isCrossRegion(currentDcId, instance.getCheckInfo().getActiveDc())) { + psubPingActionCollectorsByClusterType.get(clusterType).forEach(collector -> { + if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); + }); + } return pingAction; } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java new file mode 100644 index 000000000..e5d32397a --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java @@ -0,0 +1,24 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class OneWayPsubActionController implements PsubActionController, OneWaySupport { + + @Autowired + private MetaCache metaCache; + + private static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); + + @Override + public boolean shouldCheck(RedisHealthCheckInstance instance) { + RedisInstanceInfo info = instance.getCheckInfo(); + return metaCache.isCrossRegion(currentDcId, info.getActiveDc()) && currentDcId.equalsIgnoreCase(info.getDcId()); + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java new file mode 100644 index 000000000..7bf07be14 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java @@ -0,0 +1,50 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.AbstractHealthCheckAction; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +public class PsubAction extends AbstractHealthCheckAction { + + protected static final Logger logger = LoggerFactory.getLogger(PsubAction.class); + + private String[] pubSubChannelPrefix; + + public PsubAction(ScheduledExecutorService scheduled, RedisHealthCheckInstance instance, ExecutorService executors) { + super(scheduled, instance, executors); + this.pubSubChannelPrefix = new String[]{"xpipe*"}; + } + + @Override + protected void doTask() { + RedisSession session = instance.getRedisSession(); + doPSubscribe(session, new RedisSession.SubscribeCallback() { + @Override + public void message(String channel, String message) { + logger.debug("[PsubAction][{}]success, channel:{}, message : {}", instance.getEndpoint(), channel, message); + notifyListeners(new PsubActionContext(instance, message)); + } + + @Override + public void fail(Throwable e) { + logger.error("[PsubAction][{}] fail", instance.getEndpoint(), e); + //ignore psub fail + } + }, pubSubChannelPrefix); + } + + @Override + protected Logger getHealthCheckLogger() { + return logger; + } + + protected void doPSubscribe(RedisSession session, RedisSession.SubscribeCallback callback, String... channel) { + session.psubscribeIfAbsent(callback, channel); + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java new file mode 100644 index 000000000..017aa6d4d --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java @@ -0,0 +1,10 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.AbstractActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; + +public class PsubActionContext extends AbstractActionContext { + public PsubActionContext(RedisHealthCheckInstance instance, String s) { + super(instance, s); + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java new file mode 100644 index 000000000..c5e38e723 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java @@ -0,0 +1,7 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckActionController; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; + +public interface PsubActionController extends HealthCheckActionController { +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java new file mode 100644 index 000000000..891491ae0 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java @@ -0,0 +1,71 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.cluster.ClusterType; +import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckActionFactory; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; +import com.ctrip.xpipe.redis.checker.healthcheck.util.ClusterTypeSupporterSeparator; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static com.ctrip.xpipe.redis.checker.resource.Resource.PING_DELAY_INFO_EXECUTORS; +import static com.ctrip.xpipe.redis.checker.resource.Resource.PING_DELAY_INFO_SCHEDULED; + +@Component +public class PsubActionFactory implements RedisHealthCheckActionFactory, OneWaySupport { + + @Autowired + private MetaCache metaCache; + + @Resource(name = PING_DELAY_INFO_SCHEDULED) + private ScheduledExecutorService scheduled; + + @Resource(name = PING_DELAY_INFO_EXECUTORS) + private ExecutorService executors; + + @Autowired + private List controllers; + + @Autowired + private List psubPingActionCollectors; + + private Map> controllersByClusterType; + + private Map> psubPingActionCollectorsByClusterType; + + private static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); + + @PostConstruct + public void postConstruct() { + controllersByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(controllers); + psubPingActionCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(psubPingActionCollectors); + } + + @Override + public PsubAction create(RedisHealthCheckInstance instance) { + PsubAction psubAction = new PsubAction(scheduled, instance, executors); + ClusterType clusterType = instance.getCheckInfo().getClusterType(); + + psubAction.addControllers(controllersByClusterType.get(clusterType)); + psubPingActionCollectorsByClusterType.get(clusterType).forEach(collector -> { + if (collector.supportInstance(instance)) psubAction.addListener(collector.createPsubActionListener()); + }); + return psubAction; + } + + @Override + public boolean supportInstnace(RedisHealthCheckInstance instance) { + RedisInstanceInfo info = instance.getCheckInfo(); + return metaCache.isCrossRegion(currentDcId, info.getActiveDc()) && currentDcId.equalsIgnoreCase(info.getDcId()); + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java new file mode 100644 index 000000000..59a19d494 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java @@ -0,0 +1,15 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckActionListener; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; + +public interface PsubActionListener extends HealthCheckActionListener> { + + @Override + default boolean worksfor(ActionContext t) { + return t instanceof PsubActionContext; + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java new file mode 100644 index 000000000..17cf8a6e6 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java @@ -0,0 +1,14 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionListener; + +public interface PsubPingActionCollector { + + boolean supportInstance(RedisHealthCheckInstance instance); + + PingActionListener createPingActionListener(); + + PsubActionListener createPsubActionListener(); + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java index 9bfe46e78..53b8c2ee2 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java @@ -6,6 +6,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.KeeperInstanceInfo; import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; import com.ctrip.xpipe.redis.checker.healthcheck.actions.delay.DelayConfig; +import com.ctrip.xpipe.redis.core.meta.MetaCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,9 +21,9 @@ public class CompositeHealthCheckConfig implements HealthCheckConfig { private HealthCheckConfig config; - public CompositeHealthCheckConfig(RedisInstanceInfo instanceInfo, CheckerConfig checkerConfig, DcRelationsService dcRelationsService) { + public CompositeHealthCheckConfig(RedisInstanceInfo instanceInfo, CheckerConfig checkerConfig, DcRelationsService dcRelationsService, boolean isCrossRegion) { logger.info("[CompositeHealthCheckConfig] {}", instanceInfo); - if(instanceInfo.isCrossRegion()) { + if(isCrossRegion) { config = new ProxyEnabledHealthCheckConfig(checkerConfig, dcRelationsService); logger.info("[CompositeHealthCheckConfig][proxied] ping down time: {}", config.pingDownAfterMilli()); } else { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java index 665b0270a..d5022fb3e 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java @@ -10,6 +10,9 @@ import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.*; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionFactory; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubAction; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionFactory; import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.RedisCheckRule; import com.ctrip.xpipe.redis.checker.healthcheck.config.CompositeHealthCheckConfig; import com.ctrip.xpipe.redis.checker.healthcheck.config.DefaultHealthCheckConfig; @@ -124,7 +127,7 @@ public RedisHealthCheckInstance create(RedisMeta redisMeta) { RedisInstanceInfo info = createRedisInstanceInfo(redisMeta); Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redisMeta); - HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); instance.setEndpoint(endpoint) .setSession(redisSessionManager.findOrCreateSession(endpoint)) @@ -228,7 +231,7 @@ public RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta r DefaultRedisHealthCheckInstance instance = new DefaultRedisHealthCheckInstance(); RedisInstanceInfo info = createRedisInstanceInfo(redis); - HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redis); instance.setEndpoint(endpoint) @@ -242,6 +245,25 @@ public RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta r return instance; } + @Override + public RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis) { + DefaultRedisHealthCheckInstance instance = new DefaultRedisHealthCheckInstance(); + + RedisInstanceInfo info = createRedisInstanceInfo(redis); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); + Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redis); + + instance.setEndpoint(endpoint) + .setSession(redisSessionManager.findOrCreateSession(endpoint)) + .setInstanceInfo(info) + .setHealthCheckConfig(config); + + initActionsForRedisForPsubPingAction(instance); + startCheck(instance); + + return instance; + } + private void initActionsForRedisForAssignedAction(DefaultRedisHealthCheckInstance instance) { for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { if (factory instanceof KeeperSupport) @@ -249,6 +271,13 @@ private void initActionsForRedisForAssignedAction(DefaultRedisHealthCheckInstanc } } + private void initActionsForRedisForPsubPingAction(DefaultRedisHealthCheckInstance instance) { + for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { + if (factory instanceof PingActionFactory || factory instanceof PsubActionFactory) + initActions(instance, factory); + } + } + @SuppressWarnings("unchecked") private void initActions(DefaultRedisHealthCheckInstance instance) { for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java index 390ecbc75..9cfb58e89 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java @@ -35,6 +35,8 @@ public class DefaultHealthCheckInstanceManager implements HealthCheckInstanceMan private ConcurrentMap redisInstanceForAssignedAction = Maps.newConcurrentMap(); + private ConcurrentMap redisInstanceForPingAction = Maps.newConcurrentMap(); + @Autowired private HealthCheckInstanceFactory instanceFactory; @@ -61,6 +63,18 @@ public RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisM return null; } + @Override + public RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis) { + try { + HostPort key = new HostPort(redis.getIp(), redis.getPort()); + return MapUtils.getOrCreate(redisInstanceForPingAction, key, + () -> instanceFactory.getOrCreateRedisInstanceForPsubPingAction(redis)); + } catch (Throwable th) { + logger.error("getOrCreate ping action health check redis instance:{}:{}", redis.getIp(), redis.getPort()); + } + return null; + } + @Override public KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper) { try { @@ -93,6 +107,11 @@ public RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort host return redisInstanceForAssignedAction.get(hostPort); } + @Override + public RedisHealthCheckInstance findRedisInstanceForPsubPingAction(HostPort hostPort) { + return redisInstanceForPingAction.get(hostPort); + } + @Override public KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort) { return keeperInstances.get(hostPort); @@ -125,6 +144,13 @@ public RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort ho return instance; } + @Override + public RedisHealthCheckInstance removeRedisInstanceForPingAction(HostPort hostPort) { + RedisHealthCheckInstance instance = redisInstanceForPingAction.remove(hostPort); + if (null != instance) instanceFactory.remove(instance); + return instance; + } + @Override public ClusterHealthCheckInstance remove(String cluster) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java index 4f0b23adb..ea8f4a7b3 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static com.ctrip.xpipe.redis.core.meta.comparator.KeeperContainerMetaComparator.getAllKeeperContainerDetailInfoFromDcMeta; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; @@ -146,6 +145,10 @@ void generateHealthCheckInstances() { generateHealthCheckInstances(cluster); } + if (clusterType == ClusterType.ONE_WAY && isClusterActiveDcCrossRegion(cluster) && clusterDcIsCurrentDc(cluster)) { + generatePsubPingActionHealthCheckInstances(cluster); + } + } } } @@ -168,6 +171,14 @@ void generateHealthCheckInstances(ClusterMeta clusterMeta){ instanceManager.getOrCreate(clusterMeta); } + void generatePsubPingActionHealthCheckInstances(ClusterMeta clusterMeta){ + for(ShardMeta shard : clusterMeta.getShards().values()) { + for(RedisMeta redis : shard.getRedises()) { + instanceManager.getOrCreateRedisInstanceForPsubPingAction(redis); + } + } + } + private boolean isClusterActiveIdcCurrentIdc(ClusterMeta cluster) { return cluster.getActiveDc().equalsIgnoreCase(currentDcId); @@ -206,4 +217,8 @@ private boolean hasMultipleActiveDcs(ClusterType clusterType) { return clusterType.supportMultiActiveDC() && !clusterType.isCrossDc(); } + private boolean isClusterActiveDcCrossRegion(ClusterMeta clusterMeta) { + return metaCache.isCrossRegion(currentDcId, clusterMeta.getActiveDc()); + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java index 0b82cb610..4a89b794e 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java @@ -27,4 +27,6 @@ public interface HealthCheckInstanceFactory { void remove(ClusterHealthCheckInstance instance); RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta redis); + + RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java index 796505132..9853f7c22 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java @@ -9,6 +9,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckInstanceManager; import com.ctrip.xpipe.redis.checker.healthcheck.impl.HealthCheckEndpointFactory; import com.ctrip.xpipe.redis.core.entity.*; +import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.redis.core.meta.MetaComparator; import com.ctrip.xpipe.redis.core.meta.MetaComparatorVisitor; import com.ctrip.xpipe.redis.core.meta.comparator.ClusterMetaComparator; @@ -48,6 +49,8 @@ public class DefaultDcMetaChangeManager extends AbstractStartStoppable implement private CheckerConfig checkerConfig; + private MetaCache metaCache; + private final String dcId; private final List clustersToDelete = new ArrayList<>(); @@ -58,12 +61,14 @@ public class DefaultDcMetaChangeManager extends AbstractStartStoppable implement public DefaultDcMetaChangeManager(String dcId, HealthCheckInstanceManager instanceManager, HealthCheckEndpointFactory healthCheckEndpointFactory, CheckerConsoleService checkerConsoleService, - CheckerConfig checkerConfig) { + CheckerConfig checkerConfig, + MetaCache metaCache) { this.dcId = dcId; this.instanceManager = instanceManager; this.healthCheckEndpointFactory = healthCheckEndpointFactory; this.checkerConsoleService = checkerConsoleService; this.checkerConfig = checkerConfig; + this.metaCache = metaCache; } @Override @@ -104,10 +109,12 @@ public void compare(DcMeta future, DcMeta allFutureDcMeta) { private void removeAndAdd() { this.redisListToDelete.forEach(this::removeRedis); + this.redisListToDelete.forEach(this::removeRedisOnlyForPingAction); this.clustersToDelete.forEach(this::removeCluster); this.clustersToAdd.forEach(this::addCluster); this.redisListToAdd.forEach(this::addRedis); + this.redisListToAdd.forEach(this::addRedisOnlyForPingAction); } private void clearUp() { @@ -125,19 +132,24 @@ private void removeCluster(ClusterMeta removed) { logger.info("[removeCluster][{}][{}] remove health check", dcId, removed.getId()); ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(removeConsumer))); + ClusterMetaVisitor clusterMetaPingActionVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(removePingActionConsumer))); clusterMetaVisitor.accept(removed); + clusterMetaPingActionVisitor.accept(removed); } private void addCluster(ClusterMeta added) { - if (!isInterestedInCluster(added)) { - logger.info("[addCluster][{}][skip] cluster not interested", added.getId()); - return; + if (isInterestedInCluster(added)) { + logger.info("[addCluster][{}][{}] add health check", dcId, added.getId()); + instanceManager.getOrCreate(added); + ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addConsumer))); + clusterMetaVisitor.accept(added); + } + + if (isOneWayClusterActiveDcCrossRegionAndCurrentDc(added)) { + ClusterMetaVisitor clusterMetaPingActionVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addPingActionConsumer))); + clusterMetaPingActionVisitor.accept(added); } - logger.info("[addCluster][{}][{}] add health check", dcId, added.getId()); - instanceManager.getOrCreate(added); - ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addConsumer))); - clusterMetaVisitor.accept(added); } private void removeRedis(RedisMeta removed) { @@ -199,6 +211,11 @@ protected boolean isInterestedInCluster(ClusterMeta cluster) { return result; } + private boolean isOneWayClusterActiveDcCrossRegionAndCurrentDc(ClusterMeta cluster) { + ClusterType clusterType = ClusterType.lookup(cluster.getType()); + return clusterType == ClusterType.ONE_WAY && isClusterActiveDcCrossRegion(cluster) && clusterDcIsCurrentDc(cluster); + } + private boolean clusterDcIsCurrentDc(ClusterMeta clusterMeta) { return clusterMeta.parent().getId().equalsIgnoreCase(currentDcId); } @@ -220,6 +237,10 @@ private boolean hasMultipleActiveDcs(ClusterType clusterType) { return clusterType.supportMultiActiveDC() && !clusterType.isCrossDc(); } + private boolean isClusterActiveDcCrossRegion(ClusterMeta clusterMeta) { + return metaCache.isCrossRegion(currentDcId, clusterMeta.getActiveDc()); + } + private Consumer removeConsumer = new Consumer() { @Override public void accept(RedisMeta redisMeta) { @@ -234,6 +255,20 @@ public void accept(RedisMeta redisMeta) { } }; + private Consumer removePingActionConsumer = new Consumer() { + @Override + public void accept(RedisMeta redisMeta) { + removeRedisOnlyForPingAction(redisMeta); + } + }; + + private Consumer addPingActionConsumer = new Consumer() { + @Override + public void accept(RedisMeta redisMeta) { + addRedisOnlyForPingAction(redisMeta); + } + }; + @Override protected void doStart() { if (current == null) { @@ -276,6 +311,20 @@ private void addRedisOnlyForUsedMemory(RedisMeta added) { instanceManager.getOrCreateRedisInstanceForAssignedAction(added); } + private void removeRedisOnlyForPingAction(RedisMeta removed) { + if (null != instanceManager.removeRedisInstanceForPingAction(new HostPort(removed.getIp(), removed.getPort()))) { + logger.info("[removeRedisOnlyForPingAction][{}:{}] {}", removed.getIp(), removed.getPort(), removed); + } + } + + private void addRedisOnlyForPingAction(RedisMeta added) { + if (!isOneWayClusterActiveDcCrossRegionAndCurrentDc(added.parent().parent())) { + return; + } + logger.info("[addRedisOnlyForPingAction][{}:{}] {}", added.getIp(), added.getPort(), added); + instanceManager.getOrCreateRedisInstanceForPsubPingAction(added); + } + private class KeeperContainerMetaComparatorVisitor implements MetaComparatorVisitor { @Override diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java index 82a349e89..82655e604 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java @@ -97,7 +97,7 @@ public DcMetaChangeManager getOrCreate(String dcId) { @Override public DcMetaChangeManager create() { return new DefaultDcMetaChangeManager(dcId, instanceManager, healthCheckEndpointFactory, - checkerConsoleService, checkerConfig); + checkerConsoleService, checkerConfig, metaCache); } }); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java index 2cad344d3..df5e2c7f1 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java @@ -83,6 +83,8 @@ public void check() { Subscribe command = pubSubConnectionWrapper.command.get(); if (command instanceof CRDTSubscribeCommand) { crdtsubscribeIfAbsent(pubSubConnectionWrapper.getCallback(), channelArray); + } else if (command instanceof PsubscribeCommand) { + psubscribeIfAbsent(pubSubConnectionWrapper.getCallback(), channelArray); } else { subscribeIfAbsent(pubSubConnectionWrapper.getCallback(), channelArray); } @@ -109,6 +111,10 @@ public synchronized void crdtsubscribeIfAbsent(SubscribeCallback callback, Strin subscribeIfAbsent(callback, () -> new CRDTSubscribeCommand(clientPool, scheduled, commandTimeOut, channel), channel); } + public synchronized void psubscribeIfAbsent(SubscribeCallback callback, String... channel) { + subscribeIfAbsent(callback, () -> new PsubscribeCommand(clientPool, scheduled, commandTimeOut, channel), channel); + } + private synchronized void subscribeIfAbsent(SubscribeCallback callback, Supplier subCommandSupplier, String... channel) { PubSubConnectionWrapper pubSubConnectionWrapper = subscribConns.get(Sets.newHashSet(channel)); if (pubSubConnectionWrapper == null || pubSubConnectionWrapper.shouldCreateNewSession()) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java index 667b35d47..c3f1f83d7 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java @@ -17,8 +17,12 @@ public class DefaultCheckerService extends AbstractService implements CheckerSer private static final String PATH_GET_HEALTH_STATE = "/api/health/{ip}/{port}"; + private static final String PATH_GET_CROSS_REGION_HEALTH_STATE = "/api/health/cross/region/{ip}/{port}"; + private static final String PATH_GET_ALL_INSTANCE_HEALTH_STATUS = "/api/health/check/status/all"; + private static final String PATH_GET_ALL_CROSS_REGION_INSTANCE_HEALTH_STATUS = "/api/health/check/cross/region/status/all"; + public DefaultCheckerService(String host) { if (host.startsWith("http://")) this.host = host; else this.host = "http://" + host; @@ -29,6 +33,16 @@ public HEALTH_STATE getInstanceStatus(String ip, int port) { return restTemplate.getForObject(host + PATH_GET_HEALTH_STATE, HEALTH_STATE.class, ip, port); } + @Override + public HEALTH_STATE getCrossRegionInstanceStatus(String ip, int port) { + return restTemplate.getForObject(host + PATH_GET_CROSS_REGION_HEALTH_STATE, HEALTH_STATE.class, ip, port); + } + + @Override + public Map getAllInstanceCrossRegionHealthStatus() { + return restTemplate.getForObject(host + PATH_GET_ALL_CROSS_REGION_INSTANCE_HEALTH_STATUS, AllInstanceHealthStatus.class); + } + @Override public Map getAllInstanceHealthStatus() { return restTemplate.getForObject(host + PATH_GET_ALL_INSTANCE_HEALTH_STATUS, AllInstanceHealthStatus.class); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java index 5c02c2a1d..8dce340ce 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java @@ -9,6 +9,9 @@ import com.ctrip.xpipe.redis.checker.RedisDelayManager; import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderAware; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultPsubPingActionCollector; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStateService; import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingService; import com.ctrip.xpipe.redis.checker.model.CheckerRole; @@ -21,8 +24,12 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; /** * @author lishanglin @@ -30,7 +37,7 @@ */ public class HealthCheckReporter implements GroupCheckerLeaderAware { - private HealthStateService healthStateService; + private List healthStateServices; private RedisDelayManager redisDelayManager; @@ -60,11 +67,11 @@ public class HealthCheckReporter implements GroupCheckerLeaderAware { private static final Logger logger = LoggerFactory.getLogger(HealthCheckReporter.class); - public HealthCheckReporter(HealthStateService healthStateService, CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, + public HealthCheckReporter(List healthStateServices, CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, ClusterServer clusterServer, ClusterServer allCheckerServer, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, ClusterHealthManager clusterHealthManager, int serverPort) { - this.healthStateService = healthStateService; + this.healthStateServices = healthStateServices; this.serverPort = serverPort; this.config = checkerConfig; this.checkerConsoleService = checkerConsoleService; @@ -142,7 +149,7 @@ private void reportCheckResult() { result.encodeCrossMasterDelays(crossMasterDelayManager.getAllCrossMasterDelays()); result.encodeRedisAlives(pingService.getAllRedisAlives()); result.setWarningClusterShards(clusterHealthManager.getAllClusterWarningShards()); - result.encodeRedisStates(healthStateService.getAllCachedState()); + result.encodeRedisStates(getAllRedisStates()); result.setHeteroShardsDelay(redisDelayManager.getAllHeteroShardsDelays()); checkerConsoleService.report(config.getConsoleAddress(), result); @@ -151,4 +158,28 @@ private void reportCheckResult() { } } + private Map getAllRedisStates() { + Map redisStates = new HashMap<>(); + DefaultDelayPingActionCollector delayPingActionCollector = null; + DefaultPsubPingActionCollector psubPingActionCollector = null; + for (HealthStateService service : healthStateServices) { + if (service instanceof DefaultDelayPingActionCollector) { + delayPingActionCollector = (DefaultDelayPingActionCollector) service; + } + if (service instanceof DefaultPsubPingActionCollector) { + psubPingActionCollector = (DefaultPsubPingActionCollector) service; + } + } + if (delayPingActionCollector != null) { + redisStates.putAll(delayPingActionCollector.getAllCachedState()); + } + if (psubPingActionCollector != null) { + Map allCachedState = psubPingActionCollector.getAllCachedState(); + for (Map.Entry entry : allCachedState.entrySet()) { + redisStates.put(entry.getKey(), entry.getValue()); + } + } + return redisStates; + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java index 533156a23..c88bd7bb9 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java @@ -198,10 +198,21 @@ public Map loadAllClusterCreateTime(String console) { } @Override - public Map loadAllActiveDcOneWayClusterInfo(String console, String activeDc) { + public Map loadAllDcOneWayClusterInfo(String console, String dc) { UriComponents comp = UriComponentsBuilder .fromHttpUrl(console + ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ACTIVE_DC_ONE_WAY_CLUSTERS) - .queryParam("activeDc", activeDc).build(); + .queryParam("dc", dc).build(); + + ResponseEntity> times = restTemplate.exchange( + comp.toString(), HttpMethod.GET, null, clusterInfoMapTypeDef); + return times.getBody(); + } + + @Override + public Map loadCurrentDcOneWayClusterInfo(String console, String dc) { + UriComponents comp = UriComponentsBuilder + .fromHttpUrl(console + ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ONE_WAY_CLUSTERS) + .queryParam("dc", dc).build(); ResponseEntity> times = restTemplate.exchange( comp.toString(), HttpMethod.GET, null, clusterInfoMapTypeDef); diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java index 7a85a0601..a70b4d1c7 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java @@ -210,7 +210,12 @@ public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws } @Override - public Map getAllActiveDcClusters(String activeDc) { + public Map getAllDcClusters(String dc) { + return null; + } + + @Override + public Map getAllCurrentDcClusters(String dc) { return null; } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java index b5827edee..ff4729b79 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java @@ -6,7 +6,6 @@ import com.ctrip.xpipe.redis.checker.CheckerService; import com.ctrip.xpipe.redis.checker.OuterClientCache; import com.ctrip.xpipe.redis.checker.RemoteCheckerManager; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatus; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; @@ -59,7 +58,7 @@ public void setupInstanceHealthStatusCollectorTest() { public void testCollect() throws Exception { Map healthStatus = mockHealthStatusMap(HEALTH_STATE.HEALTHY); Mockito.when(remoteCheckerService.getAllInstanceHealthStatus()).thenReturn(healthStatus); - Mockito.when(outerClientCache.getAllActiveDcClusters("jq")).thenReturn(mockOutClientResp(true)); + Mockito.when(outerClientCache.getAllDcClusters("jq")).thenReturn(mockOutClientResp(true)); healthStatus = mockHealthStatusMap(HEALTH_STATE.HEALTHY); Mockito.when(localCheckerService.getAllInstanceHealthStatus()).thenReturn(healthStatus); Pair result = this.collector.collect(); @@ -113,7 +112,7 @@ public void testCollectXPipeInstanceHealth() throws Exception { HostPort mockInstance = new HostPort("10.0.0.1", 6379); Mockito.when(remoteCheckerService.getInstanceStatus(mockInstance.getHost(), mockInstance.getPort())).thenReturn(HEALTH_STATE.HEALTHY); Mockito.when(localCheckerService.getInstanceStatus(mockInstance.getHost(), mockInstance.getPort())).thenReturn(HEALTH_STATE.HEALTHY); - XPipeInstanceHealthHolder xpipeInstanceHealthHolder = this.collector.collectXPipeInstanceHealth(mockInstance); + XPipeInstanceHealthHolder xpipeInstanceHealthHolder = this.collector.collectXPipeInstanceHealth(mockInstance, false); Assert.assertEquals(Boolean.TRUE, xpipeInstanceHealthHolder.aggregate(mockInstance, 2)); } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java index c11e1acd9..8f79ea0eb 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java @@ -84,7 +84,6 @@ public void setupInstanceHealthStatusConsistenceCheckerTest() throws Exception { Mockito.when(config.getRedisReplicationHealthCheckInterval()).thenReturn(2000); Mockito.when(config.getDownAfterCheckNums()).thenReturn(5); Mockito.when(siteStability.isSiteStable()).thenReturn(true); - Mockito.when(delayPingActionCollector.getState(any())).thenReturn(HEALTH_STATE.DOWN); } @Override @@ -103,9 +102,9 @@ public void testCheckAndAdjust() throws Exception { Mockito.when(xpipeInstanceHealthHolder.aggregate(ArgumentMatchers.eq(master), anyInt())).thenReturn(true); inspector.inspect(); - Mockito.verify(adjuster).adjustInstances(ArgumentMatchers.eq(Collections.singleton(new HostPort("10.0.0.1", 6379))), + Mockito.verify(adjuster).adjustInstances(ArgumentMatchers.eq(Collections.singleton(new HostPort("10.0.0.1", 6379))), anyBoolean(), ArgumentMatchers.eq(true), anyLong()); - Mockito.verify(adjuster).adjustInstances(ArgumentMatchers.eq(Collections.singleton(new HostPort("10.0.0.2", 6379))), + Mockito.verify(adjuster).adjustInstances(ArgumentMatchers.eq(Collections.singleton(new HostPort("10.0.0.2", 6379))), anyBoolean(), ArgumentMatchers.eq(false), anyLong()); } @@ -118,7 +117,7 @@ public void testNotMarkDownMasterUnhealthyInstances() throws Exception { Mockito.when(xpipeInstanceHealthHolder.aggregate(ArgumentMatchers.eq(master), anyInt())).thenReturn(false); inspector.inspect(); - Mockito.verify(adjuster, Mockito.never()).adjustInstances(any(), anyBoolean(), anyLong()); + Mockito.verify(adjuster, Mockito.never()).adjustInstances(any(), anyBoolean(), anyBoolean(), anyLong()); } @Test diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommandTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommandTest.java index bea8774c2..6ee40cc90 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommandTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommandTest.java @@ -59,12 +59,12 @@ public void setupInstanceStatusAdjustCommandTest() throws Exception { when(config.getHealthMarkCompensateIntervalMill()).thenReturn(1000L); when(config.getQuorum()).thenReturn(2); when(siteStability.isSiteStable()).thenReturn(true); - when(collector.collectXPipeInstanceHealth(instance.getHostPort())).thenReturn(xpipeInstanceHealth); + when(collector.collectXPipeInstanceHealth(instance.getHostPort(), false)).thenReturn(xpipeInstanceHealth); } @Test public void testAdjust() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(outerClientService.isInstanceUp(instance)).thenReturn(false); when(xpipeInstanceHealth.aggregate(instance.getHostPort(),2)).thenReturn(Boolean.TRUE); @@ -75,21 +75,21 @@ public void testAdjust() throws Exception { @Test(expected = ExecutionException.class) public void testTimeoutAfterCollect() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(outerClientService.isInstanceUp(instance)).thenReturn(false); when(xpipeInstanceHealth.aggregate(instance.getHostPort(),2)).thenReturn(Boolean.TRUE); doAnswer(inv -> { sleep(timeoutMilli + 1); return xpipeInstanceHealth; - }).when(collector).collectXPipeInstanceHealth(instance.getHostPort()); + }).when(collector).collectXPipeInstanceHealth(instance.getHostPort(), false); cmd.execute().get(); } @Test public void testSkipForOuterClientChange() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(outerClientService.isInstanceUp(instance)).thenReturn(true); cmd.execute().get(); @@ -99,7 +99,7 @@ public void testSkipForOuterClientChange() throws Exception { @Test public void testSkipForXPipeChange() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(outerClientService.isInstanceUp(instance)).thenReturn(false); when(xpipeInstanceHealth.aggregate(instance.getHostPort(),2)).thenReturn(null); @@ -110,7 +110,7 @@ public void testSkipForXPipeChange() throws Exception { @Test public void testSiteUnstable() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(siteStability.isSiteStable()).thenReturn(false); cmd.execute().get(); diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java index f32334f43..fe73ce780 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java @@ -74,7 +74,7 @@ public void beforeDefaultDcMetaChangeManagerTest() throws IOException, SAXExcept when(checkerConfig.getConsoleAddress()).thenReturn("127.0.0.1"); when(checkerConsoleService.getXpipeDcAllMeta(Mockito.anyString(), Mockito.anyString())).thenReturn(getXpipeMeta()); - manager = new DefaultDcMetaChangeManager("oy", instanceManager, factory, checkerConsoleService, checkerConfig); + manager = new DefaultDcMetaChangeManager("oy", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); } private void prepareData(String dc) { @@ -263,7 +263,7 @@ public void visitRemoved() { @Test public void visitRemovedClusterActiveDc(){ - manager = spy(new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig)); + manager = spy(new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache)); manager.compare(getDcMeta("jq"), null); DcMeta dcMeta = MetaCloneFacade.INSTANCE.clone(getDcMeta("jq")); @@ -359,7 +359,7 @@ public void testSwitchClusterShards() throws Exception { @Test public void testHeteroClusterModified() throws Exception { - DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig); + DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); DcMeta dcMeta= getDcMeta("jq"); ClusterMeta cluster1 = dcMeta.findCluster("cluster2"); cluster1.setBackupDcs("ali"); @@ -384,7 +384,7 @@ public void testHeteroClusterModified() throws Exception { @Test public void testBackupDcClusterModified() throws Exception { - DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig); + DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); DcMeta dcMeta= getDcMeta("jq"); ClusterMeta cluster1 = dcMeta.findCluster("cluster2"); cluster1.setBackupDcs("jq,ali"); @@ -491,7 +491,7 @@ public void visitModified1() { @Test public void testKeeperChange() throws Exception { String dcId = FoundationService.DEFAULT.getDataCenter(); - manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig); + manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); prepareData(dcId); DcMeta future = cloneDcMeta(dcId); future.addKeeperContainer(new KeeperContainerMeta().setId(4L).setIp("1.1.1.4").setPort(8080)); @@ -512,7 +512,7 @@ public void testKeeperChange() throws Exception { @Test public void testRedisChange() throws Exception { String dcId = FoundationService.DEFAULT.getDataCenter(); - manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig); + manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); prepareData(dcId); DcMeta future = cloneDcMeta(dcId); future.addKeeperContainer(new KeeperContainerMeta().setId(4L).setIp("1.1.1.4").setPort(8080)); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerApiService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerApiService.java index e04b7fe6a..1a55033a6 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerApiService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerApiService.java @@ -7,10 +7,18 @@ public interface ConsoleCheckerApiService { String PATH_HEALTH_CHECK_INSTANCE = "/api/health/check/instance/{ip}/{port}"; + String PATH_CROSS_REGION_HEALTH_CHECK_INSTANCE = "/api/health/check/cross/region//instance/{ip}/{port}"; + String PATH_HEALTH_STATUS = "/api/health/{ip}/{port}"; + String PATH_CROSS_REGION_HEALTH_STATUS = "/api/health/cross/region/{ip}/{port}"; + String getHealthCheckInstance(HostPort checker, String ip, int port); + String getCrossRegionHealthCheckInstance(HostPort checker, String ip, int port); + HEALTH_STATE getHealthStates(HostPort checker, String ip, int port); + HEALTH_STATE getCrossRegionHealthStates(HostPort checker, String ip, int port); + } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerGroupService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerGroupService.java index b3e0f1098..0b32899fb 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerGroupService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerGroupService.java @@ -10,8 +10,8 @@ public interface ConsoleCheckerGroupService { HostPort getCheckerLeader(long clusterDbId); - CommandFuture> getAllHealthCheckInstance(long clusterDbId, String ip, int port); + CommandFuture> getAllHealthCheckInstance(long clusterDbId, String ip, int port, boolean isCrossRegion); - CommandFuture> getAllHealthStates(long clusterDbId, String ip, int port); + CommandFuture> getAllHealthStates(long clusterDbId, String ip, int port, boolean isCrossRegion); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetCommand.java index 368eeafdf..f2364b55b 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetCommand.java @@ -15,11 +15,14 @@ public class InstanceHealthCheckGetCommand extends AbstractCommand { int port; - public InstanceHealthCheckGetCommand(ConsoleCheckerApiService service, HostPort checker, String ip, int port) { + boolean isCrossRegion; + + public InstanceHealthCheckGetCommand(ConsoleCheckerApiService service, HostPort checker, String ip, int port, boolean isCrossRegion) { this.service = service; this.checker = checker; this.ip = ip; this.port = port; + this.isCrossRegion = isCrossRegion; } @Override @@ -29,7 +32,11 @@ public String getName() { @Override protected void doExecute() throws Throwable { - future().setSuccess(service.getHealthCheckInstance(checker, ip, port)); + if (isCrossRegion) { + future().setSuccess(service.getCrossRegionHealthCheckInstance(checker, ip, port)); + } else { + future().setSuccess(service.getHealthCheckInstance(checker, ip, port)); + } } @Override diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetGroupCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetGroupCommand.java index 017d569f8..dec739ecf 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetGroupCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetGroupCommand.java @@ -24,13 +24,16 @@ public class InstanceHealthCheckGetGroupCommand extends AbstractCommand checkers, String ip, int port, ExecutorService executor) { + public InstanceHealthCheckGetGroupCommand(ConsoleCheckerApiService service, List checkers, String ip, int port, boolean isCrossRegion, ExecutorService executor) { this.service = service; this.checkers = checkers; this.ip = ip; this.port = port; + this.isCrossRegion = isCrossRegion; this.executor = executor; } @@ -43,7 +46,7 @@ public String getName() { protected void doExecute() { Map> futureMap = new HashMap<>(); checkers.forEach(checker -> { - CommandFuture future = new InstanceHealthCheckGetCommand(service, checker, ip, port).execute(executor); + CommandFuture future = new InstanceHealthCheckGetCommand(service, checker, ip, port, isCrossRegion).execute(executor); futureMap.put(checker, future); }); Map result = new HashMap<>(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthStatusGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthStatusGetCommand.java index b78e98836..308025b2d 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthStatusGetCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthStatusGetCommand.java @@ -15,11 +15,14 @@ public class InstanceHealthStatusGetCommand extends AbstractCommand checkers, String ip, int port, ExecutorService executor) { + public InstanceHealthStatusGetGroupCommand(ConsoleCheckerApiService service, List checkers, String ip, int port, boolean isCrossRegion, ExecutorService executor) { this.service = service; this.checkers = checkers; this.ip = ip; this.port = port; + this.isCrossRegion = isCrossRegion; this.executor = executor; } @@ -46,7 +49,7 @@ public String getName() { protected void doExecute() throws Throwable { Map> futureMap = new HashMap<>(); checkers.forEach(checker -> { - CommandFuture future = new InstanceHealthStatusGetCommand(service, checker, ip, port).execute(executor); + CommandFuture future = new InstanceHealthStatusGetCommand(service, checker, ip, port, isCrossRegion).execute(executor); futureMap.put(checker, future); }); Map result = new HashMap<>(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerApiService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerApiService.java index f0199a7c2..6d1010272 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerApiService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerApiService.java @@ -14,11 +14,21 @@ public String getHealthCheckInstance(HostPort checker, String ip, int port) { return restTemplate.getForObject(getPath(checker, PATH_HEALTH_CHECK_INSTANCE), String.class, ip, port); } + @Override + public String getCrossRegionHealthCheckInstance(HostPort checker, String ip, int port) { + return restTemplate.getForObject(getPath(checker, PATH_CROSS_REGION_HEALTH_CHECK_INSTANCE), String.class, ip, port); + } + @Override public HEALTH_STATE getHealthStates(HostPort checker, String ip, int port) { return restTemplate.getForObject(getPath(checker, PATH_HEALTH_STATUS), HEALTH_STATE.class, ip, port); } + @Override + public HEALTH_STATE getCrossRegionHealthStates(HostPort checker, String ip, int port) { + return restTemplate.getForObject(getPath(checker, PATH_CROSS_REGION_HEALTH_STATUS), HEALTH_STATE.class, ip, port); + } + private String getPath(HostPort key, String path) { return "http://" + key.getHost() + ":" + key.getPort() + path; } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerGroupService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerGroupService.java index 6932f99b6..9c1f40fc8 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerGroupService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerGroupService.java @@ -40,13 +40,13 @@ public List getAllChecker(long clusterDbId) { } @Override - public CommandFuture> getAllHealthCheckInstance(long clusterDbId, String ip, int port) { - return new InstanceHealthCheckGetGroupCommand(checkerApiService, getAllChecker(clusterDbId), ip, port, executor).execute(executor); + public CommandFuture> getAllHealthCheckInstance(long clusterDbId, String ip, int port, boolean isCrossRegion) { + return new InstanceHealthCheckGetGroupCommand(checkerApiService, getAllChecker(clusterDbId), ip, port, isCrossRegion, executor).execute(executor); } @Override - public CommandFuture> getAllHealthStates(long clusterDbId, String ip, int port) { - return new InstanceHealthStatusGetGroupCommand(checkerApiService, getAllChecker(clusterDbId), ip, port, executor).execute(executor); + public CommandFuture> getAllHealthStates(long clusterDbId, String ip, int port, boolean isCrossRegion) { + return new InstanceHealthStatusGetGroupCommand(checkerApiService, getAllChecker(clusterDbId), ip, port, isCrossRegion, executor).execute(executor); } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java index 9701ea130..2261d8ca1 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java @@ -54,15 +54,20 @@ public class DefaultConsoleDcCheckerService implements ConsoleDcCheckerService { @Override public List getShardAllCheckerGroupHealthCheck(String dcId, String clusterId, String shardId) { + List result = new ArrayList<>(); ClusterTbl clusterTbl = clusterService.find(clusterId); if (clusterTbl == null) { return null; } String activeDc = dcService.getDcName(clusterTbl.getActivedcId()); + if (metaCache.isCrossRegion(dcId, activeDc)) { + result.addAll(consoleManager.getShardAllCheckerGroupHealthCheck(dcId, dcId, clusterId, shardId)); + } if (activeDc.equalsIgnoreCase(currentDc)) { return getLocalDcShardAllCheckerGroupHealthCheck(dcId, clusterId, shardId); } - return consoleManager.getShardAllCheckerGroupHealthCheck(activeDc, dcId, clusterId, shardId); + result.addAll(consoleManager.getShardAllCheckerGroupHealthCheck(activeDc, dcId, clusterId, shardId)); + return result; } @Override @@ -75,8 +80,9 @@ public List getLocalDcShardAllCheckerGroupHealthCh Map>> redisCheckerActionMap = new HashMap<>(); Map>> redisCheckerHealthStateMap = new HashMap<>(); redisOfDcClusterShard.forEach(redisMeta -> { - CommandFuture> allHealthCheckInstance = consoleCheckerGroupService.getAllHealthCheckInstance(clusterTbl.getId(), redisMeta.getIp(), redisMeta.getPort()); - CommandFuture> allHealthStates = consoleCheckerGroupService.getAllHealthStates(clusterTbl.getId(), redisMeta.getIp(), redisMeta.getPort()); + boolean isCrossRegion = metaCache.isCrossRegion(currentDc, dcService.getDcName(clusterTbl.getActivedcId())); + CommandFuture> allHealthCheckInstance = consoleCheckerGroupService.getAllHealthCheckInstance(clusterTbl.getId(), redisMeta.getIp(), redisMeta.getPort(), isCrossRegion); + CommandFuture> allHealthStates = consoleCheckerGroupService.getAllHealthStates(clusterTbl.getId(), redisMeta.getIp(), redisMeta.getPort(), isCrossRegion); redisCheckerActionMap.put(new HostPort(redisMeta.getIp(), redisMeta.getPort()), allHealthCheckInstance); redisCheckerHealthStateMap.put(new HostPort(redisMeta.getIp(), redisMeta.getPort()), allHealthStates); }); @@ -88,7 +94,7 @@ public List getLocalDcShardAllCheckerGroupHealthCh List result = new ArrayList<>(); for (Map.Entry> entry : checkerInstancesMap.entrySet()) { HostPort checker = entry.getKey(); - ShardCheckerHealthCheckModel shardCheckerHealthCheckModel = new ShardCheckerHealthCheckModel(checker.getHost(), checker.getPort()); + ShardCheckerHealthCheckModel shardCheckerHealthCheckModel = new ShardCheckerHealthCheckModel(checker.getHost(), checker.getPort(), currentDc); if (checker.equals(checkerLeader)) { shardCheckerHealthCheckModel.setCheckerRole(CheckerRole.LEADER); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java index dcab438be..943567a24 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java @@ -30,8 +30,12 @@ public class DefaultConsoleService extends AbstractService implements ConsoleSer private final String healthStatusUrl; + private final String crossRegionHealthStatusUrl; + private final String allHealthStatusUrl; + private final String allCrossRegionHealthStatusUrl; + private final String pingStatusUrl; private final String innerShardDelayStatusUrl; @@ -69,7 +73,9 @@ public DefaultConsoleService(String address){ this.address = "http://" + this.address; } healthStatusUrl = String.format("%s/api/health/{ip}/{port}", this.address); + crossRegionHealthStatusUrl = String.format("%s/api/cross/region/health/{ip}/{port}", this.address); allHealthStatusUrl = String.format("%s/api/health/check/status/all", this.address); + allCrossRegionHealthStatusUrl = String.format("%s/api/health/cross/region/check/status/all", this.address); pingStatusUrl = String.format("%s/api/redis/ping/{ip}/{port}", this.address); innerShardDelayStatusUrl = String.format("%s/api/shard/inner/delay/{shardId}", this.address); innerDelayStatusUrl = String.format("%s/api/redis/inner/delay/{ip}/{port}", this.address); @@ -89,11 +95,21 @@ public HEALTH_STATE getInstanceStatus(String ip, int port) { return restTemplate.getForObject(healthStatusUrl, HEALTH_STATE.class, ip, port); } + @Override + public HEALTH_STATE getCrossRegionInstanceStatus(String ip, int port) { + return restTemplate.getForObject(crossRegionHealthStatusUrl, HEALTH_STATE.class, ip, port); + } + @Override public Map getAllInstanceHealthStatus() { return restTemplate.getForObject(allHealthStatusUrl, AllInstanceHealthStatus.class); } + @Override + public Map getAllInstanceCrossRegionHealthStatus() { + return restTemplate.getForObject(allCrossRegionHealthStatusUrl, AllInstanceHealthStatus.class); + } + @Override public List getShardAllCheckerGroupHealthCheck(String dcId, String clusterId, String shardId) { return restTemplate.getForObject(shardAllCheckerGroupHealthCheckUrl, ShardCheckerHealthCheckModels.class, dcId, clusterId, shardId); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java index b469b4ff4..8157720cc 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java @@ -207,8 +207,13 @@ public void recordAlert(@RequestBody CheckerConsoleService.AlertMessage alertMes } @GetMapping(value = ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ACTIVE_DC_ONE_WAY_CLUSTERS) - public Map loadAllOuterClientClusters(@RequestParam String activeDc) { - return outerClientCache.getAllActiveDcClusters(activeDc); + public Map loadAllOuterClientClusters(@RequestParam String dc) { + return outerClientCache.getAllDcClusters(dc); + } + + @GetMapping(value = ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ONE_WAY_CLUSTERS) + public Map loadCurrentDcOuterClientClusters(@RequestParam String dc) { + return outerClientCache.getAllCurrentDcClusters(dc); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java index 5f0cee57a..356e30dc0 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java @@ -7,6 +7,7 @@ public class ShardCheckerHealthCheckModel { + private String idc; private String host; private int port; private CheckerRole checkerRole; @@ -15,13 +16,22 @@ public class ShardCheckerHealthCheckModel { public ShardCheckerHealthCheckModel() { } - public ShardCheckerHealthCheckModel(String host, int port) { + public ShardCheckerHealthCheckModel(String host, int port, String idc) { this.host = host; this.port = port; + this.idc = idc; this.checkerRole = CheckerRole.FOLLOWER; this.instances = new ArrayList<>(); } + public String getIdc() { + return idc; + } + + public void setIdc(String idc) { + this.idc = idc; + } + public String getHost() { return host; } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java index 01fbaba70..c6123d8eb 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java @@ -34,9 +34,22 @@ public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws } @Override - public Map getAllActiveDcClusters(String activeDc) { + public Map getAllDcClusters(String dc) { try { - return service.loadAllActiveDcOneWayClusterInfo(config.getConsoleAddress(), activeDc); + return service.loadAllDcOneWayClusterInfo(config.getConsoleAddress(), dc); + } catch (RestClientException e) { + logger.warn("[getAllOneWayClusters] rest fail, {}", e.getMessage()); + } catch (Throwable th) { + logger.warn("[getAllOneWayClusters] fail", th); + } + + return Collections.emptyMap(); + } + + @Override + public Map getAllCurrentDcClusters(String dc) { + try { + return service.loadCurrentDcOneWayClusterInfo(config.getConsoleAddress(), dc); } catch (RestClientException e) { logger.warn("[getAllOneWayClusters] rest fail, {}", e.getMessage()); } catch (Throwable th) { diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java index 43f0eab79..ab9675bc6 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java @@ -12,11 +12,13 @@ import org.springframework.stereotype.Component; import org.springframework.web.client.RestClientException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import com.ctrip.xpipe.api.migration.OuterClientService.*; /** * @author lishanglin @@ -30,22 +32,28 @@ public class DefaultOuterClientCache extends AbstractLifecycle implements OuterC private ConsoleConfig config; - private TimeBoundCache> clustersCache; + private TimeBoundCache> clustersCache; + + private TimeBoundCache> currentDcClustersCache; private ScheduledExecutorService scheduled; private DynamicDelayPeriodTask refreshTask; + private DynamicDelayPeriodTask refreshCurrentDcTask; + public DefaultOuterClientCache(ConsoleConfig config) { this.outerClientService = OuterClientService.DEFAULT; this.config = config; this.clustersCache = new TimeBoundCache<>(() -> 10000 + config.getRedisConfCheckIntervalMilli(), - () -> loadActiveDcClusters(FoundationService.DEFAULT.getDataCenter())); + () -> loadClusters(FoundationService.DEFAULT.getDataCenter())); + this.currentDcClustersCache = new TimeBoundCache<>(() -> 10000 + config.getRedisConfCheckIntervalMilli(), + () -> loadCurrentDcClusters(FoundationService.DEFAULT.getDataCenter())); } @Override - public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws Exception { - OuterClientService.ClusterInfo clusterInfo = clustersCache.getData(false).get(clusterName.toLowerCase()); + public ClusterInfo getClusterInfo(String clusterName) throws Exception { + ClusterInfo clusterInfo = clustersCache.getData(false).get(clusterName.toLowerCase()); if (null == clusterInfo) { return outerClientService.getClusterInfo(clusterName); } @@ -54,16 +62,22 @@ public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws } @Override - public Map getAllActiveDcClusters(String activeDc) { - if (FoundationService.DEFAULT.getDataCenter().equalsIgnoreCase(activeDc)) return clustersCache.getData(false); - else return loadActiveDcClusters(activeDc); + public Map getAllDcClusters(String dc) { + if (FoundationService.DEFAULT.getDataCenter().equalsIgnoreCase(dc)) return clustersCache.getData(false); + else return loadClusters(dc); } - private Map loadActiveDcClusters(String activeDc) { - Map clusters = new HashMap<>(); + @Override + public Map getAllCurrentDcClusters(String dc) { + if (FoundationService.DEFAULT.getDataCenter().equalsIgnoreCase(dc)) return currentDcClustersCache.getData(false); + else return loadCurrentDcClusters(dc); + } + + private Map loadClusters(String dc) { + Map clusters = new HashMap<>(); try { - List clusterInfos = outerClientService.getActiveDcClusters(activeDc); - for (OuterClientService.ClusterInfo cluster: clusterInfos) { + List clusterInfos = outerClientService.getActiveDcClusters(dc); + for (ClusterInfo cluster: clusterInfos) { clusters.put(cluster.getName().toLowerCase(), cluster); } } catch (RestClientException e) { @@ -75,6 +89,69 @@ private Map loadActiveDcClusters(String return clusters; } + private Map loadCurrentDcClusters(String dc) { + Map clusters = new HashMap<>(); + try { + DcMeta currentDcClusterInfos = outerClientService.getOutClientDcMeta(dc); + for (ClusterMeta clusterMeta: currentDcClusterInfos.getClusters().values()) { + if (!ClusterType.XPIPE_ONE_WAY.equals(clusterMeta.getClusterType())) continue; + ClusterInfo clusterInfo = buildClusterModel(clusterMeta); + if (!clusters.containsKey(clusterInfo.getName().toLowerCase())) { + clusters.put(clusterInfo.getName().toLowerCase(), clusterInfo); + } + } + } catch (RestClientException e) { + logger.warn("[refresh] rest fail, {}", e.getMessage()); + } catch (Throwable th) { + logger.warn("[refresh] fail", th); + } + + return clusters; + } + + public ClusterInfo buildClusterModel(ClusterMeta clusterMeta) { + ClusterInfo cluster = clusterMetaToClusterModel(clusterMeta); + List groupModels = new ArrayList<>(); + for (GroupMeta group : clusterMeta.getGroups().values()) { + List instanceModels = new ArrayList<>(); + GroupInfo groupModel = groupMetaToGroupModel(group); + for (RedisMeta instance : group.getRedises()) { + instanceModels.add(redisMetaToInstanceModel(instance)); + } + groupModel.setInstances(instanceModels); + groupModels.add(groupModel); + } + cluster.setGroups(groupModels); + return cluster; + } + + public ClusterInfo clusterMetaToClusterModel(ClusterMeta clusterMeta) { + ClusterInfo cluster = new ClusterInfo(); + cluster.setName(clusterMeta.getName()); + if (clusterMeta.getClusterType().equals(ClusterType.XPIPE_ONE_WAY)) { + cluster.setIsXpipe(true); + cluster.setMasterIDC(clusterMeta.getActiveIDC()); + } + return cluster; + } + + public GroupInfo groupMetaToGroupModel(GroupMeta groupMeta) { + GroupInfo groupModel = new GroupInfo(); + groupModel.setName(groupMeta.getGroupName()); + return groupModel; + } + + public InstanceInfo redisMetaToInstanceModel(RedisMeta redisMeta) { + InstanceInfo instance = new InstanceInfo(); + instance.setIPAddress(redisMeta.getHost()); + instance.setPort(redisMeta.getPort()); + instance.setIsMaster(redisMeta.isMaster()); + instance.setStatus(InstanceStatus.ACTIVE.equals(redisMeta.getStatus())); + instance.setEnv(redisMeta.getIdc()); + instance.setCanRead(!InstanceStatus.INACTIVE.equals(redisMeta.getStatus())); + return instance; + } + @Override protected void doInitialize() throws Exception { super.doInitialize(); @@ -82,18 +159,22 @@ protected void doInitialize() throws Exception { XpipeThreadFactory.create("OuterClientCacheRefreshScheduled")); this.refreshTask = new DynamicDelayPeriodTask("OuterClientCacheRefresh", clustersCache::refresh, config::getRedisConfCheckIntervalMilli, scheduled); + this.refreshCurrentDcTask = new DynamicDelayPeriodTask("OuterClientCurrentDcCacheRefresh", currentDcClustersCache::refresh, + config::getRedisConfCheckIntervalMilli, scheduled); } @Override protected void doStart() throws Exception { super.doStart(); this.refreshTask.start(); + this.refreshCurrentDcTask.start(); } @Override protected void doStop() throws Exception { super.doStop(); this.refreshTask.stop(); + this.refreshCurrentDcTask.stop(); } @Override @@ -102,6 +183,7 @@ protected void doDispose() throws Exception { this.scheduled.shutdown(); this.scheduled = null; this.refreshTask = null; + this.refreshCurrentDcTask = null; } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java index 595743375..cf3b112fa 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java @@ -42,6 +42,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; +import java.util.List; import java.util.concurrent.ExecutorService; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; @@ -189,9 +190,9 @@ public SentinelManager sentinelManager() { public HealthCheckReporter healthCheckReporter(CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, GroupCheckerLeaderElector clusterServer, AllCheckerLeaderElector allCheckerLeaderElector, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, - ClusterHealthManager clusterHealthManager, HealthStateService healthStateService, + ClusterHealthManager clusterHealthManager, List healthStateServices, @Value("${server.port}") int serverPort) { - return new HealthCheckReporter(healthStateService, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, + return new HealthCheckReporter(healthStateServices, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, crossMasterDelayManager, pingService, clusterHealthManager, serverPort); } diff --git a/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html b/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html index 960baf970..95ab51581 100644 --- a/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html +++ b/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html @@ -134,7 +134,7 @@ Checker Groups - Show Actions + Show Actions(err) 刷新 @@ -144,7 +144,8 @@

- {{checker.host}}:{{checker.port}} + {{checker.idc}} + {{checker.host}}:{{checker.port}} {{checker.checkerRole}}
diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/checker/DefaultConsoleDcCheckerServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/checker/DefaultConsoleDcCheckerServiceTest.java index e18db8c64..ebed647e2 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/checker/DefaultConsoleDcCheckerServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/checker/DefaultConsoleDcCheckerServiceTest.java @@ -74,8 +74,8 @@ public void testGetLocalDcShardAllCheckerGroupHealthCheck() throws Exception { Mockito.when(metaCache.getRedisOfDcClusterShard(DC, CLUSTER, SHARD)).thenReturn(redisMetas); CommandFuture future1 = Mockito.mock(CommandFuture.class); CommandFuture future2 = Mockito.mock(CommandFuture.class); - Mockito.when(consoleCheckerGroupService.getAllHealthCheckInstance(clusterDbId, IP, PORT)).thenReturn(future1); - Mockito.when(consoleCheckerGroupService.getAllHealthStates(clusterDbId, IP, PORT)).thenReturn(future2); + Mockito.when(consoleCheckerGroupService.getAllHealthCheckInstance(clusterDbId, IP, PORT, false)).thenReturn(future1); + Mockito.when(consoleCheckerGroupService.getAllHealthStates(clusterDbId, IP, PORT, false)).thenReturn(future2); HostPort checker = new HostPort("127.0.0.2", 8080); Map checkerActionMap = new HashMap<>(); Map checkerHealthStateMap = new HashMap<>();; diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/console/ConsoleCheckerPath.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/console/ConsoleCheckerPath.java index db6e71d58..76cc57a8e 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/console/ConsoleCheckerPath.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/console/ConsoleCheckerPath.java @@ -50,6 +50,8 @@ private ConsoleCheckerPath() {} public static final String PATH_GET_ALL_CURRENT_DC_ACTIVE_DC_ONE_WAY_CLUSTERS = "/api/outclient/clusters/one_way"; + public static final String PATH_GET_ALL_CURRENT_DC_ONE_WAY_CLUSTERS = "/api/outclient/current/dc/clusters/one_way"; + public static final String PATH_PUT_CHECKER_LEADER_MERGE_ALERT = "/api/mail/{alertType}"; } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java index 8d427e185..823b433ee 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java @@ -204,8 +204,12 @@ protected void handleMessage(Object response) { if(!(response instanceof Object[])) { throw new RedisRuntimeException(String.format("Subscribe subscribeChannel response incorrect: %s", response)); } - - SubscribeMessageHandler handler = getSubscribeMessageHandler(); + SubscribeMessageHandler handler; + if (this.getName().equals(PSUBSCRIBE)) { + handler = getPSubscribeMessageHandler(); + } else { + handler = getSubscribeMessageHandler(); + } Pair channelAndMessage = handler.handle(payloadToStringArray(response)); if(channelAndMessage != null) { @@ -217,6 +221,10 @@ protected SubscribeMessageHandler getSubscribeMessageHandler() { return new DefaultSubscribeMessageHandler(); } + protected SubscribeMessageHandler getPSubscribeMessageHandler() { + return new PsubscribeMessageHandler(); + } + private void notifyListeners(Pair channelAndMessage) { for(SubscribeListener listener : listeners) { try { diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java new file mode 100644 index 000000000..f47a7e705 --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java @@ -0,0 +1,33 @@ +package com.ctrip.xpipe.redis.core.protocal.cmd.pubsub; + +import com.ctrip.xpipe.api.pool.SimpleObjectPool; +import com.ctrip.xpipe.netty.commands.NettyClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; + +public class PsubscribeCommand extends AbstractSubscribe{ + + private static final Logger logger = LoggerFactory.getLogger(PsubscribeCommand.class); + + protected PsubscribeCommand(String host, int port, ScheduledExecutorService scheduled, MESSAGE_TYPE messageType, String... subscribeChannel) { + super(host, port, scheduled, messageType, subscribeChannel); + } + + public PsubscribeCommand(SimpleObjectPool clientPool, ScheduledExecutorService scheduled, int commandTimeoutMilli, String... channel) { + super(clientPool, scheduled, commandTimeoutMilli, MESSAGE_TYPE.PMESSAGE, channel); + } + + + @Override + public String getName() { + return PSUBSCRIBE; + } + + @Override + protected Logger getLogger() { + return logger; + } + +} diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java index e2109c834..b4fcda175 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java @@ -38,7 +38,7 @@ public SubscribeCommand(SimpleObjectPool clientPool, ScheduledExecu @Override public String getName() { - return "subscribe"; + return SUBSCRIBE; } @Override diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java index 6ef5ff7f6..aa984d3f4 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java @@ -44,6 +44,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; +import java.util.List; import java.util.concurrent.ExecutorService; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; @@ -184,9 +185,9 @@ public SentinelManager sentinelManager() { public HealthCheckReporter healthCheckReporter(CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, GroupCheckerLeaderElector clusterServer, AllCheckerLeaderElector allCheckerLeaderElector, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, - ClusterHealthManager clusterHealthManager, HealthStateService healthStateService, + ClusterHealthManager clusterHealthManager, List healthStateServices, @Value("${server.port}") int serverPort) { - return new HealthCheckReporter(healthStateService, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, + return new HealthCheckReporter(healthStateServices, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, crossMasterDelayManager, pingService, clusterHealthManager, serverPort); }