diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java index 467824242..6edcdbc8a 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java @@ -57,7 +57,7 @@ public interface CheckerConsoleService { Map loadAllClusterCreateTime(String console); - Map loadAllActiveDcOneWayClusterInfo(String console, String activeDc); + Map loadAllDcOneWayClusterInfo(String console, String dc); void bindShardSentinel(String console, String dc, String cluster, String shard, SentinelMeta sentinelMeta); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java index 5268f40fd..6237dd283 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java @@ -17,6 +17,6 @@ class AllInstanceHealthStatus extends HashMap {} HEALTH_STATE getInstanceStatus(String ip, int port); - Map getAllInstanceHealthStatus(); + Map getAllInstanceHealthStatus(boolean isCrossRegion); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java index b55baa24a..61d2782ca 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java @@ -12,6 +12,6 @@ public interface OuterClientCache { OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws Exception; - Map getAllActiveDcClusters(String activeDc); + Map getAllDcClusters(String dc); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java index 2e4593862..e81cec4d2 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java @@ -6,6 +6,7 @@ import com.ctrip.xpipe.redis.checker.controller.result.ActionContextRetMessage; import com.ctrip.xpipe.redis.checker.healthcheck.*; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultPsubPingActionCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector; @@ -36,6 +37,9 @@ public class CheckerHealthController { @Autowired private DefaultDelayPingActionCollector defaultDelayPingActionCollector; + @Autowired + private DefaultPsubPingActionCollector defaultPsubPingActionCollector; + @Autowired private RedisUsedMemoryCollector redisUsedMemoryCollector; @@ -53,13 +57,21 @@ public class CheckerHealthController { @RequestMapping(value = "/health/{ip}/{port}", method = RequestMethod.GET) public HEALTH_STATE getHealthState(@PathVariable String ip, @PathVariable int port) { - if (siteStability.isSiteStable()) return defaultDelayPingActionCollector.getState(new HostPort(ip, port)); - else return HEALTH_STATE.UNKNOWN; + if (siteStability.isSiteStable()) { + HEALTH_STATE result = defaultDelayPingActionCollector.getState(new HostPort(ip, port)); + if (result == HEALTH_STATE.UNKNOWN) { + result = defaultPsubPingActionCollector.getHealthState(new HostPort(ip, port)); + } + return result; + } else return HEALTH_STATE.UNKNOWN; } @RequestMapping(value = "/health/check/instance/{ip}/{port}", method = RequestMethod.GET) public String getHealthCheckInstance(@PathVariable String ip, @PathVariable int port) { RedisHealthCheckInstance instance = instanceManager.findRedisHealthCheckInstance(new HostPort(ip, port)); + if (instance == null) { + instance = instanceManager.findRedisInstanceForPsubPingAction(new HostPort(ip, port)); + } if(instance == null) { return "Not found"; } @@ -97,6 +109,16 @@ public String getHealthCheckRedisInstanceForAssignedAction(@PathVariable String return Codec.DEFAULT.encode(model); } + @RequestMapping(value = "/health/check/redis-for-ping-action/{ip}/{port}", method = RequestMethod.GET) + public String getHealthCheckRedisInstanceForPingAction(@PathVariable String ip, @PathVariable int port) { + RedisHealthCheckInstance instance = instanceManager.findRedisInstanceForPsubPingAction(new HostPort(ip, port)); + if(instance == null) { + return "Not found"; + } + HealthCheckInstanceModel model = buildHealthCheckInfo(instance); + return Codec.DEFAULT.encode(model); + } + @RequestMapping(value = "/health/redis/info/{ip}/{port}", method = RequestMethod.GET) public ActionContextRetMessage> getRedisInfo(@PathVariable String ip, @PathVariable int port) { return ActionContextRetMessage.from(redisInfoManager.getInfoByHostPort(new HostPort(ip, port))); @@ -113,6 +135,12 @@ public Map getAllHealthStatusDesc() { else return Collections.emptyMap(); } + @GetMapping("/health/check/cross/region/status/all") + public Map getAllCrossRegionHealthStatusDesc() { + if (siteStability.isSiteStable()) return defaultPsubPingActionCollector.getAllHealthStatus(); + else return Collections.emptyMap(); + } + @GetMapping("/health/keeper/status/all") public ConcurrentMap> getAllKeeperFlows() { return keeperFlowCollector.getHostPort2InputFlow(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java index 30f1e0aa8..e107c75c1 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java @@ -18,6 +18,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisMeta redis); + RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis); + KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper); ClusterHealthCheckInstance getOrCreate(ClusterMeta cluster); @@ -26,6 +28,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort); + RedisHealthCheckInstance findRedisInstanceForPsubPingAction(HostPort hostPort); + KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort); ClusterHealthCheckInstance findClusterHealthCheckInstance(String clusterId); @@ -36,6 +40,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort); + RedisHealthCheckInstance removeRedisInstanceForPingAction(HostPort hostPort); + ClusterHealthCheckInstance remove(String cluster); List getAllRedisInstance(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java new file mode 100644 index 000000000..33ba4ea6f --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java @@ -0,0 +1,94 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionListener; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionListener; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector; +import com.google.common.collect.Maps; + +import java.util.Map; + +public abstract class AbstractPsubPingActionCollector implements PsubPingActionCollector { + + protected Map allHealthStatus = Maps.newConcurrentMap(); + + protected PingActionListener pingActionListener = new AbstractPsubPingActionCollector.CollectorPingActionListener(); + + protected PsubActionListener psubActionListener = new AbstractPsubPingActionCollector.CollectorPsubActionListener(); + + protected abstract HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance); + + protected void removeHealthStatus(HealthCheckAction action) { + HealthStatus healthStatus = allHealthStatus.remove(action.getActionInstance()); + if(healthStatus != null) { + healthStatus.stop(); + } + } + + @Override + public boolean supportInstance(RedisHealthCheckInstance instance) { + return true; + } + + @Override + public PingActionListener createPingActionListener() { + return pingActionListener; + } + + @Override + public PsubActionListener createPsubActionListener() { + return psubActionListener; + } + + protected class CollectorPingActionListener implements PingActionListener { + + @Override + public void onAction(PingActionContext pingActionContext) { + HealthStatus healthStatus = createOrGetHealthStatus(pingActionContext.instance()); + if (!pingActionContext.isSuccess()) { + if (pingActionContext.getCause().getMessage().contains("LOADING")) { + healthStatus.loading(); + } + return; + } + + if (pingActionContext.getResult()) { + healthStatus.pong(); + } else { + if(healthStatus.getState() == HEALTH_STATE.UNKNOWN) { + healthStatus.pongInit(); + } + } + } + + @Override + public boolean worksfor(ActionContext t) { + return t instanceof PingActionContext; + } + + @Override + public void stopWatch(HealthCheckAction action) { + removeHealthStatus(action); + } + } + + protected class CollectorPsubActionListener implements PsubActionListener { + + @Override + public void onAction(PsubActionContext psubActionContext) { + HealthStatus healthStatus = createOrGetHealthStatus(psubActionContext.instance()); + if (!psubActionContext.getResult().isEmpty()) { + healthStatus.subSuccess(); + } + } + + @Override + public void stopWatch(HealthCheckAction action) { + removeHealthStatus(action); + } + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java new file mode 100644 index 000000000..ece7dd5ff --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java @@ -0,0 +1,100 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceDown; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceLoading; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceUp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * UNKNOWN + * pingSuccess -> INSTANCEUP + start subAction + * pingFail -> DOWN + markDown + * subSuccess -> throw exception + *

+ * INSTANCEUP + * pingSuccess,do nothing + * pingFail -> DOWN + markDown + * subSuccess -> HEALTHY + markUp + stop subAction + *

+ * HEALTHY + * pingSuccess,do nothing + * pingFail -> DOWN + markDown + * subSuccess -> throw exception + *

+ * DOWN + * pingSuccess -> INSTANCEUP + start subAction + * pingFail,do nothing + * subSuccess -> throw exception + */ +public class CrossRegionRedisHealthStatus extends HealthStatus { + + protected static final Logger logger = LoggerFactory.getLogger(CrossRegionRedisHealthStatus.class); + + public CrossRegionRedisHealthStatus(RedisHealthCheckInstance instance, ScheduledExecutorService scheduled) { + super(instance, scheduled); + } + + @Override + protected void loading() { + HEALTH_STATE preState = state.get(); + if(state.compareAndSet(preState, HEALTH_STATE.DOWN)) { + logStateChange(preState, state.get()); + } + if (!preState.equals(HEALTH_STATE.DOWN)) { + logger.info("[setLoading] {}", this); + notifyObservers(new InstanceLoading(instance)); + } + } + + @Override + protected void pong() { + lastPongTime.set(System.currentTimeMillis()); + HEALTH_STATE preState = state.get(); + if (preState.equals(HEALTH_STATE.UNKNOWN) || preState.equals(HEALTH_STATE.DOWN)) { + if(state.compareAndSet(preState, HEALTH_STATE.INSTANCEUP)) { + logStateChange(preState, state.get()); + } + } + } + + @Override + protected void subSuccess() { + HEALTH_STATE preState = state.get(); + if (preState.equals(HEALTH_STATE.INSTANCEUP)) { + if(state.compareAndSet(preState, HEALTH_STATE.HEALTHY)) { + logStateChange(preState, state.get()); + } + logger.info("[setUp] {}", this); + notifyObservers(new InstanceUp(instance)); + } + } + + @Override + protected void healthStatusUpdate() { + long currentTime = System.currentTimeMillis(); + + if(lastPongTime.get() != UNSET_TIME) { + long pingDownTime = currentTime - lastPongTime.get(); + final int pingDownAfter = pingDownAfterMilli.getAsInt(); + if (pingDownTime > pingDownAfter) { + doMarkDown(); + } + } + } + + protected void doMarkDown() { + HEALTH_STATE preState = state.get(); + if(state.compareAndSet(preState, HEALTH_STATE.DOWN)) { + logStateChange(preState, state.get()); + } + if (!preState.equals(HEALTH_STATE.DOWN)) { + logger.info("[setDown] {}", this); + notifyObservers(new InstanceDown(instance)); + } + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java new file mode 100644 index 000000000..ca6307946 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java @@ -0,0 +1,116 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.api.factory.ObjectFactory; +import com.ctrip.xpipe.api.observer.Observable; +import com.ctrip.xpipe.api.observer.Observer; +import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; +import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.AbstractInstanceEvent; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.processor.HealthEventProcessor; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector; +import com.ctrip.xpipe.utils.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; +import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; + +@Component +public class DefaultPsubPingActionCollector extends AbstractPsubPingActionCollector implements PsubPingActionCollector, HealthStateService, OneWaySupport { + + private static final Logger logger = LoggerFactory.getLogger(DefaultPsubPingActionCollector.class); + + @Autowired + private List healthEventProcessors; + + @Resource(name = SCHEDULED_EXECUTOR) + private ScheduledExecutorService scheduled; + + @Resource(name = GLOBAL_EXECUTOR) + private ExecutorService executors; + + @Override + public HEALTH_STATE getHealthState(HostPort hostPort) { + RedisHealthCheckInstance key = allHealthStatus.keySet().stream() + .filter(instance -> instance.getCheckInfo().getHostPort().equals(hostPort)) + .findFirst().orElse(null); + + if (null != key) return allHealthStatus.get(key).getState(); + return HEALTH_STATE.UNKNOWN; + } + + @Override + public Map getAllCachedState() { + Map cachedHealthStatus = new HashMap<>(); + allHealthStatus.forEach(((instance, healthStatus) -> { + RedisInstanceInfo info = instance.getCheckInfo(); + cachedHealthStatus.put(info.getHostPort(), healthStatus.getState()); + })); + + return cachedHealthStatus; + } + + public Map getAllHealthStatus() { + Map cachedHealthStatus = new HashMap<>(); + allHealthStatus.forEach(((instance, healthStatus) -> { + HostPort hostPort = instance.getCheckInfo().getHostPort(); + cachedHealthStatus.put(hostPort, new HealthStatusDesc(hostPort, healthStatus)); + })); + + return cachedHealthStatus; + } + + @Override + public void updateHealthState(Map redisStates) { + throw new UnsupportedOperationException(); + } + + @Override + protected HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance) { + return MapUtils.getOrCreate(allHealthStatus, instance, new ObjectFactory() { + @Override + public HealthStatus create() { + + HealthStatus healthStatus = new CrossRegionRedisHealthStatus(instance, scheduled); + + healthStatus.addObserver(new Observer() { + @Override + public void update(Object args, Observable observable) { + onInstanceStateChange(args); + } + }); + healthStatus.start(); + return healthStatus; + } + }); + } + + private void onInstanceStateChange(Object args) { + + logger.info("[onInstanceStateChange]{}", args); + for (HealthEventProcessor processor : healthEventProcessors) { + + if (processor instanceof OneWaySupport) { + executors.execute(new AbstractExceptionLogTask() { + @Override + protected void doRun() throws Exception { + processor.onEvent((AbstractInstanceEvent) args); + } + }); + } + } + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java index 7ad76d41b..be0421bfa 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java @@ -31,10 +31,10 @@ public class HealthStatus extends AbstractObservable implements Startable, Stopp public static long UNSET_TIME = -1L; - private AtomicLong lastPongTime = new AtomicLong(UNSET_TIME); + protected AtomicLong lastPongTime = new AtomicLong(UNSET_TIME); private AtomicLong lastHealthDelayTime = new AtomicLong(UNSET_TIME); - private AtomicReference state = new AtomicReference<>(HEALTH_STATE.UNKNOWN); + protected AtomicReference state = new AtomicReference<>(HEALTH_STATE.UNKNOWN); protected RedisHealthCheckInstance instance; protected final IntSupplier delayDownAfterMilli; @@ -110,7 +110,7 @@ protected boolean shouldNotRun() { return lastHealthDelayTime.get() < 0 && lastPongTime.get() < 0; } - void loading() { + protected void loading() { HEALTH_STATE preState = state.get(); if(preState.equals(preState.afterPingFail())) { return; @@ -124,7 +124,7 @@ void loading() { } } - void pong(){ + protected void pong(){ lastPongTime.set(System.currentTimeMillis()); setPingUp(); } @@ -135,6 +135,8 @@ void pongInit() { } } + protected void subSuccess(){} + void delay(long delayMilli, long...srcShardDbId){ //first time diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java new file mode 100644 index 000000000..b52d43434 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java @@ -0,0 +1,205 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.api.lifecycle.Ordered; +import com.ctrip.xpipe.api.lifecycle.TopElement; +import com.ctrip.xpipe.api.monitor.Task; +import com.ctrip.xpipe.api.monitor.TransactionMonitor; +import com.ctrip.xpipe.cluster.ClusterType; +import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; +import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.lifecycle.AbstractLifecycle; +import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; +import com.ctrip.xpipe.redis.checker.config.CheckerConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.OutClientInstanceHealthHolder; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.UpDownInstances; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder; +import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder; +import com.ctrip.xpipe.redis.core.entity.ClusterMeta; +import com.ctrip.xpipe.redis.core.entity.DcMeta; +import com.ctrip.xpipe.redis.core.entity.ShardMeta; +import com.ctrip.xpipe.redis.core.entity.XpipeMeta; +import com.ctrip.xpipe.redis.core.exception.MasterNotFoundException; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import com.ctrip.xpipe.tuple.Pair; +import com.ctrip.xpipe.utils.MapUtils; +import com.ctrip.xpipe.utils.XpipeThreadFactory; +import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.lang.Nullable; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; + +@Service +public class InstanceCrossRegionHealthStatusConsistenceInspector extends AbstractLifecycle implements TopElement { + private InstanceHealthStatusCollector collector; + + private InstanceStatusAdjuster adjuster; + + private StabilityHolder siteStability; + + private CheckerConfig config; + + private MetaCache metaCache; + + private GroupCheckerLeaderElector leaderElector; + + private DynamicDelayPeriodTask task; + + private ScheduledExecutorService scheduled; + + private static final Logger logger = LoggerFactory.getLogger(InstanceCrossRegionHealthStatusConsistenceInspector.class); + + private static final String currentDc = FoundationService.DEFAULT.getDataCenter(); + + private static final String TYPE = "HealthCheck"; + + @Autowired + public InstanceCrossRegionHealthStatusConsistenceInspector(InstanceHealthStatusCollector instanceHealthStatusCollector, + InstanceStatusAdjuster instanceStatusAdjuster, + @Nullable GroupCheckerLeaderElector groupCheckerLeaderElector, + StabilityHolder stabilityHolder, CheckerConfig checkerConfig, + MetaCache metaCache) { + this.collector = instanceHealthStatusCollector; + this.adjuster = instanceStatusAdjuster; + this.leaderElector = groupCheckerLeaderElector; + this.siteStability = stabilityHolder; + this.config = checkerConfig; + this.metaCache = metaCache; + } + + protected void inspectCurrentDc() { + if(!siteStability.isSiteStable()) { + logger.debug("[inspectCrossRegion][skip] unstable"); + return; + } + if (null == leaderElector || !leaderElector.amILeader()) { + logger.debug("[inspectCrossRegion][skip] not leader"); + return; + } + + logger.debug("[inspectCrossRegion] begin"); + final long timeoutMill = System.currentTimeMillis() + Math.min(config.getPingDownAfterMilli() / 2, + config.getDownAfterCheckNums() * config.getRedisReplicationHealthCheckInterval() / 2); + TransactionMonitor.DEFAULT.logTransactionSwallowException(TYPE, "compensator.inspect.crossRegion", new Task() { + @Override + public void go() throws Exception { + final long timeoutMill = System.currentTimeMillis() + 1000L * 3600; + Map> interestedCurrentDc = fetchInterestedCurrentDcClusterInstances(); + if (interestedCurrentDc.isEmpty()) { + logger.debug("[inspectCrossRegion][skip] no interested instance"); + return; + } + + Pair instanceHealth = collector.collect(true); + checkTimeout(timeoutMill, "after collect"); + + XPipeInstanceHealthHolder xpipeInstanceHealth = instanceHealth.getKey(); + OutClientInstanceHealthHolder outClientInstanceHealth = instanceHealth.getValue(); + UpDownInstances hostPortNeedAdjustForPingAction = findHostPortNeedAdjust(xpipeInstanceHealth, outClientInstanceHealth, interestedCurrentDc); + + checkTimeout(timeoutMill, "after compare"); + if (!hostPortNeedAdjustForPingAction.getHealthyInstances().isEmpty()) + adjuster.adjustInstances(hostPortNeedAdjustForPingAction.getHealthyInstances(), true, timeoutMill); + + checkTimeout(timeoutMill, "after adjust up"); + if (!hostPortNeedAdjustForPingAction.getUnhealthyInstances().isEmpty()) + adjuster.adjustInstances(hostPortNeedAdjustForPingAction.getUnhealthyInstances(), false, timeoutMill); + } + + @Override + public Map getData() { + return Collections.singletonMap("timeoutMilli", timeoutMill); + } + }); + } + + private void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { + if (System.currentTimeMillis() > timeoutAtMilli) { + logger.info("[timeout] {}", msg); + throw new TimeoutException(msg); + } + } + + protected Map> fetchInterestedCurrentDcClusterInstances() { + Map> interestedCurrentDcClusterInstances = new HashMap<>(); + XpipeMeta xpipeMeta = metaCache.getXpipeMeta(); + if (null == xpipeMeta) return interestedCurrentDcClusterInstances; + + for (DcMeta dcMeta: xpipeMeta.getDcs().values()) { + if (!dcMeta.getId().equalsIgnoreCase(currentDc)) continue; + + for (ClusterMeta clusterMeta: dcMeta.getClusters().values()) { + if (!ClusterType.isSameClusterType(clusterMeta.getType(), ClusterType.ONE_WAY)) continue; + if (!metaCache.isCrossRegion(dcMeta.getId(), clusterMeta.getActiveDc())) continue; + if (!dcMeta.getId().equalsIgnoreCase(currentDc)) continue; + + Set interestedInstances = MapUtils.getOrCreate(interestedCurrentDcClusterInstances, clusterMeta.getId(), HashSet::new); + for (ShardMeta shardMeta: clusterMeta.getShards().values()) { + shardMeta.getRedises().forEach(redis -> interestedInstances.add(new HostPort(redis.getIp(), redis.getPort()))); + } + } + } + + return interestedCurrentDcClusterInstances; + } + + protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipeInstanceHealthHolder, + OutClientInstanceHealthHolder outClientInstanceHealthHolder, + Map> interested) { + int quorum = config.getQuorum(); + UpDownInstances xpipeInstances = xpipeInstanceHealthHolder.aggregate(interested, quorum); + UpDownInstances outClientInstances = outClientInstanceHealthHolder.extractReadable(interested); + + Set needMarkUpInstances = xpipeInstances.getHealthyInstances(); + Set needMarkDownInstances = xpipeInstances.getUnhealthyInstances(); + needMarkUpInstances.retainAll(outClientInstances.getUnhealthyInstances()); + needMarkDownInstances.retainAll(outClientInstances.getHealthyInstances()); + logger.info("[InstanceCrossRegionHealthStatusConsistenceInspector] needMarkUpInstances:{}", needMarkUpInstances); + logger.info("[InstanceCrossRegionHealthStatusConsistenceInspector] needMarkDownInstances:{}", needMarkDownInstances); + return new UpDownInstances(needMarkUpInstances, needMarkDownInstances); + } + + @Override + protected void doInitialize() throws Exception { + super.doInitialize(); + this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("InstanceCrossRegionHealthStatusConsistenceInspector")); + this.task = new DynamicDelayPeriodTask("inspectCrossRegion", new AbstractExceptionLogTask() { + @Override + protected void doRun() throws Exception { + inspectCurrentDc(); + } + }, config::getHealthMarkCompensateIntervalMill, scheduled); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + this.task.start(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + this.task.stop(); + } + + @Override + protected void doDispose() throws Exception { + super.doDispose(); + this.scheduled.shutdown(); + this.scheduled = null; + this.task = null; + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java index b401c881a..bfed17043 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java @@ -48,7 +48,11 @@ public InstanceHealthStatusCollector(RemoteCheckerManager remoteCheckerManager, this.executors = executors; } - public Pair collect() + public Pair collect() throws ExecutionException, InterruptedException, TimeoutException { + return collect(false); + } + + public Pair collect(boolean isCrossRegion) throws InterruptedException, ExecutionException, TimeoutException { XPipeInstanceHealthHolder xpipeInstanceHealthHolder = new XPipeInstanceHealthHolder(); OutClientInstanceHealthHolder outClientInstanceHealthHolder = new OutClientInstanceHealthHolder(); @@ -56,7 +60,7 @@ public Pair collect() ParallelCommandChain commandChain = new ParallelCommandChain(executors); commandChain.add(new GetAllOutClientInstanceStatusCmd(outClientInstanceHealthHolder)); remoteCheckerManager.getAllCheckerServices().forEach(checkerService -> { - commandChain.add(new GetRemoteCheckResultCmd(checkerService, xpipeInstanceHealthHolder)); + commandChain.add(new GetRemoteCheckResultCmd(checkerService, xpipeInstanceHealthHolder, isCrossRegion)); }); commandChain.execute().get(5, TimeUnit.SECONDS); @@ -81,15 +85,18 @@ private class GetRemoteCheckResultCmd extends AbstractCommand { private XPipeInstanceHealthHolder resultHolder; - public GetRemoteCheckResultCmd(CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder) { + private boolean isCrossRegion; + + public GetRemoteCheckResultCmd(CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder, boolean isCrossRegion) { this.checkerService = checkerService; this.resultHolder = xpipeInstanceHealthHolder; + this.isCrossRegion = isCrossRegion; } @Override protected void doExecute() throws Throwable { try { - resultHolder.add(checkerService.getAllInstanceHealthStatus()); + resultHolder.add(checkerService.getAllInstanceHealthStatus(isCrossRegion)); } catch (RestClientException restException) { logger.info("[doExecute][rest fail] {}", restException.getMessage()); } catch (Throwable th) { @@ -122,7 +129,7 @@ public GetAllOutClientInstanceStatusCmd(OutClientInstanceHealthHolder outClientI protected void doExecute() throws Throwable { try { resultHolder.addClusters( - outerClientCache.getAllActiveDcClusters(FoundationService.DEFAULT.getDataCenter())); + outerClientCache.getAllDcClusters(FoundationService.DEFAULT.getDataCenter())); } catch (Throwable th) { logger.info("[doExecute][fail]", th); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java index 1dfa71b06..a491a689e 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java @@ -49,17 +49,17 @@ @Service public class InstanceHealthStatusConsistenceInspector extends AbstractLifecycle implements TopElement { - private InstanceHealthStatusCollector collector; + protected InstanceHealthStatusCollector collector; - private InstanceStatusAdjuster adjuster; + protected InstanceStatusAdjuster adjuster; - private StabilityHolder siteStability; + protected StabilityHolder siteStability; - private CheckerConfig config; + protected CheckerConfig config; - private MetaCache metaCache; + protected MetaCache metaCache; - private GroupCheckerLeaderElector leaderElector; + protected GroupCheckerLeaderElector leaderElector; private DynamicDelayPeriodTask task; @@ -109,6 +109,7 @@ protected void inspect() { TransactionMonitor.DEFAULT.logTransactionSwallowException(TYPE, "compensator.inspect", new Task() { @Override public void go() throws Exception { + final long timeoutMill = System.currentTimeMillis() + 1000L * 3600; Map> interested = fetchInterestedClusterInstances(); if (interested.isEmpty()) { logger.debug("[inspect][skip] no interested instance"); @@ -138,7 +139,7 @@ public Map getData() { }); } - private void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { + protected void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { if (System.currentTimeMillis() > timeoutAtMilli) { logger.info("[timeout] {}", msg); throw new TimeoutException(msg); @@ -157,6 +158,7 @@ protected Map> fetchInterestedClusterInstances() { for (ClusterMeta clusterMeta: dcMeta.getClusters().values()) { if (!ClusterType.isSameClusterType(clusterMeta.getType(), ClusterType.ONE_WAY)) continue; + if (metaCache.isCrossRegion(dcMeta.getId(), clusterMeta.getActiveDc())) continue;; if (!clusterMeta.getActiveDc().equalsIgnoreCase(currentDc)) continue; Set interestedInstances = MapUtils.getOrCreate(interestedClusterInstances, clusterMeta.getId(), HashSet::new); @@ -181,7 +183,8 @@ protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipe needMarkUpInstances.retainAll(outClientInstances.getUnhealthyInstances()); needMarkDownInstances.retainAll(outClientInstances.getHealthyInstances()); needMarkDownInstances = filterMasterHealthyInstances(xpipeInstanceHealthHolder, needMarkDownInstances, quorum); - needMarkDownInstances = filterMarkDowUnsupportedInstances(needMarkDownInstances); + logger.info("[InstanceHealthStatusConsistenceInspector] needMarkUpInstances:{}", needMarkUpInstances); + logger.info("[InstanceHealthStatusConsistenceInspector] needMarkDownInstances:{}", needMarkDownInstances); return new UpDownInstances(needMarkUpInstances, needMarkDownInstances); } @@ -212,33 +215,6 @@ protected Set filterMasterHealthyInstances(XPipeInstanceHealthHolder x return masterHealthyInstances; } - protected Set filterMarkDowUnsupportedInstances(Set instances) { - Set wontMarkdownInstances = new HashSet<>(); - for (HostPort instance : instances) { - HEALTH_STATE healthState = defaultDelayPingActionCollector.getState(instance); - if (healthState.equals(HEALTH_STATE.SICK)) { - if (!shouldMarkdownDcClusterSickInstances(healthCheckInstanceManager.findRedisHealthCheckInstance(instance))) - wontMarkdownInstances.add(instance); - } - } - instances.removeAll(wontMarkdownInstances); - return instances; - } - - boolean shouldMarkdownDcClusterSickInstances(RedisHealthCheckInstance healthCheckInstance) { - RedisInstanceInfo info = healthCheckInstance.getCheckInfo(); - if (info.isCrossRegion()) { - logger.info("[markdown][{} is cross region, do not call client service ]", info.getHostPort()); - return false; - } - if (healthCheckInstance.getHealthCheckConfig().getDelayConfig(info.getClusterId(), currentDc, info.getDcId()).getClusterLevelHealthyDelayMilli() < 0) { - logger.info("[markdown][cluster {} dcs {}->{} distance is -1, do not call client service ]", info.getClusterId(), currentDc, info.getDcId()); - return false; - } - return true; - } - - @Override protected void doInitialize() throws Exception { super.doInitialize(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java index 0d6942e4c..b61907c86 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java @@ -1,5 +1,6 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.handler; +import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.concurrent.FinalStateSetterManager; import com.ctrip.xpipe.endpoint.ClusterShardHostPort; import com.ctrip.xpipe.endpoint.HostPort; @@ -48,6 +49,8 @@ public abstract class AbstractHealthEventHandler listeners; @@ -45,17 +51,25 @@ public class PingActionFactory implements RedisHealthCheckActionFactory delayPingCollectors; + @Autowired + private List psubPingActionCollectors; + private Map> controllersByClusterType; private Map> listenerByClusterType; private Map> delayPingCollectorsByClusterType; + private Map> psubPingActionCollectorsByClusterType; + + protected static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); + @PostConstruct public void postConstruct() { controllersByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(controllers); listenerByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(listeners); delayPingCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(delayPingCollectors); + psubPingActionCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(psubPingActionCollectors); } @Override @@ -63,15 +77,23 @@ public PingAction create(RedisHealthCheckInstance instance) { PingAction pingAction = new PingAction(scheduled, instance, executors); ClusterType clusterType = instance.getCheckInfo().getClusterType(); - pingAction.addListeners(listenerByClusterType.get(clusterType)); pingAction.addControllers(controllersByClusterType.get(clusterType)); - if(instance instanceof DefaultRedisHealthCheckInstance) { - pingAction.addListener(((DefaultRedisHealthCheckInstance)instance).createPingListener()); + + if (currentDcId.equalsIgnoreCase(instance.getCheckInfo().getActiveDc())) { + pingAction.addListeners(listenerByClusterType.get(clusterType)); + if(instance instanceof DefaultRedisHealthCheckInstance) { + pingAction.addListener(((DefaultRedisHealthCheckInstance)instance).createPingListener()); + } + delayPingCollectorsByClusterType.get(clusterType).forEach(collector -> { + if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); + }); } - delayPingCollectorsByClusterType.get(clusterType).forEach(collector -> { - if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); - }); + if (metaCache.isCrossRegion(currentDcId, instance.getCheckInfo().getActiveDc())) { + psubPingActionCollectorsByClusterType.get(clusterType).forEach(collector -> { + if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); + }); + } return pingAction; } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java new file mode 100644 index 000000000..e5d32397a --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java @@ -0,0 +1,24 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class OneWayPsubActionController implements PsubActionController, OneWaySupport { + + @Autowired + private MetaCache metaCache; + + private static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); + + @Override + public boolean shouldCheck(RedisHealthCheckInstance instance) { + RedisInstanceInfo info = instance.getCheckInfo(); + return metaCache.isCrossRegion(currentDcId, info.getActiveDc()) && currentDcId.equalsIgnoreCase(info.getDcId()); + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java new file mode 100644 index 000000000..7bf07be14 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java @@ -0,0 +1,50 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.AbstractHealthCheckAction; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +public class PsubAction extends AbstractHealthCheckAction { + + protected static final Logger logger = LoggerFactory.getLogger(PsubAction.class); + + private String[] pubSubChannelPrefix; + + public PsubAction(ScheduledExecutorService scheduled, RedisHealthCheckInstance instance, ExecutorService executors) { + super(scheduled, instance, executors); + this.pubSubChannelPrefix = new String[]{"xpipe*"}; + } + + @Override + protected void doTask() { + RedisSession session = instance.getRedisSession(); + doPSubscribe(session, new RedisSession.SubscribeCallback() { + @Override + public void message(String channel, String message) { + logger.debug("[PsubAction][{}]success, channel:{}, message : {}", instance.getEndpoint(), channel, message); + notifyListeners(new PsubActionContext(instance, message)); + } + + @Override + public void fail(Throwable e) { + logger.error("[PsubAction][{}] fail", instance.getEndpoint(), e); + //ignore psub fail + } + }, pubSubChannelPrefix); + } + + @Override + protected Logger getHealthCheckLogger() { + return logger; + } + + protected void doPSubscribe(RedisSession session, RedisSession.SubscribeCallback callback, String... channel) { + session.psubscribeIfAbsent(callback, channel); + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java new file mode 100644 index 000000000..017aa6d4d --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java @@ -0,0 +1,10 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.AbstractActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; + +public class PsubActionContext extends AbstractActionContext { + public PsubActionContext(RedisHealthCheckInstance instance, String s) { + super(instance, s); + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java new file mode 100644 index 000000000..c5e38e723 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java @@ -0,0 +1,7 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckActionController; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; + +public interface PsubActionController extends HealthCheckActionController { +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java new file mode 100644 index 000000000..891491ae0 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java @@ -0,0 +1,71 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.cluster.ClusterType; +import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckActionFactory; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; +import com.ctrip.xpipe.redis.checker.healthcheck.util.ClusterTypeSupporterSeparator; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static com.ctrip.xpipe.redis.checker.resource.Resource.PING_DELAY_INFO_EXECUTORS; +import static com.ctrip.xpipe.redis.checker.resource.Resource.PING_DELAY_INFO_SCHEDULED; + +@Component +public class PsubActionFactory implements RedisHealthCheckActionFactory, OneWaySupport { + + @Autowired + private MetaCache metaCache; + + @Resource(name = PING_DELAY_INFO_SCHEDULED) + private ScheduledExecutorService scheduled; + + @Resource(name = PING_DELAY_INFO_EXECUTORS) + private ExecutorService executors; + + @Autowired + private List controllers; + + @Autowired + private List psubPingActionCollectors; + + private Map> controllersByClusterType; + + private Map> psubPingActionCollectorsByClusterType; + + private static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); + + @PostConstruct + public void postConstruct() { + controllersByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(controllers); + psubPingActionCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(psubPingActionCollectors); + } + + @Override + public PsubAction create(RedisHealthCheckInstance instance) { + PsubAction psubAction = new PsubAction(scheduled, instance, executors); + ClusterType clusterType = instance.getCheckInfo().getClusterType(); + + psubAction.addControllers(controllersByClusterType.get(clusterType)); + psubPingActionCollectorsByClusterType.get(clusterType).forEach(collector -> { + if (collector.supportInstance(instance)) psubAction.addListener(collector.createPsubActionListener()); + }); + return psubAction; + } + + @Override + public boolean supportInstnace(RedisHealthCheckInstance instance) { + RedisInstanceInfo info = instance.getCheckInfo(); + return metaCache.isCrossRegion(currentDcId, info.getActiveDc()) && currentDcId.equalsIgnoreCase(info.getDcId()); + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java new file mode 100644 index 000000000..59a19d494 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java @@ -0,0 +1,15 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckActionListener; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; + +public interface PsubActionListener extends HealthCheckActionListener> { + + @Override + default boolean worksfor(ActionContext t) { + return t instanceof PsubActionContext; + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java new file mode 100644 index 000000000..17cf8a6e6 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java @@ -0,0 +1,14 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionListener; + +public interface PsubPingActionCollector { + + boolean supportInstance(RedisHealthCheckInstance instance); + + PingActionListener createPingActionListener(); + + PsubActionListener createPsubActionListener(); + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java index 9bfe46e78..53b8c2ee2 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java @@ -6,6 +6,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.KeeperInstanceInfo; import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; import com.ctrip.xpipe.redis.checker.healthcheck.actions.delay.DelayConfig; +import com.ctrip.xpipe.redis.core.meta.MetaCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,9 +21,9 @@ public class CompositeHealthCheckConfig implements HealthCheckConfig { private HealthCheckConfig config; - public CompositeHealthCheckConfig(RedisInstanceInfo instanceInfo, CheckerConfig checkerConfig, DcRelationsService dcRelationsService) { + public CompositeHealthCheckConfig(RedisInstanceInfo instanceInfo, CheckerConfig checkerConfig, DcRelationsService dcRelationsService, boolean isCrossRegion) { logger.info("[CompositeHealthCheckConfig] {}", instanceInfo); - if(instanceInfo.isCrossRegion()) { + if(isCrossRegion) { config = new ProxyEnabledHealthCheckConfig(checkerConfig, dcRelationsService); logger.info("[CompositeHealthCheckConfig][proxied] ping down time: {}", config.pingDownAfterMilli()); } else { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java index 665b0270a..d5022fb3e 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java @@ -10,6 +10,9 @@ import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.*; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionFactory; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubAction; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionFactory; import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.RedisCheckRule; import com.ctrip.xpipe.redis.checker.healthcheck.config.CompositeHealthCheckConfig; import com.ctrip.xpipe.redis.checker.healthcheck.config.DefaultHealthCheckConfig; @@ -124,7 +127,7 @@ public RedisHealthCheckInstance create(RedisMeta redisMeta) { RedisInstanceInfo info = createRedisInstanceInfo(redisMeta); Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redisMeta); - HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); instance.setEndpoint(endpoint) .setSession(redisSessionManager.findOrCreateSession(endpoint)) @@ -228,7 +231,7 @@ public RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta r DefaultRedisHealthCheckInstance instance = new DefaultRedisHealthCheckInstance(); RedisInstanceInfo info = createRedisInstanceInfo(redis); - HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redis); instance.setEndpoint(endpoint) @@ -242,6 +245,25 @@ public RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta r return instance; } + @Override + public RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis) { + DefaultRedisHealthCheckInstance instance = new DefaultRedisHealthCheckInstance(); + + RedisInstanceInfo info = createRedisInstanceInfo(redis); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); + Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redis); + + instance.setEndpoint(endpoint) + .setSession(redisSessionManager.findOrCreateSession(endpoint)) + .setInstanceInfo(info) + .setHealthCheckConfig(config); + + initActionsForRedisForPsubPingAction(instance); + startCheck(instance); + + return instance; + } + private void initActionsForRedisForAssignedAction(DefaultRedisHealthCheckInstance instance) { for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { if (factory instanceof KeeperSupport) @@ -249,6 +271,13 @@ private void initActionsForRedisForAssignedAction(DefaultRedisHealthCheckInstanc } } + private void initActionsForRedisForPsubPingAction(DefaultRedisHealthCheckInstance instance) { + for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { + if (factory instanceof PingActionFactory || factory instanceof PsubActionFactory) + initActions(instance, factory); + } + } + @SuppressWarnings("unchecked") private void initActions(DefaultRedisHealthCheckInstance instance) { for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java index 390ecbc75..9cfb58e89 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java @@ -35,6 +35,8 @@ public class DefaultHealthCheckInstanceManager implements HealthCheckInstanceMan private ConcurrentMap redisInstanceForAssignedAction = Maps.newConcurrentMap(); + private ConcurrentMap redisInstanceForPingAction = Maps.newConcurrentMap(); + @Autowired private HealthCheckInstanceFactory instanceFactory; @@ -61,6 +63,18 @@ public RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisM return null; } + @Override + public RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis) { + try { + HostPort key = new HostPort(redis.getIp(), redis.getPort()); + return MapUtils.getOrCreate(redisInstanceForPingAction, key, + () -> instanceFactory.getOrCreateRedisInstanceForPsubPingAction(redis)); + } catch (Throwable th) { + logger.error("getOrCreate ping action health check redis instance:{}:{}", redis.getIp(), redis.getPort()); + } + return null; + } + @Override public KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper) { try { @@ -93,6 +107,11 @@ public RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort host return redisInstanceForAssignedAction.get(hostPort); } + @Override + public RedisHealthCheckInstance findRedisInstanceForPsubPingAction(HostPort hostPort) { + return redisInstanceForPingAction.get(hostPort); + } + @Override public KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort) { return keeperInstances.get(hostPort); @@ -125,6 +144,13 @@ public RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort ho return instance; } + @Override + public RedisHealthCheckInstance removeRedisInstanceForPingAction(HostPort hostPort) { + RedisHealthCheckInstance instance = redisInstanceForPingAction.remove(hostPort); + if (null != instance) instanceFactory.remove(instance); + return instance; + } + @Override public ClusterHealthCheckInstance remove(String cluster) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java index 4f0b23adb..ea8f4a7b3 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static com.ctrip.xpipe.redis.core.meta.comparator.KeeperContainerMetaComparator.getAllKeeperContainerDetailInfoFromDcMeta; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; @@ -146,6 +145,10 @@ void generateHealthCheckInstances() { generateHealthCheckInstances(cluster); } + if (clusterType == ClusterType.ONE_WAY && isClusterActiveDcCrossRegion(cluster) && clusterDcIsCurrentDc(cluster)) { + generatePsubPingActionHealthCheckInstances(cluster); + } + } } } @@ -168,6 +171,14 @@ void generateHealthCheckInstances(ClusterMeta clusterMeta){ instanceManager.getOrCreate(clusterMeta); } + void generatePsubPingActionHealthCheckInstances(ClusterMeta clusterMeta){ + for(ShardMeta shard : clusterMeta.getShards().values()) { + for(RedisMeta redis : shard.getRedises()) { + instanceManager.getOrCreateRedisInstanceForPsubPingAction(redis); + } + } + } + private boolean isClusterActiveIdcCurrentIdc(ClusterMeta cluster) { return cluster.getActiveDc().equalsIgnoreCase(currentDcId); @@ -206,4 +217,8 @@ private boolean hasMultipleActiveDcs(ClusterType clusterType) { return clusterType.supportMultiActiveDC() && !clusterType.isCrossDc(); } + private boolean isClusterActiveDcCrossRegion(ClusterMeta clusterMeta) { + return metaCache.isCrossRegion(currentDcId, clusterMeta.getActiveDc()); + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java index 0b82cb610..4a89b794e 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java @@ -27,4 +27,6 @@ public interface HealthCheckInstanceFactory { void remove(ClusterHealthCheckInstance instance); RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta redis); + + RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java index 796505132..9853f7c22 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java @@ -9,6 +9,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckInstanceManager; import com.ctrip.xpipe.redis.checker.healthcheck.impl.HealthCheckEndpointFactory; import com.ctrip.xpipe.redis.core.entity.*; +import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.redis.core.meta.MetaComparator; import com.ctrip.xpipe.redis.core.meta.MetaComparatorVisitor; import com.ctrip.xpipe.redis.core.meta.comparator.ClusterMetaComparator; @@ -48,6 +49,8 @@ public class DefaultDcMetaChangeManager extends AbstractStartStoppable implement private CheckerConfig checkerConfig; + private MetaCache metaCache; + private final String dcId; private final List clustersToDelete = new ArrayList<>(); @@ -58,12 +61,14 @@ public class DefaultDcMetaChangeManager extends AbstractStartStoppable implement public DefaultDcMetaChangeManager(String dcId, HealthCheckInstanceManager instanceManager, HealthCheckEndpointFactory healthCheckEndpointFactory, CheckerConsoleService checkerConsoleService, - CheckerConfig checkerConfig) { + CheckerConfig checkerConfig, + MetaCache metaCache) { this.dcId = dcId; this.instanceManager = instanceManager; this.healthCheckEndpointFactory = healthCheckEndpointFactory; this.checkerConsoleService = checkerConsoleService; this.checkerConfig = checkerConfig; + this.metaCache = metaCache; } @Override @@ -104,10 +109,12 @@ public void compare(DcMeta future, DcMeta allFutureDcMeta) { private void removeAndAdd() { this.redisListToDelete.forEach(this::removeRedis); + this.redisListToDelete.forEach(this::removeRedisOnlyForPingAction); this.clustersToDelete.forEach(this::removeCluster); this.clustersToAdd.forEach(this::addCluster); this.redisListToAdd.forEach(this::addRedis); + this.redisListToAdd.forEach(this::addRedisOnlyForPingAction); } private void clearUp() { @@ -125,19 +132,24 @@ private void removeCluster(ClusterMeta removed) { logger.info("[removeCluster][{}][{}] remove health check", dcId, removed.getId()); ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(removeConsumer))); + ClusterMetaVisitor clusterMetaPingActionVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(removePingActionConsumer))); clusterMetaVisitor.accept(removed); + clusterMetaPingActionVisitor.accept(removed); } private void addCluster(ClusterMeta added) { - if (!isInterestedInCluster(added)) { - logger.info("[addCluster][{}][skip] cluster not interested", added.getId()); - return; + if (isInterestedInCluster(added)) { + logger.info("[addCluster][{}][{}] add health check", dcId, added.getId()); + instanceManager.getOrCreate(added); + ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addConsumer))); + clusterMetaVisitor.accept(added); + } + + if (isOneWayClusterActiveDcCrossRegionAndCurrentDc(added)) { + ClusterMetaVisitor clusterMetaPingActionVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addPingActionConsumer))); + clusterMetaPingActionVisitor.accept(added); } - logger.info("[addCluster][{}][{}] add health check", dcId, added.getId()); - instanceManager.getOrCreate(added); - ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addConsumer))); - clusterMetaVisitor.accept(added); } private void removeRedis(RedisMeta removed) { @@ -199,6 +211,11 @@ protected boolean isInterestedInCluster(ClusterMeta cluster) { return result; } + private boolean isOneWayClusterActiveDcCrossRegionAndCurrentDc(ClusterMeta cluster) { + ClusterType clusterType = ClusterType.lookup(cluster.getType()); + return clusterType == ClusterType.ONE_WAY && isClusterActiveDcCrossRegion(cluster) && clusterDcIsCurrentDc(cluster); + } + private boolean clusterDcIsCurrentDc(ClusterMeta clusterMeta) { return clusterMeta.parent().getId().equalsIgnoreCase(currentDcId); } @@ -220,6 +237,10 @@ private boolean hasMultipleActiveDcs(ClusterType clusterType) { return clusterType.supportMultiActiveDC() && !clusterType.isCrossDc(); } + private boolean isClusterActiveDcCrossRegion(ClusterMeta clusterMeta) { + return metaCache.isCrossRegion(currentDcId, clusterMeta.getActiveDc()); + } + private Consumer removeConsumer = new Consumer() { @Override public void accept(RedisMeta redisMeta) { @@ -234,6 +255,20 @@ public void accept(RedisMeta redisMeta) { } }; + private Consumer removePingActionConsumer = new Consumer() { + @Override + public void accept(RedisMeta redisMeta) { + removeRedisOnlyForPingAction(redisMeta); + } + }; + + private Consumer addPingActionConsumer = new Consumer() { + @Override + public void accept(RedisMeta redisMeta) { + addRedisOnlyForPingAction(redisMeta); + } + }; + @Override protected void doStart() { if (current == null) { @@ -276,6 +311,20 @@ private void addRedisOnlyForUsedMemory(RedisMeta added) { instanceManager.getOrCreateRedisInstanceForAssignedAction(added); } + private void removeRedisOnlyForPingAction(RedisMeta removed) { + if (null != instanceManager.removeRedisInstanceForPingAction(new HostPort(removed.getIp(), removed.getPort()))) { + logger.info("[removeRedisOnlyForPingAction][{}:{}] {}", removed.getIp(), removed.getPort(), removed); + } + } + + private void addRedisOnlyForPingAction(RedisMeta added) { + if (!isOneWayClusterActiveDcCrossRegionAndCurrentDc(added.parent().parent())) { + return; + } + logger.info("[addRedisOnlyForPingAction][{}:{}] {}", added.getIp(), added.getPort(), added); + instanceManager.getOrCreateRedisInstanceForPsubPingAction(added); + } + private class KeeperContainerMetaComparatorVisitor implements MetaComparatorVisitor { @Override diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java index 82a349e89..82655e604 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java @@ -97,7 +97,7 @@ public DcMetaChangeManager getOrCreate(String dcId) { @Override public DcMetaChangeManager create() { return new DefaultDcMetaChangeManager(dcId, instanceManager, healthCheckEndpointFactory, - checkerConsoleService, checkerConfig); + checkerConsoleService, checkerConfig, metaCache); } }); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java index 2cad344d3..df5e2c7f1 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java @@ -83,6 +83,8 @@ public void check() { Subscribe command = pubSubConnectionWrapper.command.get(); if (command instanceof CRDTSubscribeCommand) { crdtsubscribeIfAbsent(pubSubConnectionWrapper.getCallback(), channelArray); + } else if (command instanceof PsubscribeCommand) { + psubscribeIfAbsent(pubSubConnectionWrapper.getCallback(), channelArray); } else { subscribeIfAbsent(pubSubConnectionWrapper.getCallback(), channelArray); } @@ -109,6 +111,10 @@ public synchronized void crdtsubscribeIfAbsent(SubscribeCallback callback, Strin subscribeIfAbsent(callback, () -> new CRDTSubscribeCommand(clientPool, scheduled, commandTimeOut, channel), channel); } + public synchronized void psubscribeIfAbsent(SubscribeCallback callback, String... channel) { + subscribeIfAbsent(callback, () -> new PsubscribeCommand(clientPool, scheduled, commandTimeOut, channel), channel); + } + private synchronized void subscribeIfAbsent(SubscribeCallback callback, Supplier subCommandSupplier, String... channel) { PubSubConnectionWrapper pubSubConnectionWrapper = subscribConns.get(Sets.newHashSet(channel)); if (pubSubConnectionWrapper == null || pubSubConnectionWrapper.shouldCreateNewSession()) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java index 667b35d47..a873df172 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java @@ -19,6 +19,8 @@ public class DefaultCheckerService extends AbstractService implements CheckerSer private static final String PATH_GET_ALL_INSTANCE_HEALTH_STATUS = "/api/health/check/status/all"; + private static final String PATH_GET_ALL_CROSS_REGION_INSTANCE_HEALTH_STATUS = "/api/health/check/cross/region/status/all"; + public DefaultCheckerService(String host) { if (host.startsWith("http://")) this.host = host; else this.host = "http://" + host; @@ -30,7 +32,10 @@ public HEALTH_STATE getInstanceStatus(String ip, int port) { } @Override - public Map getAllInstanceHealthStatus() { + public Map getAllInstanceHealthStatus(boolean isCrossRegion) { + if (isCrossRegion) { + return restTemplate.getForObject(host + PATH_GET_ALL_CROSS_REGION_INSTANCE_HEALTH_STATUS, AllInstanceHealthStatus.class); + } return restTemplate.getForObject(host + PATH_GET_ALL_INSTANCE_HEALTH_STATUS, AllInstanceHealthStatus.class); } } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManager.java index aa86fd21a..6bc64c4bf 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManager.java @@ -58,7 +58,7 @@ public List> allInstanceHealthStatus() { checkerAddressList.forEach(checker -> { try { if (!remoteCheckers.containsKey(checker)) remoteCheckers.put(checker, new DefaultCheckerService(checker)); - Map allInstanceHealthStatus = remoteCheckers.get(checker).getAllInstanceHealthStatus(); + Map allInstanceHealthStatus = remoteCheckers.get(checker).getAllInstanceHealthStatus(false); result.add(allInstanceHealthStatus); } catch (Throwable th) { logger.info("[allInstanceHealthStatus][{}] fail", checker, th); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java index 5c02c2a1d..8dce340ce 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java @@ -9,6 +9,9 @@ import com.ctrip.xpipe.redis.checker.RedisDelayManager; import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderAware; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultPsubPingActionCollector; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStateService; import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingService; import com.ctrip.xpipe.redis.checker.model.CheckerRole; @@ -21,8 +24,12 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; /** * @author lishanglin @@ -30,7 +37,7 @@ */ public class HealthCheckReporter implements GroupCheckerLeaderAware { - private HealthStateService healthStateService; + private List healthStateServices; private RedisDelayManager redisDelayManager; @@ -60,11 +67,11 @@ public class HealthCheckReporter implements GroupCheckerLeaderAware { private static final Logger logger = LoggerFactory.getLogger(HealthCheckReporter.class); - public HealthCheckReporter(HealthStateService healthStateService, CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, + public HealthCheckReporter(List healthStateServices, CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, ClusterServer clusterServer, ClusterServer allCheckerServer, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, ClusterHealthManager clusterHealthManager, int serverPort) { - this.healthStateService = healthStateService; + this.healthStateServices = healthStateServices; this.serverPort = serverPort; this.config = checkerConfig; this.checkerConsoleService = checkerConsoleService; @@ -142,7 +149,7 @@ private void reportCheckResult() { result.encodeCrossMasterDelays(crossMasterDelayManager.getAllCrossMasterDelays()); result.encodeRedisAlives(pingService.getAllRedisAlives()); result.setWarningClusterShards(clusterHealthManager.getAllClusterWarningShards()); - result.encodeRedisStates(healthStateService.getAllCachedState()); + result.encodeRedisStates(getAllRedisStates()); result.setHeteroShardsDelay(redisDelayManager.getAllHeteroShardsDelays()); checkerConsoleService.report(config.getConsoleAddress(), result); @@ -151,4 +158,28 @@ private void reportCheckResult() { } } + private Map getAllRedisStates() { + Map redisStates = new HashMap<>(); + DefaultDelayPingActionCollector delayPingActionCollector = null; + DefaultPsubPingActionCollector psubPingActionCollector = null; + for (HealthStateService service : healthStateServices) { + if (service instanceof DefaultDelayPingActionCollector) { + delayPingActionCollector = (DefaultDelayPingActionCollector) service; + } + if (service instanceof DefaultPsubPingActionCollector) { + psubPingActionCollector = (DefaultPsubPingActionCollector) service; + } + } + if (delayPingActionCollector != null) { + redisStates.putAll(delayPingActionCollector.getAllCachedState()); + } + if (psubPingActionCollector != null) { + Map allCachedState = psubPingActionCollector.getAllCachedState(); + for (Map.Entry entry : allCachedState.entrySet()) { + redisStates.put(entry.getKey(), entry.getValue()); + } + } + return redisStates; + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java index 533156a23..1107944d3 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java @@ -198,10 +198,10 @@ public Map loadAllClusterCreateTime(String console) { } @Override - public Map loadAllActiveDcOneWayClusterInfo(String console, String activeDc) { + public Map loadAllDcOneWayClusterInfo(String console, String dc) { UriComponents comp = UriComponentsBuilder .fromHttpUrl(console + ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ACTIVE_DC_ONE_WAY_CLUSTERS) - .queryParam("activeDc", activeDc).build(); + .queryParam("activeDc", dc).build(); ResponseEntity> times = restTemplate.exchange( comp.toString(), HttpMethod.GET, null, clusterInfoMapTypeDef); diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java index 7a85a0601..324645244 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java @@ -210,7 +210,7 @@ public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws } @Override - public Map getAllActiveDcClusters(String activeDc) { + public Map getAllDcClusters(String dc) { return null; } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java index b5827edee..1a9e166d6 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java @@ -6,7 +6,6 @@ import com.ctrip.xpipe.redis.checker.CheckerService; import com.ctrip.xpipe.redis.checker.OuterClientCache; import com.ctrip.xpipe.redis.checker.RemoteCheckerManager; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatus; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; @@ -58,10 +57,10 @@ public void setupInstanceHealthStatusCollectorTest() { @Test public void testCollect() throws Exception { Map healthStatus = mockHealthStatusMap(HEALTH_STATE.HEALTHY); - Mockito.when(remoteCheckerService.getAllInstanceHealthStatus()).thenReturn(healthStatus); - Mockito.when(outerClientCache.getAllActiveDcClusters("jq")).thenReturn(mockOutClientResp(true)); + Mockito.when(remoteCheckerService.getAllInstanceHealthStatus(false)).thenReturn(healthStatus); + Mockito.when(outerClientCache.getAllDcClusters("jq")).thenReturn(mockOutClientResp(true)); healthStatus = mockHealthStatusMap(HEALTH_STATE.HEALTHY); - Mockito.when(localCheckerService.getAllInstanceHealthStatus()).thenReturn(healthStatus); + Mockito.when(localCheckerService.getAllInstanceHealthStatus(false)).thenReturn(healthStatus); Pair result = this.collector.collect(); UpDownInstances xpipeUpDownInstances = result.getKey().aggregate(interested, 2); diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java index c11e1acd9..7e2167b69 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java @@ -84,7 +84,6 @@ public void setupInstanceHealthStatusConsistenceCheckerTest() throws Exception { Mockito.when(config.getRedisReplicationHealthCheckInterval()).thenReturn(2000); Mockito.when(config.getDownAfterCheckNums()).thenReturn(5); Mockito.when(siteStability.isSiteStable()).thenReturn(true); - Mockito.when(delayPingActionCollector.getState(any())).thenReturn(HEALTH_STATE.DOWN); } @Override diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java index f32334f43..fe73ce780 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java @@ -74,7 +74,7 @@ public void beforeDefaultDcMetaChangeManagerTest() throws IOException, SAXExcept when(checkerConfig.getConsoleAddress()).thenReturn("127.0.0.1"); when(checkerConsoleService.getXpipeDcAllMeta(Mockito.anyString(), Mockito.anyString())).thenReturn(getXpipeMeta()); - manager = new DefaultDcMetaChangeManager("oy", instanceManager, factory, checkerConsoleService, checkerConfig); + manager = new DefaultDcMetaChangeManager("oy", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); } private void prepareData(String dc) { @@ -263,7 +263,7 @@ public void visitRemoved() { @Test public void visitRemovedClusterActiveDc(){ - manager = spy(new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig)); + manager = spy(new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache)); manager.compare(getDcMeta("jq"), null); DcMeta dcMeta = MetaCloneFacade.INSTANCE.clone(getDcMeta("jq")); @@ -359,7 +359,7 @@ public void testSwitchClusterShards() throws Exception { @Test public void testHeteroClusterModified() throws Exception { - DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig); + DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); DcMeta dcMeta= getDcMeta("jq"); ClusterMeta cluster1 = dcMeta.findCluster("cluster2"); cluster1.setBackupDcs("ali"); @@ -384,7 +384,7 @@ public void testHeteroClusterModified() throws Exception { @Test public void testBackupDcClusterModified() throws Exception { - DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig); + DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); DcMeta dcMeta= getDcMeta("jq"); ClusterMeta cluster1 = dcMeta.findCluster("cluster2"); cluster1.setBackupDcs("jq,ali"); @@ -491,7 +491,7 @@ public void visitModified1() { @Test public void testKeeperChange() throws Exception { String dcId = FoundationService.DEFAULT.getDataCenter(); - manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig); + manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); prepareData(dcId); DcMeta future = cloneDcMeta(dcId); future.addKeeperContainer(new KeeperContainerMeta().setId(4L).setIp("1.1.1.4").setPort(8080)); @@ -512,7 +512,7 @@ public void testKeeperChange() throws Exception { @Test public void testRedisChange() throws Exception { String dcId = FoundationService.DEFAULT.getDataCenter(); - manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig); + manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); prepareData(dcId); DcMeta future = cloneDcMeta(dcId); future.addKeeperContainer(new KeeperContainerMeta().setId(4L).setIp("1.1.1.4").setPort(8080)); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java index 9701ea130..d9e4aded1 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java @@ -54,6 +54,7 @@ public class DefaultConsoleDcCheckerService implements ConsoleDcCheckerService { @Override public List getShardAllCheckerGroupHealthCheck(String dcId, String clusterId, String shardId) { + List result = new ArrayList<>(); ClusterTbl clusterTbl = clusterService.find(clusterId); if (clusterTbl == null) { return null; @@ -62,7 +63,11 @@ public List getShardAllCheckerGroupHealthCheck(Str if (activeDc.equalsIgnoreCase(currentDc)) { return getLocalDcShardAllCheckerGroupHealthCheck(dcId, clusterId, shardId); } - return consoleManager.getShardAllCheckerGroupHealthCheck(activeDc, dcId, clusterId, shardId); + if (metaCache.isCrossRegion(dcId, activeDc)) { + result.addAll(consoleManager.getShardAllCheckerGroupHealthCheck(dcId, dcId, clusterId, shardId)); + } + result.addAll(consoleManager.getShardAllCheckerGroupHealthCheck(activeDc, dcId, clusterId, shardId)); + return result; } @Override @@ -88,7 +93,7 @@ public List getLocalDcShardAllCheckerGroupHealthCh List result = new ArrayList<>(); for (Map.Entry> entry : checkerInstancesMap.entrySet()) { HostPort checker = entry.getKey(); - ShardCheckerHealthCheckModel shardCheckerHealthCheckModel = new ShardCheckerHealthCheckModel(checker.getHost(), checker.getPort()); + ShardCheckerHealthCheckModel shardCheckerHealthCheckModel = new ShardCheckerHealthCheckModel(checker.getHost(), checker.getPort(), currentDc); if (checker.equals(checkerLeader)) { shardCheckerHealthCheckModel.setCheckerRole(CheckerRole.LEADER); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/ConsoleServiceManager.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/ConsoleServiceManager.java index 5a1b877b6..512848b6f 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/ConsoleServiceManager.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/ConsoleServiceManager.java @@ -73,7 +73,7 @@ public List> allInstanceHealthStatus() { for(ConsoleService consoleService : consoleServiceMap.values()){ try{ - Map allInstanceHealthStatus = consoleService.getAllInstanceHealthStatus(); + Map allInstanceHealthStatus = consoleService.getAllInstanceHealthStatus(false); result.add(allInstanceHealthStatus); }catch (Exception e){ logger.error("[allInstanceHealthStatus] {}", consoleService, e); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java index dcab438be..86f2c7466 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java @@ -32,6 +32,8 @@ public class DefaultConsoleService extends AbstractService implements ConsoleSer private final String allHealthStatusUrl; + private final String allCrossRegionHealthStatusUrl; + private final String pingStatusUrl; private final String innerShardDelayStatusUrl; @@ -70,6 +72,7 @@ public DefaultConsoleService(String address){ } healthStatusUrl = String.format("%s/api/health/{ip}/{port}", this.address); allHealthStatusUrl = String.format("%s/api/health/check/status/all", this.address); + allCrossRegionHealthStatusUrl = String.format("%s/api/health/cross/region/check/status/all", this.address); pingStatusUrl = String.format("%s/api/redis/ping/{ip}/{port}", this.address); innerShardDelayStatusUrl = String.format("%s/api/shard/inner/delay/{shardId}", this.address); innerDelayStatusUrl = String.format("%s/api/redis/inner/delay/{ip}/{port}", this.address); @@ -90,7 +93,10 @@ public HEALTH_STATE getInstanceStatus(String ip, int port) { } @Override - public Map getAllInstanceHealthStatus() { + public Map getAllInstanceHealthStatus(boolean isCrossRegion) { + if (isCrossRegion) { + return restTemplate.getForObject(allCrossRegionHealthStatusUrl, AllInstanceHealthStatus.class); + } return restTemplate.getForObject(allHealthStatusUrl, AllInstanceHealthStatus.class); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java index b469b4ff4..d0968b513 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java @@ -207,8 +207,8 @@ public void recordAlert(@RequestBody CheckerConsoleService.AlertMessage alertMes } @GetMapping(value = ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ACTIVE_DC_ONE_WAY_CLUSTERS) - public Map loadAllOuterClientClusters(@RequestParam String activeDc) { - return outerClientCache.getAllActiveDcClusters(activeDc); + public Map loadAllOuterClientClusters(@RequestParam String dc) { + return outerClientCache.getAllDcClusters(dc); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java index 5f0cee57a..356e30dc0 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java @@ -7,6 +7,7 @@ public class ShardCheckerHealthCheckModel { + private String idc; private String host; private int port; private CheckerRole checkerRole; @@ -15,13 +16,22 @@ public class ShardCheckerHealthCheckModel { public ShardCheckerHealthCheckModel() { } - public ShardCheckerHealthCheckModel(String host, int port) { + public ShardCheckerHealthCheckModel(String host, int port, String idc) { this.host = host; this.port = port; + this.idc = idc; this.checkerRole = CheckerRole.FOLLOWER; this.instances = new ArrayList<>(); } + public String getIdc() { + return idc; + } + + public void setIdc(String idc) { + this.idc = idc; + } + public String getHost() { return host; } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java index 01fbaba70..1ce74bb17 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java @@ -34,9 +34,9 @@ public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws } @Override - public Map getAllActiveDcClusters(String activeDc) { + public Map getAllDcClusters(String dc) { try { - return service.loadAllActiveDcOneWayClusterInfo(config.getConsoleAddress(), activeDc); + return service.loadAllDcOneWayClusterInfo(config.getConsoleAddress(), dc); } catch (RestClientException e) { logger.warn("[getAllOneWayClusters] rest fail, {}", e.getMessage()); } catch (Throwable th) { diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java index 43f0eab79..254e218aa 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java @@ -12,11 +12,13 @@ import org.springframework.stereotype.Component; import org.springframework.web.client.RestClientException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import com.ctrip.xpipe.api.migration.OuterClientService.*; /** * @author lishanglin @@ -30,7 +32,7 @@ public class DefaultOuterClientCache extends AbstractLifecycle implements OuterC private ConsoleConfig config; - private TimeBoundCache> clustersCache; + private TimeBoundCache> clustersCache; private ScheduledExecutorService scheduled; @@ -40,12 +42,12 @@ public DefaultOuterClientCache(ConsoleConfig config) { this.outerClientService = OuterClientService.DEFAULT; this.config = config; this.clustersCache = new TimeBoundCache<>(() -> 10000 + config.getRedisConfCheckIntervalMilli(), - () -> loadActiveDcClusters(FoundationService.DEFAULT.getDataCenter())); + () -> loadClusters(FoundationService.DEFAULT.getDataCenter())); } @Override - public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws Exception { - OuterClientService.ClusterInfo clusterInfo = clustersCache.getData(false).get(clusterName.toLowerCase()); + public ClusterInfo getClusterInfo(String clusterName) throws Exception { + ClusterInfo clusterInfo = clustersCache.getData(false).get(clusterName.toLowerCase()); if (null == clusterInfo) { return outerClientService.getClusterInfo(clusterName); } @@ -54,18 +56,26 @@ public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws } @Override - public Map getAllActiveDcClusters(String activeDc) { - if (FoundationService.DEFAULT.getDataCenter().equalsIgnoreCase(activeDc)) return clustersCache.getData(false); - else return loadActiveDcClusters(activeDc); + public Map getAllDcClusters(String dc) { + if (FoundationService.DEFAULT.getDataCenter().equalsIgnoreCase(dc)) return clustersCache.getData(false); + else return loadClusters(dc); } - private Map loadActiveDcClusters(String activeDc) { - Map clusters = new HashMap<>(); + private Map loadClusters(String dc) { + Map clusters = new HashMap<>(); try { - List clusterInfos = outerClientService.getActiveDcClusters(activeDc); - for (OuterClientService.ClusterInfo cluster: clusterInfos) { + List clusterInfos = outerClientService.getActiveDcClusters(dc); + for (ClusterInfo cluster: clusterInfos) { clusters.put(cluster.getName().toLowerCase(), cluster); } + DcMeta currentDcClusterInfos = outerClientService.getOutClientDcMeta(dc); + for (ClusterMeta clusterMeta: currentDcClusterInfos.getClusters().values()) { + if (!ClusterType.XPIPE_ONE_WAY.equals(clusterMeta.getClusterType())) continue; + ClusterInfo clusterInfo = buildClusterModel(clusterMeta); + if (!clusters.containsKey(clusterInfo.getName().toLowerCase())) { + clusters.put(clusterInfo.getName().toLowerCase(), clusterInfo); + } + } } catch (RestClientException e) { logger.warn("[refresh] rest fail, {}", e.getMessage()); } catch (Throwable th) { @@ -75,6 +85,49 @@ private Map loadActiveDcClusters(String return clusters; } + public ClusterInfo buildClusterModel(ClusterMeta clusterMeta) { + ClusterInfo cluster = clusterMetaToClusterModel(clusterMeta); + List groupModels = new ArrayList<>(); + for (GroupMeta group : clusterMeta.getGroups().values()) { + List instanceModels = new ArrayList<>(); + GroupInfo groupModel = groupMetaToGroupModel(group); + for (RedisMeta instance : group.getRedises()) { + instanceModels.add(redisMetaToInstanceModel(instance)); + } + groupModel.setInstances(instanceModels); + groupModels.add(groupModel); + } + cluster.setGroups(groupModels); + return cluster; + } + + public ClusterInfo clusterMetaToClusterModel(ClusterMeta clusterMeta) { + ClusterInfo cluster = new ClusterInfo(); + cluster.setName(clusterMeta.getName()); + if (clusterMeta.getClusterType().equals(ClusterType.XPIPE_ONE_WAY)) { + cluster.setIsXpipe(true); + cluster.setMasterIDC(clusterMeta.getActiveIDC()); + } + return cluster; + } + + public GroupInfo groupMetaToGroupModel(GroupMeta groupMeta) { + GroupInfo groupModel = new GroupInfo(); + groupModel.setName(groupMeta.getGroupName()); + return groupModel; + } + + public InstanceInfo redisMetaToInstanceModel(RedisMeta redisMeta) { + InstanceInfo instance = new InstanceInfo(); + instance.setIPAddress(redisMeta.getHost()); + instance.setPort(redisMeta.getPort()); + instance.setIsMaster(redisMeta.isMaster()); + instance.setStatus(InstanceStatus.ACTIVE.equals(redisMeta.getStatus())); + instance.setEnv(redisMeta.getIdc()); + instance.setCanRead(!InstanceStatus.INACTIVE.equals(redisMeta.getStatus())); + return instance; + } + @Override protected void doInitialize() throws Exception { super.doInitialize(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java index 595743375..cf3b112fa 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java @@ -42,6 +42,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; +import java.util.List; import java.util.concurrent.ExecutorService; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; @@ -189,9 +190,9 @@ public SentinelManager sentinelManager() { public HealthCheckReporter healthCheckReporter(CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, GroupCheckerLeaderElector clusterServer, AllCheckerLeaderElector allCheckerLeaderElector, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, - ClusterHealthManager clusterHealthManager, HealthStateService healthStateService, + ClusterHealthManager clusterHealthManager, List healthStateServices, @Value("${server.port}") int serverPort) { - return new HealthCheckReporter(healthStateService, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, + return new HealthCheckReporter(healthStateServices, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, crossMasterDelayManager, pingService, clusterHealthManager, serverPort); } diff --git a/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html b/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html index 960baf970..95ab51581 100644 --- a/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html +++ b/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html @@ -134,7 +134,7 @@ Checker Groups - Show Actions + Show Actions(err) 刷新 @@ -144,7 +144,8 @@

- {{checker.host}}:{{checker.port}} + {{checker.idc}} + {{checker.host}}:{{checker.port}} {{checker.checkerRole}}
diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java index 8d427e185..823b433ee 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java @@ -204,8 +204,12 @@ protected void handleMessage(Object response) { if(!(response instanceof Object[])) { throw new RedisRuntimeException(String.format("Subscribe subscribeChannel response incorrect: %s", response)); } - - SubscribeMessageHandler handler = getSubscribeMessageHandler(); + SubscribeMessageHandler handler; + if (this.getName().equals(PSUBSCRIBE)) { + handler = getPSubscribeMessageHandler(); + } else { + handler = getSubscribeMessageHandler(); + } Pair channelAndMessage = handler.handle(payloadToStringArray(response)); if(channelAndMessage != null) { @@ -217,6 +221,10 @@ protected SubscribeMessageHandler getSubscribeMessageHandler() { return new DefaultSubscribeMessageHandler(); } + protected SubscribeMessageHandler getPSubscribeMessageHandler() { + return new PsubscribeMessageHandler(); + } + private void notifyListeners(Pair channelAndMessage) { for(SubscribeListener listener : listeners) { try { diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java new file mode 100644 index 000000000..f47a7e705 --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java @@ -0,0 +1,33 @@ +package com.ctrip.xpipe.redis.core.protocal.cmd.pubsub; + +import com.ctrip.xpipe.api.pool.SimpleObjectPool; +import com.ctrip.xpipe.netty.commands.NettyClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; + +public class PsubscribeCommand extends AbstractSubscribe{ + + private static final Logger logger = LoggerFactory.getLogger(PsubscribeCommand.class); + + protected PsubscribeCommand(String host, int port, ScheduledExecutorService scheduled, MESSAGE_TYPE messageType, String... subscribeChannel) { + super(host, port, scheduled, messageType, subscribeChannel); + } + + public PsubscribeCommand(SimpleObjectPool clientPool, ScheduledExecutorService scheduled, int commandTimeoutMilli, String... channel) { + super(clientPool, scheduled, commandTimeoutMilli, MESSAGE_TYPE.PMESSAGE, channel); + } + + + @Override + public String getName() { + return PSUBSCRIBE; + } + + @Override + protected Logger getLogger() { + return logger; + } + +} diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java index e2109c834..b4fcda175 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java @@ -38,7 +38,7 @@ public SubscribeCommand(SimpleObjectPool clientPool, ScheduledExecu @Override public String getName() { - return "subscribe"; + return SUBSCRIBE; } @Override diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java index 6ef5ff7f6..aa984d3f4 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java @@ -44,6 +44,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; +import java.util.List; import java.util.concurrent.ExecutorService; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; @@ -184,9 +185,9 @@ public SentinelManager sentinelManager() { public HealthCheckReporter healthCheckReporter(CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, GroupCheckerLeaderElector clusterServer, AllCheckerLeaderElector allCheckerLeaderElector, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, - ClusterHealthManager clusterHealthManager, HealthStateService healthStateService, + ClusterHealthManager clusterHealthManager, List healthStateServices, @Value("${server.port}") int serverPort) { - return new HealthCheckReporter(healthStateService, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, + return new HealthCheckReporter(healthStateServices, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, crossMasterDelayManager, pingService, clusterHealthManager, serverPort); }