diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java index 795083a41..467824242 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerConsoleService.java @@ -57,9 +57,7 @@ public interface CheckerConsoleService { Map loadAllClusterCreateTime(String console); - Map loadAllDcOneWayClusterInfo(String console, String dc); - - Map loadCurrentDcOneWayClusterInfo(String console, String dc); + Map loadAllActiveDcOneWayClusterInfo(String console, String activeDc); void bindShardSentinel(String console, String dc, String cluster, String shard, SentinelMeta sentinelMeta); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java index ee095e5cb..5268f40fd 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/CheckerService.java @@ -17,10 +17,6 @@ class AllInstanceHealthStatus extends HashMap {} HEALTH_STATE getInstanceStatus(String ip, int port); - HEALTH_STATE getCrossRegionInstanceStatus(String ip, int port); - Map getAllInstanceHealthStatus(); - Map getAllInstanceCrossRegionHealthStatus(); - } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java index b38101fa9..b55baa24a 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/OuterClientCache.java @@ -12,8 +12,6 @@ public interface OuterClientCache { OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws Exception; - Map getAllDcClusters(String dc); - - Map getAllCurrentDcClusters(String dc); + Map getAllActiveDcClusters(String activeDc); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java index c1855fb7f..2e4593862 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java @@ -6,7 +6,6 @@ import com.ctrip.xpipe.redis.checker.controller.result.ActionContextRetMessage; import com.ctrip.xpipe.redis.checker.healthcheck.*; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultPsubPingActionCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector; @@ -37,9 +36,6 @@ public class CheckerHealthController { @Autowired private DefaultDelayPingActionCollector defaultDelayPingActionCollector; - @Autowired - private DefaultPsubPingActionCollector defaultPsubPingActionCollector; - @Autowired private RedisUsedMemoryCollector redisUsedMemoryCollector; @@ -61,12 +57,6 @@ public HEALTH_STATE getHealthState(@PathVariable String ip, @PathVariable int po else return HEALTH_STATE.UNKNOWN; } - @RequestMapping(value = "/health/cross/region/{ip}/{port}", method = RequestMethod.GET) - public HEALTH_STATE getCrossRegionHealthState(@PathVariable String ip, @PathVariable int port) { - if (siteStability.isSiteStable()) return defaultPsubPingActionCollector.getHealthState(new HostPort(ip, port)); - else return HEALTH_STATE.UNKNOWN; - } - @RequestMapping(value = "/health/check/instance/{ip}/{port}", method = RequestMethod.GET) public String getHealthCheckInstance(@PathVariable String ip, @PathVariable int port) { RedisHealthCheckInstance instance = instanceManager.findRedisHealthCheckInstance(new HostPort(ip, port)); @@ -77,16 +67,6 @@ public String getHealthCheckInstance(@PathVariable String ip, @PathVariable int return Codec.DEFAULT.encode(model); } - @RequestMapping(value = "/health/check/cross/region//instance/{ip}/{port}", method = RequestMethod.GET) - public String getCrossRegionHealthCheckInstance(@PathVariable String ip, @PathVariable int port) { - RedisHealthCheckInstance instance = instanceManager.findRedisInstanceForPsubPingAction(new HostPort(ip, port)); - if(instance == null) { - return "Not found"; - } - HealthCheckInstanceModel model = buildHealthCheckInfo(instance); - return Codec.DEFAULT.encode(model); - } - @RequestMapping(value = "/health/check/cluster/{clusterId}", method = RequestMethod.GET) public String getClusterHealthCheckInstance(@PathVariable String clusterId) { ClusterHealthCheckInstance instance = instanceManager.findClusterHealthCheckInstance(clusterId); @@ -117,16 +97,6 @@ public String getHealthCheckRedisInstanceForAssignedAction(@PathVariable String return Codec.DEFAULT.encode(model); } - @RequestMapping(value = "/health/check/redis-for-ping-action/{ip}/{port}", method = RequestMethod.GET) - public String getHealthCheckRedisInstanceForPingAction(@PathVariable String ip, @PathVariable int port) { - RedisHealthCheckInstance instance = instanceManager.findRedisInstanceForPsubPingAction(new HostPort(ip, port)); - if(instance == null) { - return "Not found"; - } - HealthCheckInstanceModel model = buildHealthCheckInfo(instance); - return Codec.DEFAULT.encode(model); - } - @RequestMapping(value = "/health/redis/info/{ip}/{port}", method = RequestMethod.GET) public ActionContextRetMessage> getRedisInfo(@PathVariable String ip, @PathVariable int port) { return ActionContextRetMessage.from(redisInfoManager.getInfoByHostPort(new HostPort(ip, port))); @@ -143,12 +113,6 @@ public Map getAllHealthStatusDesc() { else return Collections.emptyMap(); } - @GetMapping("/health/check/cross/region/status/all") - public Map getAllCrossRegionHealthStatusDesc() { - if (siteStability.isSiteStable()) return defaultPsubPingActionCollector.getAllHealthStatus(); - else return Collections.emptyMap(); - } - @GetMapping("/health/keeper/status/all") public ConcurrentMap> getAllKeeperFlows() { return keeperFlowCollector.getHostPort2InputFlow(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java index e107c75c1..30f1e0aa8 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java @@ -18,8 +18,6 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisMeta redis); - RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis); - KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper); ClusterHealthCheckInstance getOrCreate(ClusterMeta cluster); @@ -28,8 +26,6 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort); - RedisHealthCheckInstance findRedisInstanceForPsubPingAction(HostPort hostPort); - KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort); ClusterHealthCheckInstance findClusterHealthCheckInstance(String clusterId); @@ -40,8 +36,6 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort); - RedisHealthCheckInstance removeRedisInstanceForPingAction(HostPort hostPort); - ClusterHealthCheckInstance remove(String cluster); List getAllRedisInstance(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java deleted file mode 100644 index 33ba4ea6f..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java +++ /dev/null @@ -1,94 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; - -import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; -import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionContext; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionListener; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionContext; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionListener; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector; -import com.google.common.collect.Maps; - -import java.util.Map; - -public abstract class AbstractPsubPingActionCollector implements PsubPingActionCollector { - - protected Map allHealthStatus = Maps.newConcurrentMap(); - - protected PingActionListener pingActionListener = new AbstractPsubPingActionCollector.CollectorPingActionListener(); - - protected PsubActionListener psubActionListener = new AbstractPsubPingActionCollector.CollectorPsubActionListener(); - - protected abstract HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance); - - protected void removeHealthStatus(HealthCheckAction action) { - HealthStatus healthStatus = allHealthStatus.remove(action.getActionInstance()); - if(healthStatus != null) { - healthStatus.stop(); - } - } - - @Override - public boolean supportInstance(RedisHealthCheckInstance instance) { - return true; - } - - @Override - public PingActionListener createPingActionListener() { - return pingActionListener; - } - - @Override - public PsubActionListener createPsubActionListener() { - return psubActionListener; - } - - protected class CollectorPingActionListener implements PingActionListener { - - @Override - public void onAction(PingActionContext pingActionContext) { - HealthStatus healthStatus = createOrGetHealthStatus(pingActionContext.instance()); - if (!pingActionContext.isSuccess()) { - if (pingActionContext.getCause().getMessage().contains("LOADING")) { - healthStatus.loading(); - } - return; - } - - if (pingActionContext.getResult()) { - healthStatus.pong(); - } else { - if(healthStatus.getState() == HEALTH_STATE.UNKNOWN) { - healthStatus.pongInit(); - } - } - } - - @Override - public boolean worksfor(ActionContext t) { - return t instanceof PingActionContext; - } - - @Override - public void stopWatch(HealthCheckAction action) { - removeHealthStatus(action); - } - } - - protected class CollectorPsubActionListener implements PsubActionListener { - - @Override - public void onAction(PsubActionContext psubActionContext) { - HealthStatus healthStatus = createOrGetHealthStatus(psubActionContext.instance()); - if (!psubActionContext.getResult().isEmpty()) { - healthStatus.subSuccess(); - } - } - - @Override - public void stopWatch(HealthCheckAction action) { - removeHealthStatus(action); - } - } -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java deleted file mode 100644 index ece7dd5ff..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java +++ /dev/null @@ -1,100 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; - -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceDown; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceLoading; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceUp; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ScheduledExecutorService; - -/** - * UNKNOWN - * pingSuccess -> INSTANCEUP + start subAction - * pingFail -> DOWN + markDown - * subSuccess -> throw exception - *

- * INSTANCEUP - * pingSuccess,do nothing - * pingFail -> DOWN + markDown - * subSuccess -> HEALTHY + markUp + stop subAction - *

- * HEALTHY - * pingSuccess,do nothing - * pingFail -> DOWN + markDown - * subSuccess -> throw exception - *

- * DOWN - * pingSuccess -> INSTANCEUP + start subAction - * pingFail,do nothing - * subSuccess -> throw exception - */ -public class CrossRegionRedisHealthStatus extends HealthStatus { - - protected static final Logger logger = LoggerFactory.getLogger(CrossRegionRedisHealthStatus.class); - - public CrossRegionRedisHealthStatus(RedisHealthCheckInstance instance, ScheduledExecutorService scheduled) { - super(instance, scheduled); - } - - @Override - protected void loading() { - HEALTH_STATE preState = state.get(); - if(state.compareAndSet(preState, HEALTH_STATE.DOWN)) { - logStateChange(preState, state.get()); - } - if (!preState.equals(HEALTH_STATE.DOWN)) { - logger.info("[setLoading] {}", this); - notifyObservers(new InstanceLoading(instance)); - } - } - - @Override - protected void pong() { - lastPongTime.set(System.currentTimeMillis()); - HEALTH_STATE preState = state.get(); - if (preState.equals(HEALTH_STATE.UNKNOWN) || preState.equals(HEALTH_STATE.DOWN)) { - if(state.compareAndSet(preState, HEALTH_STATE.INSTANCEUP)) { - logStateChange(preState, state.get()); - } - } - } - - @Override - protected void subSuccess() { - HEALTH_STATE preState = state.get(); - if (preState.equals(HEALTH_STATE.INSTANCEUP)) { - if(state.compareAndSet(preState, HEALTH_STATE.HEALTHY)) { - logStateChange(preState, state.get()); - } - logger.info("[setUp] {}", this); - notifyObservers(new InstanceUp(instance)); - } - } - - @Override - protected void healthStatusUpdate() { - long currentTime = System.currentTimeMillis(); - - if(lastPongTime.get() != UNSET_TIME) { - long pingDownTime = currentTime - lastPongTime.get(); - final int pingDownAfter = pingDownAfterMilli.getAsInt(); - if (pingDownTime > pingDownAfter) { - doMarkDown(); - } - } - } - - protected void doMarkDown() { - HEALTH_STATE preState = state.get(); - if(state.compareAndSet(preState, HEALTH_STATE.DOWN)) { - logStateChange(preState, state.get()); - } - if (!preState.equals(HEALTH_STATE.DOWN)) { - logger.info("[setDown] {}", this); - notifyObservers(new InstanceDown(instance)); - } - } - -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java deleted file mode 100644 index ca6307946..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; - -import com.ctrip.xpipe.api.factory.ObjectFactory; -import com.ctrip.xpipe.api.observer.Observable; -import com.ctrip.xpipe.api.observer.Observer; -import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; -import com.ctrip.xpipe.endpoint.HostPort; -import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.AbstractInstanceEvent; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.processor.HealthEventProcessor; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector; -import com.ctrip.xpipe.utils.MapUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; - -import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; -import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; - -@Component -public class DefaultPsubPingActionCollector extends AbstractPsubPingActionCollector implements PsubPingActionCollector, HealthStateService, OneWaySupport { - - private static final Logger logger = LoggerFactory.getLogger(DefaultPsubPingActionCollector.class); - - @Autowired - private List healthEventProcessors; - - @Resource(name = SCHEDULED_EXECUTOR) - private ScheduledExecutorService scheduled; - - @Resource(name = GLOBAL_EXECUTOR) - private ExecutorService executors; - - @Override - public HEALTH_STATE getHealthState(HostPort hostPort) { - RedisHealthCheckInstance key = allHealthStatus.keySet().stream() - .filter(instance -> instance.getCheckInfo().getHostPort().equals(hostPort)) - .findFirst().orElse(null); - - if (null != key) return allHealthStatus.get(key).getState(); - return HEALTH_STATE.UNKNOWN; - } - - @Override - public Map getAllCachedState() { - Map cachedHealthStatus = new HashMap<>(); - allHealthStatus.forEach(((instance, healthStatus) -> { - RedisInstanceInfo info = instance.getCheckInfo(); - cachedHealthStatus.put(info.getHostPort(), healthStatus.getState()); - })); - - return cachedHealthStatus; - } - - public Map getAllHealthStatus() { - Map cachedHealthStatus = new HashMap<>(); - allHealthStatus.forEach(((instance, healthStatus) -> { - HostPort hostPort = instance.getCheckInfo().getHostPort(); - cachedHealthStatus.put(hostPort, new HealthStatusDesc(hostPort, healthStatus)); - })); - - return cachedHealthStatus; - } - - @Override - public void updateHealthState(Map redisStates) { - throw new UnsupportedOperationException(); - } - - @Override - protected HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance) { - return MapUtils.getOrCreate(allHealthStatus, instance, new ObjectFactory() { - @Override - public HealthStatus create() { - - HealthStatus healthStatus = new CrossRegionRedisHealthStatus(instance, scheduled); - - healthStatus.addObserver(new Observer() { - @Override - public void update(Object args, Observable observable) { - onInstanceStateChange(args); - } - }); - healthStatus.start(); - return healthStatus; - } - }); - } - - private void onInstanceStateChange(Object args) { - - logger.info("[onInstanceStateChange]{}", args); - for (HealthEventProcessor processor : healthEventProcessors) { - - if (processor instanceof OneWaySupport) { - executors.execute(new AbstractExceptionLogTask() { - @Override - protected void doRun() throws Exception { - processor.onEvent((AbstractInstanceEvent) args); - } - }); - } - } - } - -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java index be0421bfa..7ad76d41b 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java @@ -31,10 +31,10 @@ public class HealthStatus extends AbstractObservable implements Startable, Stopp public static long UNSET_TIME = -1L; - protected AtomicLong lastPongTime = new AtomicLong(UNSET_TIME); + private AtomicLong lastPongTime = new AtomicLong(UNSET_TIME); private AtomicLong lastHealthDelayTime = new AtomicLong(UNSET_TIME); - protected AtomicReference state = new AtomicReference<>(HEALTH_STATE.UNKNOWN); + private AtomicReference state = new AtomicReference<>(HEALTH_STATE.UNKNOWN); protected RedisHealthCheckInstance instance; protected final IntSupplier delayDownAfterMilli; @@ -110,7 +110,7 @@ protected boolean shouldNotRun() { return lastHealthDelayTime.get() < 0 && lastPongTime.get() < 0; } - protected void loading() { + void loading() { HEALTH_STATE preState = state.get(); if(preState.equals(preState.afterPingFail())) { return; @@ -124,7 +124,7 @@ protected void loading() { } } - protected void pong(){ + void pong(){ lastPongTime.set(System.currentTimeMillis()); setPingUp(); } @@ -135,8 +135,6 @@ void pongInit() { } } - protected void subSuccess(){} - void delay(long delayMilli, long...srcShardDbId){ //first time diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java deleted file mode 100644 index bf78a0415..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCrossRegionHealthStatusConsistenceInspector.java +++ /dev/null @@ -1,202 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator; - -import com.ctrip.xpipe.api.foundation.FoundationService; -import com.ctrip.xpipe.api.lifecycle.Ordered; -import com.ctrip.xpipe.api.lifecycle.TopElement; -import com.ctrip.xpipe.api.monitor.Task; -import com.ctrip.xpipe.api.monitor.TransactionMonitor; -import com.ctrip.xpipe.cluster.ClusterType; -import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; -import com.ctrip.xpipe.endpoint.HostPort; -import com.ctrip.xpipe.lifecycle.AbstractLifecycle; -import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; -import com.ctrip.xpipe.redis.checker.config.CheckerConfig; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.OutClientInstanceHealthHolder; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.UpDownInstances; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder; -import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder; -import com.ctrip.xpipe.redis.core.entity.ClusterMeta; -import com.ctrip.xpipe.redis.core.entity.DcMeta; -import com.ctrip.xpipe.redis.core.entity.ShardMeta; -import com.ctrip.xpipe.redis.core.entity.XpipeMeta; -import com.ctrip.xpipe.redis.core.exception.MasterNotFoundException; -import com.ctrip.xpipe.redis.core.meta.MetaCache; -import com.ctrip.xpipe.tuple.Pair; -import com.ctrip.xpipe.utils.MapUtils; -import com.ctrip.xpipe.utils.XpipeThreadFactory; -import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.lang.Nullable; -import org.springframework.stereotype.Service; - -import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; - -@Service -public class InstanceCrossRegionHealthStatusConsistenceInspector extends AbstractLifecycle implements TopElement { - private InstanceHealthStatusCollector collector; - - private InstanceStatusAdjuster adjuster; - - private StabilityHolder siteStability; - - private CheckerConfig config; - - private MetaCache metaCache; - - private GroupCheckerLeaderElector leaderElector; - - private DynamicDelayPeriodTask task; - - private ScheduledExecutorService scheduled; - - private static final Logger logger = LoggerFactory.getLogger(InstanceCrossRegionHealthStatusConsistenceInspector.class); - - private static final String currentDc = FoundationService.DEFAULT.getDataCenter(); - - private static final String TYPE = "HealthCheck"; - - @Autowired - public InstanceCrossRegionHealthStatusConsistenceInspector(InstanceHealthStatusCollector instanceHealthStatusCollector, - InstanceStatusAdjuster instanceStatusAdjuster, - @Nullable GroupCheckerLeaderElector groupCheckerLeaderElector, - StabilityHolder stabilityHolder, CheckerConfig checkerConfig, - MetaCache metaCache) { - this.collector = instanceHealthStatusCollector; - this.adjuster = instanceStatusAdjuster; - this.leaderElector = groupCheckerLeaderElector; - this.siteStability = stabilityHolder; - this.config = checkerConfig; - this.metaCache = metaCache; - } - - protected void inspectCurrentDc() { - if(!siteStability.isSiteStable()) { - logger.debug("[inspectCrossRegion][skip] unstable"); - return; - } - if (null == leaderElector || !leaderElector.amILeader()) { - logger.debug("[inspectCrossRegion][skip] not leader"); - return; - } - - logger.debug("[inspectCrossRegion] begin"); - final long timeoutMill = System.currentTimeMillis() + config.getPingDownAfterMilli() / 2; - TransactionMonitor.DEFAULT.logTransactionSwallowException(TYPE, "compensator.inspect.crossRegion", new Task() { - @Override - public void go() throws Exception { - Map> interestedCurrentDc = fetchInterestedCurrentDcClusterInstances(); - if (interestedCurrentDc.isEmpty()) { - logger.debug("[inspectCrossRegion][skip] no interested instance"); - return; - } - - Pair instanceHealth = collector.collect(true); - checkTimeout(timeoutMill, "after collect"); - - XPipeInstanceHealthHolder xpipeInstanceHealth = instanceHealth.getKey(); - OutClientInstanceHealthHolder outClientInstanceHealth = instanceHealth.getValue(); - UpDownInstances hostPortNeedAdjustForPingAction = findHostPortNeedAdjust(xpipeInstanceHealth, outClientInstanceHealth, interestedCurrentDc); - - checkTimeout(timeoutMill, "after compare"); - if (!hostPortNeedAdjustForPingAction.getHealthyInstances().isEmpty()) - adjuster.adjustInstances(hostPortNeedAdjustForPingAction.getHealthyInstances(), true, true, timeoutMill); - - checkTimeout(timeoutMill, "after adjust up"); - if (!hostPortNeedAdjustForPingAction.getUnhealthyInstances().isEmpty()) - adjuster.adjustInstances(hostPortNeedAdjustForPingAction.getUnhealthyInstances(), true, false, timeoutMill); - } - - @Override - public Map getData() { - return Collections.singletonMap("timeoutMilli", timeoutMill); - } - }); - } - - private void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { - if (System.currentTimeMillis() > timeoutAtMilli) { - logger.info("[timeout] {}", msg); - throw new TimeoutException(msg); - } - } - - protected Map> fetchInterestedCurrentDcClusterInstances() { - Map> interestedCurrentDcClusterInstances = new HashMap<>(); - XpipeMeta xpipeMeta = metaCache.getXpipeMeta(); - if (null == xpipeMeta) return interestedCurrentDcClusterInstances; - - for (DcMeta dcMeta: xpipeMeta.getDcs().values()) { - if (!dcMeta.getId().equalsIgnoreCase(currentDc)) continue; - - for (ClusterMeta clusterMeta: dcMeta.getClusters().values()) { - if (!ClusterType.isSameClusterType(clusterMeta.getType(), ClusterType.ONE_WAY)) continue; - if (!metaCache.isCrossRegion(dcMeta.getId(), clusterMeta.getActiveDc())) continue; - - Set interestedInstances = MapUtils.getOrCreate(interestedCurrentDcClusterInstances, clusterMeta.getId(), HashSet::new); - for (ShardMeta shardMeta: clusterMeta.getShards().values()) { - shardMeta.getRedises().forEach(redis -> interestedInstances.add(new HostPort(redis.getIp(), redis.getPort()))); - } - } - } - - return interestedCurrentDcClusterInstances; - } - - protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipeInstanceHealthHolder, - OutClientInstanceHealthHolder outClientInstanceHealthHolder, - Map> interested) { - int quorum = config.getQuorum(); - UpDownInstances xpipeInstances = xpipeInstanceHealthHolder.aggregate(interested, quorum); - UpDownInstances outClientInstances = outClientInstanceHealthHolder.extractReadable(interested); - - Set needMarkUpInstances = xpipeInstances.getHealthyInstances(); - Set needMarkDownInstances = xpipeInstances.getUnhealthyInstances(); - needMarkUpInstances.retainAll(outClientInstances.getUnhealthyInstances()); - needMarkDownInstances.retainAll(outClientInstances.getHealthyInstances()); - logger.info("[InstanceCrossRegionHealthStatusConsistenceInspector] needMarkUpInstances:{}", needMarkUpInstances); - logger.info("[InstanceCrossRegionHealthStatusConsistenceInspector] needMarkDownInstances:{}", needMarkDownInstances); - return new UpDownInstances(needMarkUpInstances, needMarkDownInstances); - } - - @Override - protected void doInitialize() throws Exception { - super.doInitialize(); - this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("InstanceCrossRegionHealthStatusConsistenceInspector")); - this.task = new DynamicDelayPeriodTask("inspectCrossRegion", new AbstractExceptionLogTask() { - @Override - protected void doRun() throws Exception { - inspectCurrentDc(); - } - }, config::getHealthMarkCompensateIntervalMill, scheduled); - } - - @Override - protected void doStart() throws Exception { - super.doStart(); - this.task.start(); - } - - @Override - protected void doStop() throws Exception { - super.doStop(); - this.task.stop(); - } - - @Override - protected void doDispose() throws Exception { - super.doDispose(); - this.scheduled.shutdown(); - this.scheduled = null; - this.task = null; - } - - @Override - public int getOrder() { - return Ordered.LOWEST_PRECEDENCE; - } -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java index 058e87aba..b401c881a 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollector.java @@ -7,7 +7,6 @@ import com.ctrip.xpipe.redis.checker.CheckerService; import com.ctrip.xpipe.redis.checker.OuterClientCache; import com.ctrip.xpipe.redis.checker.RemoteCheckerManager; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.OutClientInstanceHealthHolder; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder; @@ -49,31 +48,27 @@ public InstanceHealthStatusCollector(RemoteCheckerManager remoteCheckerManager, this.executors = executors; } - public Pair collect() throws ExecutionException, InterruptedException, TimeoutException { - return collect(false); - } - - public Pair collect(boolean isCrossRegion) + public Pair collect() throws InterruptedException, ExecutionException, TimeoutException { XPipeInstanceHealthHolder xpipeInstanceHealthHolder = new XPipeInstanceHealthHolder(); OutClientInstanceHealthHolder outClientInstanceHealthHolder = new OutClientInstanceHealthHolder(); ParallelCommandChain commandChain = new ParallelCommandChain(executors); - commandChain.add(new GetAllOutClientInstanceStatusCmd(outClientInstanceHealthHolder, isCrossRegion)); + commandChain.add(new GetAllOutClientInstanceStatusCmd(outClientInstanceHealthHolder)); remoteCheckerManager.getAllCheckerServices().forEach(checkerService -> { - commandChain.add(new GetRemoteCheckResultCmd(checkerService, xpipeInstanceHealthHolder, isCrossRegion)); + commandChain.add(new GetRemoteCheckResultCmd(checkerService, xpipeInstanceHealthHolder)); }); commandChain.execute().get(5, TimeUnit.SECONDS); return new Pair<>(xpipeInstanceHealthHolder, outClientInstanceHealthHolder); } - public XPipeInstanceHealthHolder collectXPipeInstanceHealth(HostPort hostPort, boolean isCrossRegion) + public XPipeInstanceHealthHolder collectXPipeInstanceHealth(HostPort hostPort) throws InterruptedException, ExecutionException, TimeoutException { ParallelCommandChain commandChain = new ParallelCommandChain(executors); XPipeInstanceHealthHolder xpipeInstanceHealthHolder = new XPipeInstanceHealthHolder(); remoteCheckerManager.getAllCheckerServices().forEach(checkerService -> { - commandChain.add(new GetRemoteHealthStateCmd(hostPort, checkerService, xpipeInstanceHealthHolder, isCrossRegion)); + commandChain.add(new GetRemoteHealthStateCmd(hostPort, checkerService, xpipeInstanceHealthHolder)); }); commandChain.execute().get(5, TimeUnit.SECONDS); @@ -86,22 +81,15 @@ private class GetRemoteCheckResultCmd extends AbstractCommand { private XPipeInstanceHealthHolder resultHolder; - private boolean isCrossRegion; - - public GetRemoteCheckResultCmd(CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder, boolean isCrossRegion) { + public GetRemoteCheckResultCmd(CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder) { this.checkerService = checkerService; this.resultHolder = xpipeInstanceHealthHolder; - this.isCrossRegion = isCrossRegion; } @Override protected void doExecute() throws Throwable { try { - if (isCrossRegion) { - resultHolder.add(checkerService.getAllInstanceCrossRegionHealthStatus()); - } else { - resultHolder.add(checkerService.getAllInstanceHealthStatus()); - } + resultHolder.add(checkerService.getAllInstanceHealthStatus()); } catch (RestClientException restException) { logger.info("[doExecute][rest fail] {}", restException.getMessage()); } catch (Throwable th) { @@ -126,23 +114,15 @@ private class GetAllOutClientInstanceStatusCmd extends AbstractCommand { private OutClientInstanceHealthHolder resultHolder; - private boolean isCrossRegion; - - public GetAllOutClientInstanceStatusCmd(OutClientInstanceHealthHolder outClientInstanceHealthHolder, boolean isCrossRegion) { + public GetAllOutClientInstanceStatusCmd(OutClientInstanceHealthHolder outClientInstanceHealthHolder) { this.resultHolder = outClientInstanceHealthHolder; - this.isCrossRegion = isCrossRegion; } @Override protected void doExecute() throws Throwable { try { - if (isCrossRegion) { - resultHolder.addClusters( - outerClientCache.getAllCurrentDcClusters(FoundationService.DEFAULT.getDataCenter())); - } else { - resultHolder.addClusters( - outerClientCache.getAllDcClusters(FoundationService.DEFAULT.getDataCenter())); - } + resultHolder.addClusters( + outerClientCache.getAllActiveDcClusters(FoundationService.DEFAULT.getDataCenter())); } catch (Throwable th) { logger.info("[doExecute][fail]", th); } @@ -169,25 +149,16 @@ private class GetRemoteHealthStateCmd extends AbstractCommand { private XPipeInstanceHealthHolder resultHolder; - private boolean isCrossRegion; - - public GetRemoteHealthStateCmd(HostPort hostPort, CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder, boolean isCrossRegion) { + public GetRemoteHealthStateCmd(HostPort hostPort, CheckerService checkerService, XPipeInstanceHealthHolder xpipeInstanceHealthHolder) { this.hostPort = hostPort; this.checkerService = checkerService; this.resultHolder = xpipeInstanceHealthHolder; - this.isCrossRegion = isCrossRegion; } @Override protected void doExecute() throws Throwable { try { - HEALTH_STATE status; - if (isCrossRegion) { - status = checkerService.getCrossRegionInstanceStatus(hostPort.getHost(), hostPort.getPort()); - } else { - status = checkerService.getInstanceStatus(hostPort.getHost(), hostPort.getPort()); - } - resultHolder.add(new HealthStatusDesc(hostPort, status)); + resultHolder.add(new HealthStatusDesc(hostPort, checkerService.getInstanceStatus(hostPort.getHost(), hostPort.getPort()))); } catch (RestClientException restException) { logger.info("[doExecute][rest fail] {}", restException.getMessage()); } catch (Throwable th) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java index 93c9a072b..1dfa71b06 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java @@ -49,17 +49,17 @@ @Service public class InstanceHealthStatusConsistenceInspector extends AbstractLifecycle implements TopElement { - protected InstanceHealthStatusCollector collector; + private InstanceHealthStatusCollector collector; - protected InstanceStatusAdjuster adjuster; + private InstanceStatusAdjuster adjuster; - protected StabilityHolder siteStability; + private StabilityHolder siteStability; - protected CheckerConfig config; + private CheckerConfig config; - protected MetaCache metaCache; + private MetaCache metaCache; - protected GroupCheckerLeaderElector leaderElector; + private GroupCheckerLeaderElector leaderElector; private DynamicDelayPeriodTask task; @@ -124,11 +124,11 @@ public void go() throws Exception { checkTimeout(timeoutMill, "after compare"); if (!instanceNeedAdjust.getHealthyInstances().isEmpty()) - adjuster.adjustInstances(instanceNeedAdjust.getHealthyInstances(), false, true, timeoutMill); + adjuster.adjustInstances(instanceNeedAdjust.getHealthyInstances(), true, timeoutMill); checkTimeout(timeoutMill, "after adjust up"); if (!instanceNeedAdjust.getUnhealthyInstances().isEmpty()) - adjuster.adjustInstances(instanceNeedAdjust.getUnhealthyInstances(), false, false, timeoutMill); + adjuster.adjustInstances(instanceNeedAdjust.getUnhealthyInstances(), false, timeoutMill); } @Override @@ -138,7 +138,7 @@ public Map getData() { }); } - protected void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { + private void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { if (System.currentTimeMillis() > timeoutAtMilli) { logger.info("[timeout] {}", msg); throw new TimeoutException(msg); @@ -157,7 +157,6 @@ protected Map> fetchInterestedClusterInstances() { for (ClusterMeta clusterMeta: dcMeta.getClusters().values()) { if (!ClusterType.isSameClusterType(clusterMeta.getType(), ClusterType.ONE_WAY)) continue; - if (metaCache.isCrossRegion(dcMeta.getId(), clusterMeta.getActiveDc())) continue;; if (!clusterMeta.getActiveDc().equalsIgnoreCase(currentDc)) continue; Set interestedInstances = MapUtils.getOrCreate(interestedClusterInstances, clusterMeta.getId(), HashSet::new); @@ -182,8 +181,7 @@ protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipe needMarkUpInstances.retainAll(outClientInstances.getUnhealthyInstances()); needMarkDownInstances.retainAll(outClientInstances.getHealthyInstances()); needMarkDownInstances = filterMasterHealthyInstances(xpipeInstanceHealthHolder, needMarkDownInstances, quorum); - logger.info("[InstanceHealthStatusConsistenceInspector] needMarkUpInstances:{}", needMarkUpInstances); - logger.info("[InstanceHealthStatusConsistenceInspector] needMarkDownInstances:{}", needMarkDownInstances); + needMarkDownInstances = filterMarkDowUnsupportedInstances(needMarkDownInstances); return new UpDownInstances(needMarkUpInstances, needMarkDownInstances); } @@ -214,6 +212,33 @@ protected Set filterMasterHealthyInstances(XPipeInstanceHealthHolder x return masterHealthyInstances; } + protected Set filterMarkDowUnsupportedInstances(Set instances) { + Set wontMarkdownInstances = new HashSet<>(); + for (HostPort instance : instances) { + HEALTH_STATE healthState = defaultDelayPingActionCollector.getState(instance); + if (healthState.equals(HEALTH_STATE.SICK)) { + if (!shouldMarkdownDcClusterSickInstances(healthCheckInstanceManager.findRedisHealthCheckInstance(instance))) + wontMarkdownInstances.add(instance); + } + } + instances.removeAll(wontMarkdownInstances); + return instances; + } + + boolean shouldMarkdownDcClusterSickInstances(RedisHealthCheckInstance healthCheckInstance) { + RedisInstanceInfo info = healthCheckInstance.getCheckInfo(); + if (info.isCrossRegion()) { + logger.info("[markdown][{} is cross region, do not call client service ]", info.getHostPort()); + return false; + } + if (healthCheckInstance.getHealthCheckConfig().getDelayConfig(info.getClusterId(), currentDc, info.getDcId()).getClusterLevelHealthyDelayMilli() < 0) { + logger.info("[markdown][cluster {} dcs {}->{} distance is -1, do not call client service ]", info.getClusterId(), currentDc, info.getDcId()); + return false; + } + return true; + } + + @Override protected void doInitialize() throws Exception { super.doInitialize(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommand.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommand.java index f064c2067..f8eef240b 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommand.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommand.java @@ -22,8 +22,6 @@ public class InstanceStatusAdjustCommand extends AbstractCommand { private ClusterShardHostPort instance; - private boolean isCrossRegion; - private InstanceHealthStatusCollector collector; private OuterClientService outerClientService; @@ -42,12 +40,11 @@ public class InstanceStatusAdjustCommand extends AbstractCommand { private static final Logger logger = LoggerFactory.getLogger(InstanceStatusAdjustCommand.class); - public InstanceStatusAdjustCommand(ClusterShardHostPort instance, boolean isCrossRegion, InstanceHealthStatusCollector collector, + public InstanceStatusAdjustCommand(ClusterShardHostPort instance, InstanceHealthStatusCollector collector, OuterClientService outerClientService, boolean state, long deadlineTimeMilli, StabilityHolder stabilityHolder, CheckerConfig config, MetaCache metaCache, AlertManager alertManager) { this.instance = instance; - this.isCrossRegion = isCrossRegion; this.collector = collector; this.outerClientService = outerClientService; this.state = state; @@ -77,7 +74,7 @@ protected void doExecute() throws Throwable { return; } Boolean xpipeHealthState = - collector.collectXPipeInstanceHealth(instance.getHostPort(), isCrossRegion).aggregate(instance.getHostPort(), config.getQuorum()); + collector.collectXPipeInstanceHealth(instance.getHostPort()).aggregate(instance.getHostPort(), config.getQuorum()); if (null == xpipeHealthState || !xpipeHealthState.equals(state)) { logger.info("[compensate][skip][xpipe state change] {}->{} {}", state, xpipeHealthState, instance); future().setSuccess(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjuster.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjuster.java index 647e936bb..5f242bca6 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjuster.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjuster.java @@ -50,10 +50,10 @@ public InstanceStatusAdjuster(InstanceHealthStatusCollector collector, AlertMana this.executors = executorFactory.createExecutorService(); } - public void adjustInstances(Set instances, boolean isCrossRegion, boolean state, long timeoutAtMilli) { + public void adjustInstances(Set instances, boolean state, long timeoutAtMilli) { for (HostPort instance: instances) { Pair clusterShard = metaCache.findClusterShard(instance); - new InstanceStatusAdjustCommand(new ClusterShardHostPort(clusterShard.getKey(), clusterShard.getValue(), instance), isCrossRegion, + new InstanceStatusAdjustCommand(new ClusterShardHostPort(clusterShard.getKey(), clusterShard.getValue(), instance), collector, OuterClientService.DEFAULT, state, timeoutAtMilli, siteStability, config, metaCache, alertManager).execute(executors); } } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java index b61907c86..0d6942e4c 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java @@ -1,6 +1,5 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.handler; -import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.concurrent.FinalStateSetterManager; import com.ctrip.xpipe.endpoint.ClusterShardHostPort; import com.ctrip.xpipe.endpoint.HostPort; @@ -49,8 +48,6 @@ public abstract class AbstractHealthEventHandler listeners; @@ -51,25 +45,17 @@ public class PingActionFactory implements RedisHealthCheckActionFactory delayPingCollectors; - @Autowired - private List psubPingActionCollectors; - private Map> controllersByClusterType; private Map> listenerByClusterType; private Map> delayPingCollectorsByClusterType; - private Map> psubPingActionCollectorsByClusterType; - - protected static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); - @PostConstruct public void postConstruct() { controllersByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(controllers); listenerByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(listeners); delayPingCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(delayPingCollectors); - psubPingActionCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(psubPingActionCollectors); } @Override @@ -77,23 +63,15 @@ public PingAction create(RedisHealthCheckInstance instance) { PingAction pingAction = new PingAction(scheduled, instance, executors); ClusterType clusterType = instance.getCheckInfo().getClusterType(); + pingAction.addListeners(listenerByClusterType.get(clusterType)); pingAction.addControllers(controllersByClusterType.get(clusterType)); - - if (currentDcId.equalsIgnoreCase(instance.getCheckInfo().getActiveDc())) { - pingAction.addListeners(listenerByClusterType.get(clusterType)); - if(instance instanceof DefaultRedisHealthCheckInstance) { - pingAction.addListener(((DefaultRedisHealthCheckInstance)instance).createPingListener()); - } - delayPingCollectorsByClusterType.get(clusterType).forEach(collector -> { - if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); - }); + if(instance instanceof DefaultRedisHealthCheckInstance) { + pingAction.addListener(((DefaultRedisHealthCheckInstance)instance).createPingListener()); } - if (metaCache.isCrossRegion(currentDcId, instance.getCheckInfo().getActiveDc())) { - psubPingActionCollectorsByClusterType.get(clusterType).forEach(collector -> { - if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); - }); - } + delayPingCollectorsByClusterType.get(clusterType).forEach(collector -> { + if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); + }); return pingAction; } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java deleted file mode 100644 index e5d32397a..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; - -import com.ctrip.xpipe.api.foundation.FoundationService; -import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; -import com.ctrip.xpipe.redis.core.meta.MetaCache; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class OneWayPsubActionController implements PsubActionController, OneWaySupport { - - @Autowired - private MetaCache metaCache; - - private static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); - - @Override - public boolean shouldCheck(RedisHealthCheckInstance instance) { - RedisInstanceInfo info = instance.getCheckInfo(); - return metaCache.isCrossRegion(currentDcId, info.getActiveDc()) && currentDcId.equalsIgnoreCase(info.getDcId()); - } -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java deleted file mode 100644 index 7bf07be14..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; - -import com.ctrip.xpipe.redis.checker.healthcheck.AbstractHealthCheckAction; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; -import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; - -public class PsubAction extends AbstractHealthCheckAction { - - protected static final Logger logger = LoggerFactory.getLogger(PsubAction.class); - - private String[] pubSubChannelPrefix; - - public PsubAction(ScheduledExecutorService scheduled, RedisHealthCheckInstance instance, ExecutorService executors) { - super(scheduled, instance, executors); - this.pubSubChannelPrefix = new String[]{"xpipe*"}; - } - - @Override - protected void doTask() { - RedisSession session = instance.getRedisSession(); - doPSubscribe(session, new RedisSession.SubscribeCallback() { - @Override - public void message(String channel, String message) { - logger.debug("[PsubAction][{}]success, channel:{}, message : {}", instance.getEndpoint(), channel, message); - notifyListeners(new PsubActionContext(instance, message)); - } - - @Override - public void fail(Throwable e) { - logger.error("[PsubAction][{}] fail", instance.getEndpoint(), e); - //ignore psub fail - } - }, pubSubChannelPrefix); - } - - @Override - protected Logger getHealthCheckLogger() { - return logger; - } - - protected void doPSubscribe(RedisSession session, RedisSession.SubscribeCallback callback, String... channel) { - session.psubscribeIfAbsent(callback, channel); - } - -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java deleted file mode 100644 index 017aa6d4d..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; - -import com.ctrip.xpipe.redis.checker.healthcheck.AbstractActionContext; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; - -public class PsubActionContext extends AbstractActionContext { - public PsubActionContext(RedisHealthCheckInstance instance, String s) { - super(instance, s); - } -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java deleted file mode 100644 index c5e38e723..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; - -import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckActionController; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; - -public interface PsubActionController extends HealthCheckActionController { -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java deleted file mode 100644 index 891491ae0..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; - -import com.ctrip.xpipe.api.foundation.FoundationService; -import com.ctrip.xpipe.cluster.ClusterType; -import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckActionFactory; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; -import com.ctrip.xpipe.redis.checker.healthcheck.util.ClusterTypeSupporterSeparator; -import com.ctrip.xpipe.redis.core.meta.MetaCache; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; - -import static com.ctrip.xpipe.redis.checker.resource.Resource.PING_DELAY_INFO_EXECUTORS; -import static com.ctrip.xpipe.redis.checker.resource.Resource.PING_DELAY_INFO_SCHEDULED; - -@Component -public class PsubActionFactory implements RedisHealthCheckActionFactory, OneWaySupport { - - @Autowired - private MetaCache metaCache; - - @Resource(name = PING_DELAY_INFO_SCHEDULED) - private ScheduledExecutorService scheduled; - - @Resource(name = PING_DELAY_INFO_EXECUTORS) - private ExecutorService executors; - - @Autowired - private List controllers; - - @Autowired - private List psubPingActionCollectors; - - private Map> controllersByClusterType; - - private Map> psubPingActionCollectorsByClusterType; - - private static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); - - @PostConstruct - public void postConstruct() { - controllersByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(controllers); - psubPingActionCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(psubPingActionCollectors); - } - - @Override - public PsubAction create(RedisHealthCheckInstance instance) { - PsubAction psubAction = new PsubAction(scheduled, instance, executors); - ClusterType clusterType = instance.getCheckInfo().getClusterType(); - - psubAction.addControllers(controllersByClusterType.get(clusterType)); - psubPingActionCollectorsByClusterType.get(clusterType).forEach(collector -> { - if (collector.supportInstance(instance)) psubAction.addListener(collector.createPsubActionListener()); - }); - return psubAction; - } - - @Override - public boolean supportInstnace(RedisHealthCheckInstance instance) { - RedisInstanceInfo info = instance.getCheckInfo(); - return metaCache.isCrossRegion(currentDcId, info.getActiveDc()) && currentDcId.equalsIgnoreCase(info.getDcId()); - } -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java deleted file mode 100644 index 59a19d494..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java +++ /dev/null @@ -1,15 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; - -import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; -import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; -import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckActionListener; -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; - -public interface PsubActionListener extends HealthCheckActionListener> { - - @Override - default boolean worksfor(ActionContext t) { - return t instanceof PsubActionContext; - } - -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java deleted file mode 100644 index 17cf8a6e6..000000000 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; - -import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionListener; - -public interface PsubPingActionCollector { - - boolean supportInstance(RedisHealthCheckInstance instance); - - PingActionListener createPingActionListener(); - - PsubActionListener createPsubActionListener(); - -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java index 53b8c2ee2..9bfe46e78 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java @@ -6,7 +6,6 @@ import com.ctrip.xpipe.redis.checker.healthcheck.KeeperInstanceInfo; import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; import com.ctrip.xpipe.redis.checker.healthcheck.actions.delay.DelayConfig; -import com.ctrip.xpipe.redis.core.meta.MetaCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,9 +20,9 @@ public class CompositeHealthCheckConfig implements HealthCheckConfig { private HealthCheckConfig config; - public CompositeHealthCheckConfig(RedisInstanceInfo instanceInfo, CheckerConfig checkerConfig, DcRelationsService dcRelationsService, boolean isCrossRegion) { + public CompositeHealthCheckConfig(RedisInstanceInfo instanceInfo, CheckerConfig checkerConfig, DcRelationsService dcRelationsService) { logger.info("[CompositeHealthCheckConfig] {}", instanceInfo); - if(isCrossRegion) { + if(instanceInfo.isCrossRegion()) { config = new ProxyEnabledHealthCheckConfig(checkerConfig, dcRelationsService); logger.info("[CompositeHealthCheckConfig][proxied] ping down time: {}", config.pingDownAfterMilli()); } else { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java index d5022fb3e..665b0270a 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java @@ -10,9 +10,6 @@ import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.*; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionFactory; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubAction; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionFactory; import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.RedisCheckRule; import com.ctrip.xpipe.redis.checker.healthcheck.config.CompositeHealthCheckConfig; import com.ctrip.xpipe.redis.checker.healthcheck.config.DefaultHealthCheckConfig; @@ -127,7 +124,7 @@ public RedisHealthCheckInstance create(RedisMeta redisMeta) { RedisInstanceInfo info = createRedisInstanceInfo(redisMeta); Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redisMeta); - HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService); instance.setEndpoint(endpoint) .setSession(redisSessionManager.findOrCreateSession(endpoint)) @@ -231,7 +228,7 @@ public RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta r DefaultRedisHealthCheckInstance instance = new DefaultRedisHealthCheckInstance(); RedisInstanceInfo info = createRedisInstanceInfo(redis); - HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService); Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redis); instance.setEndpoint(endpoint) @@ -245,25 +242,6 @@ public RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta r return instance; } - @Override - public RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis) { - DefaultRedisHealthCheckInstance instance = new DefaultRedisHealthCheckInstance(); - - RedisInstanceInfo info = createRedisInstanceInfo(redis); - HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); - Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redis); - - instance.setEndpoint(endpoint) - .setSession(redisSessionManager.findOrCreateSession(endpoint)) - .setInstanceInfo(info) - .setHealthCheckConfig(config); - - initActionsForRedisForPsubPingAction(instance); - startCheck(instance); - - return instance; - } - private void initActionsForRedisForAssignedAction(DefaultRedisHealthCheckInstance instance) { for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { if (factory instanceof KeeperSupport) @@ -271,13 +249,6 @@ private void initActionsForRedisForAssignedAction(DefaultRedisHealthCheckInstanc } } - private void initActionsForRedisForPsubPingAction(DefaultRedisHealthCheckInstance instance) { - for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { - if (factory instanceof PingActionFactory || factory instanceof PsubActionFactory) - initActions(instance, factory); - } - } - @SuppressWarnings("unchecked") private void initActions(DefaultRedisHealthCheckInstance instance) { for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java index 9cfb58e89..390ecbc75 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java @@ -35,8 +35,6 @@ public class DefaultHealthCheckInstanceManager implements HealthCheckInstanceMan private ConcurrentMap redisInstanceForAssignedAction = Maps.newConcurrentMap(); - private ConcurrentMap redisInstanceForPingAction = Maps.newConcurrentMap(); - @Autowired private HealthCheckInstanceFactory instanceFactory; @@ -63,18 +61,6 @@ public RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisM return null; } - @Override - public RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis) { - try { - HostPort key = new HostPort(redis.getIp(), redis.getPort()); - return MapUtils.getOrCreate(redisInstanceForPingAction, key, - () -> instanceFactory.getOrCreateRedisInstanceForPsubPingAction(redis)); - } catch (Throwable th) { - logger.error("getOrCreate ping action health check redis instance:{}:{}", redis.getIp(), redis.getPort()); - } - return null; - } - @Override public KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper) { try { @@ -107,11 +93,6 @@ public RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort host return redisInstanceForAssignedAction.get(hostPort); } - @Override - public RedisHealthCheckInstance findRedisInstanceForPsubPingAction(HostPort hostPort) { - return redisInstanceForPingAction.get(hostPort); - } - @Override public KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort) { return keeperInstances.get(hostPort); @@ -144,13 +125,6 @@ public RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort ho return instance; } - @Override - public RedisHealthCheckInstance removeRedisInstanceForPingAction(HostPort hostPort) { - RedisHealthCheckInstance instance = redisInstanceForPingAction.remove(hostPort); - if (null != instance) instanceFactory.remove(instance); - return instance; - } - @Override public ClusterHealthCheckInstance remove(String cluster) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java index ea8f4a7b3..4f0b23adb 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static com.ctrip.xpipe.redis.core.meta.comparator.KeeperContainerMetaComparator.getAllKeeperContainerDetailInfoFromDcMeta; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; @@ -145,10 +146,6 @@ void generateHealthCheckInstances() { generateHealthCheckInstances(cluster); } - if (clusterType == ClusterType.ONE_WAY && isClusterActiveDcCrossRegion(cluster) && clusterDcIsCurrentDc(cluster)) { - generatePsubPingActionHealthCheckInstances(cluster); - } - } } } @@ -171,14 +168,6 @@ void generateHealthCheckInstances(ClusterMeta clusterMeta){ instanceManager.getOrCreate(clusterMeta); } - void generatePsubPingActionHealthCheckInstances(ClusterMeta clusterMeta){ - for(ShardMeta shard : clusterMeta.getShards().values()) { - for(RedisMeta redis : shard.getRedises()) { - instanceManager.getOrCreateRedisInstanceForPsubPingAction(redis); - } - } - } - private boolean isClusterActiveIdcCurrentIdc(ClusterMeta cluster) { return cluster.getActiveDc().equalsIgnoreCase(currentDcId); @@ -217,8 +206,4 @@ private boolean hasMultipleActiveDcs(ClusterType clusterType) { return clusterType.supportMultiActiveDC() && !clusterType.isCrossDc(); } - private boolean isClusterActiveDcCrossRegion(ClusterMeta clusterMeta) { - return metaCache.isCrossRegion(currentDcId, clusterMeta.getActiveDc()); - } - } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java index 4a89b794e..0b82cb610 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java @@ -27,6 +27,4 @@ public interface HealthCheckInstanceFactory { void remove(ClusterHealthCheckInstance instance); RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta redis); - - RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/ClusterMetaVisitor.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/ClusterMetaVisitor.java index f5ce6549e..1bf18e2ae 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/ClusterMetaVisitor.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/ClusterMetaVisitor.java @@ -21,6 +21,11 @@ public ClusterMetaVisitor(ShardMetaVisitor shardMetaVisitor) { @Override public void accept(ClusterMeta clusterMeta) { + if (ClusterType.lookup(clusterMeta.getType()).supportSingleActiveDC() + && clusterMeta.getBackupDcs() != null + && Sets.newHashSet(clusterMeta.getBackupDcs().toUpperCase().split("\\s*,\\s*")).contains(FoundationService.DEFAULT.getDataCenter().toUpperCase())) { + return; + } for(ShardMeta shard : clusterMeta.getShards().values()) { shardMetaVisitor.accept(shard); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java index 9853f7c22..796505132 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java @@ -9,7 +9,6 @@ import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckInstanceManager; import com.ctrip.xpipe.redis.checker.healthcheck.impl.HealthCheckEndpointFactory; import com.ctrip.xpipe.redis.core.entity.*; -import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.redis.core.meta.MetaComparator; import com.ctrip.xpipe.redis.core.meta.MetaComparatorVisitor; import com.ctrip.xpipe.redis.core.meta.comparator.ClusterMetaComparator; @@ -49,8 +48,6 @@ public class DefaultDcMetaChangeManager extends AbstractStartStoppable implement private CheckerConfig checkerConfig; - private MetaCache metaCache; - private final String dcId; private final List clustersToDelete = new ArrayList<>(); @@ -61,14 +58,12 @@ public class DefaultDcMetaChangeManager extends AbstractStartStoppable implement public DefaultDcMetaChangeManager(String dcId, HealthCheckInstanceManager instanceManager, HealthCheckEndpointFactory healthCheckEndpointFactory, CheckerConsoleService checkerConsoleService, - CheckerConfig checkerConfig, - MetaCache metaCache) { + CheckerConfig checkerConfig) { this.dcId = dcId; this.instanceManager = instanceManager; this.healthCheckEndpointFactory = healthCheckEndpointFactory; this.checkerConsoleService = checkerConsoleService; this.checkerConfig = checkerConfig; - this.metaCache = metaCache; } @Override @@ -109,12 +104,10 @@ public void compare(DcMeta future, DcMeta allFutureDcMeta) { private void removeAndAdd() { this.redisListToDelete.forEach(this::removeRedis); - this.redisListToDelete.forEach(this::removeRedisOnlyForPingAction); this.clustersToDelete.forEach(this::removeCluster); this.clustersToAdd.forEach(this::addCluster); this.redisListToAdd.forEach(this::addRedis); - this.redisListToAdd.forEach(this::addRedisOnlyForPingAction); } private void clearUp() { @@ -132,24 +125,19 @@ private void removeCluster(ClusterMeta removed) { logger.info("[removeCluster][{}][{}] remove health check", dcId, removed.getId()); ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(removeConsumer))); - ClusterMetaVisitor clusterMetaPingActionVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(removePingActionConsumer))); clusterMetaVisitor.accept(removed); - clusterMetaPingActionVisitor.accept(removed); } private void addCluster(ClusterMeta added) { - if (isInterestedInCluster(added)) { - logger.info("[addCluster][{}][{}] add health check", dcId, added.getId()); - instanceManager.getOrCreate(added); - ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addConsumer))); - clusterMetaVisitor.accept(added); - } - - if (isOneWayClusterActiveDcCrossRegionAndCurrentDc(added)) { - ClusterMetaVisitor clusterMetaPingActionVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addPingActionConsumer))); - clusterMetaPingActionVisitor.accept(added); + if (!isInterestedInCluster(added)) { + logger.info("[addCluster][{}][skip] cluster not interested", added.getId()); + return; } + logger.info("[addCluster][{}][{}] add health check", dcId, added.getId()); + instanceManager.getOrCreate(added); + ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addConsumer))); + clusterMetaVisitor.accept(added); } private void removeRedis(RedisMeta removed) { @@ -211,11 +199,6 @@ protected boolean isInterestedInCluster(ClusterMeta cluster) { return result; } - private boolean isOneWayClusterActiveDcCrossRegionAndCurrentDc(ClusterMeta cluster) { - ClusterType clusterType = ClusterType.lookup(cluster.getType()); - return clusterType == ClusterType.ONE_WAY && isClusterActiveDcCrossRegion(cluster) && clusterDcIsCurrentDc(cluster); - } - private boolean clusterDcIsCurrentDc(ClusterMeta clusterMeta) { return clusterMeta.parent().getId().equalsIgnoreCase(currentDcId); } @@ -237,10 +220,6 @@ private boolean hasMultipleActiveDcs(ClusterType clusterType) { return clusterType.supportMultiActiveDC() && !clusterType.isCrossDc(); } - private boolean isClusterActiveDcCrossRegion(ClusterMeta clusterMeta) { - return metaCache.isCrossRegion(currentDcId, clusterMeta.getActiveDc()); - } - private Consumer removeConsumer = new Consumer() { @Override public void accept(RedisMeta redisMeta) { @@ -255,20 +234,6 @@ public void accept(RedisMeta redisMeta) { } }; - private Consumer removePingActionConsumer = new Consumer() { - @Override - public void accept(RedisMeta redisMeta) { - removeRedisOnlyForPingAction(redisMeta); - } - }; - - private Consumer addPingActionConsumer = new Consumer() { - @Override - public void accept(RedisMeta redisMeta) { - addRedisOnlyForPingAction(redisMeta); - } - }; - @Override protected void doStart() { if (current == null) { @@ -311,20 +276,6 @@ private void addRedisOnlyForUsedMemory(RedisMeta added) { instanceManager.getOrCreateRedisInstanceForAssignedAction(added); } - private void removeRedisOnlyForPingAction(RedisMeta removed) { - if (null != instanceManager.removeRedisInstanceForPingAction(new HostPort(removed.getIp(), removed.getPort()))) { - logger.info("[removeRedisOnlyForPingAction][{}:{}] {}", removed.getIp(), removed.getPort(), removed); - } - } - - private void addRedisOnlyForPingAction(RedisMeta added) { - if (!isOneWayClusterActiveDcCrossRegionAndCurrentDc(added.parent().parent())) { - return; - } - logger.info("[addRedisOnlyForPingAction][{}:{}] {}", added.getIp(), added.getPort(), added); - instanceManager.getOrCreateRedisInstanceForPsubPingAction(added); - } - private class KeeperContainerMetaComparatorVisitor implements MetaComparatorVisitor { @Override diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java index 82655e604..82a349e89 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java @@ -97,7 +97,7 @@ public DcMetaChangeManager getOrCreate(String dcId) { @Override public DcMetaChangeManager create() { return new DefaultDcMetaChangeManager(dcId, instanceManager, healthCheckEndpointFactory, - checkerConsoleService, checkerConfig, metaCache); + checkerConsoleService, checkerConfig); } }); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/ShardMetaVisitor.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/ShardMetaVisitor.java index 96cc0483b..b63005ad0 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/ShardMetaVisitor.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/ShardMetaVisitor.java @@ -22,6 +22,11 @@ public ShardMetaVisitor(RedisMetaVisitor redisMetaVisitor) { @Override public void accept(ShardMeta shardMeta) { + if (ClusterType.lookup(((ClusterMeta) shardMeta.parent()).getType()).supportSingleActiveDC() + && shardMeta.getBackupDcs() != null + && Sets.newHashSet(shardMeta.getBackupDcs().toUpperCase().split("\\s*,\\s*")).contains(FoundationService.DEFAULT.getDataCenter().toUpperCase())) { + return; + } for(RedisMeta redisMeta : shardMeta.getRedises()) { redisMetaVisitor.accept(redisMeta); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java index df5e2c7f1..2cad344d3 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java @@ -83,8 +83,6 @@ public void check() { Subscribe command = pubSubConnectionWrapper.command.get(); if (command instanceof CRDTSubscribeCommand) { crdtsubscribeIfAbsent(pubSubConnectionWrapper.getCallback(), channelArray); - } else if (command instanceof PsubscribeCommand) { - psubscribeIfAbsent(pubSubConnectionWrapper.getCallback(), channelArray); } else { subscribeIfAbsent(pubSubConnectionWrapper.getCallback(), channelArray); } @@ -111,10 +109,6 @@ public synchronized void crdtsubscribeIfAbsent(SubscribeCallback callback, Strin subscribeIfAbsent(callback, () -> new CRDTSubscribeCommand(clientPool, scheduled, commandTimeOut, channel), channel); } - public synchronized void psubscribeIfAbsent(SubscribeCallback callback, String... channel) { - subscribeIfAbsent(callback, () -> new PsubscribeCommand(clientPool, scheduled, commandTimeOut, channel), channel); - } - private synchronized void subscribeIfAbsent(SubscribeCallback callback, Supplier subCommandSupplier, String... channel) { PubSubConnectionWrapper pubSubConnectionWrapper = subscribConns.get(Sets.newHashSet(channel)); if (pubSubConnectionWrapper == null || pubSubConnectionWrapper.shouldCreateNewSession()) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java index c3f1f83d7..667b35d47 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultCheckerService.java @@ -17,12 +17,8 @@ public class DefaultCheckerService extends AbstractService implements CheckerSer private static final String PATH_GET_HEALTH_STATE = "/api/health/{ip}/{port}"; - private static final String PATH_GET_CROSS_REGION_HEALTH_STATE = "/api/health/cross/region/{ip}/{port}"; - private static final String PATH_GET_ALL_INSTANCE_HEALTH_STATUS = "/api/health/check/status/all"; - private static final String PATH_GET_ALL_CROSS_REGION_INSTANCE_HEALTH_STATUS = "/api/health/check/cross/region/status/all"; - public DefaultCheckerService(String host) { if (host.startsWith("http://")) this.host = host; else this.host = "http://" + host; @@ -33,16 +29,6 @@ public HEALTH_STATE getInstanceStatus(String ip, int port) { return restTemplate.getForObject(host + PATH_GET_HEALTH_STATE, HEALTH_STATE.class, ip, port); } - @Override - public HEALTH_STATE getCrossRegionInstanceStatus(String ip, int port) { - return restTemplate.getForObject(host + PATH_GET_CROSS_REGION_HEALTH_STATE, HEALTH_STATE.class, ip, port); - } - - @Override - public Map getAllInstanceCrossRegionHealthStatus() { - return restTemplate.getForObject(host + PATH_GET_ALL_CROSS_REGION_INSTANCE_HEALTH_STATUS, AllInstanceHealthStatus.class); - } - @Override public Map getAllInstanceHealthStatus() { return restTemplate.getForObject(host + PATH_GET_ALL_INSTANCE_HEALTH_STATUS, AllInstanceHealthStatus.class); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManager.java index d40fbe9c2..aa86fd21a 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManager.java @@ -1,6 +1,5 @@ package com.ctrip.xpipe.redis.checker.impl; -import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.CheckerService; import com.ctrip.xpipe.redis.checker.RemoteCheckerManager; @@ -8,7 +7,6 @@ import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; -import com.ctrip.xpipe.redis.core.meta.MetaCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,17 +24,12 @@ public class DefaultRemoteCheckerManager implements RemoteCheckerManager { private GroupCheckerLeaderElector checkerLeaderElector; - private MetaCache metaCache; - private Logger logger = LoggerFactory.getLogger(DefaultRemoteCheckerManager.class); - private static final String currentDc = FoundationService.DEFAULT.getDataCenter(); - - public DefaultRemoteCheckerManager(CheckerConfig checkerConfig, GroupCheckerLeaderElector checkerLeaderElector, MetaCache metaCache) { + public DefaultRemoteCheckerManager(CheckerConfig checkerConfig, GroupCheckerLeaderElector checkerLeaderElector) { this.remoteCheckers = new HashMap<>(); this.config = checkerConfig; this.checkerLeaderElector = checkerLeaderElector; - this.metaCache = metaCache; } @Override @@ -47,13 +40,7 @@ public List getHealthStates(String ip, int port) { checkerAddressList.forEach(checker -> { try { if (!remoteCheckers.containsKey(checker)) remoteCheckers.put(checker, new DefaultCheckerService(checker)); - HostPort instance = new HostPort(ip, port); - HEALTH_STATE state; - if (Objects.equals(currentDc, metaCache.getDc(instance)) && metaCache.isCrossRegion(metaCache.getActiveDc(instance), currentDc)) { - state = remoteCheckers.get(checker).getCrossRegionInstanceStatus(ip, port); - } else { - state = remoteCheckers.get(checker).getInstanceStatus(ip, port); - } + HEALTH_STATE state = remoteCheckers.get(checker).getInstanceStatus(ip, port); result.add(state); } catch (Throwable th) { logger.info("[getHealthStates][{}] fail", checker, th); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java index 8dce340ce..5c02c2a1d 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java @@ -9,9 +9,6 @@ import com.ctrip.xpipe.redis.checker.RedisDelayManager; import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderAware; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultPsubPingActionCollector; -import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStateService; import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingService; import com.ctrip.xpipe.redis.checker.model.CheckerRole; @@ -24,12 +21,8 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.stream.Collectors; /** * @author lishanglin @@ -37,7 +30,7 @@ */ public class HealthCheckReporter implements GroupCheckerLeaderAware { - private List healthStateServices; + private HealthStateService healthStateService; private RedisDelayManager redisDelayManager; @@ -67,11 +60,11 @@ public class HealthCheckReporter implements GroupCheckerLeaderAware { private static final Logger logger = LoggerFactory.getLogger(HealthCheckReporter.class); - public HealthCheckReporter(List healthStateServices, CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, + public HealthCheckReporter(HealthStateService healthStateService, CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, ClusterServer clusterServer, ClusterServer allCheckerServer, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, ClusterHealthManager clusterHealthManager, int serverPort) { - this.healthStateServices = healthStateServices; + this.healthStateService = healthStateService; this.serverPort = serverPort; this.config = checkerConfig; this.checkerConsoleService = checkerConsoleService; @@ -149,7 +142,7 @@ private void reportCheckResult() { result.encodeCrossMasterDelays(crossMasterDelayManager.getAllCrossMasterDelays()); result.encodeRedisAlives(pingService.getAllRedisAlives()); result.setWarningClusterShards(clusterHealthManager.getAllClusterWarningShards()); - result.encodeRedisStates(getAllRedisStates()); + result.encodeRedisStates(healthStateService.getAllCachedState()); result.setHeteroShardsDelay(redisDelayManager.getAllHeteroShardsDelays()); checkerConsoleService.report(config.getConsoleAddress(), result); @@ -158,28 +151,4 @@ private void reportCheckResult() { } } - private Map getAllRedisStates() { - Map redisStates = new HashMap<>(); - DefaultDelayPingActionCollector delayPingActionCollector = null; - DefaultPsubPingActionCollector psubPingActionCollector = null; - for (HealthStateService service : healthStateServices) { - if (service instanceof DefaultDelayPingActionCollector) { - delayPingActionCollector = (DefaultDelayPingActionCollector) service; - } - if (service instanceof DefaultPsubPingActionCollector) { - psubPingActionCollector = (DefaultPsubPingActionCollector) service; - } - } - if (delayPingActionCollector != null) { - redisStates.putAll(delayPingActionCollector.getAllCachedState()); - } - if (psubPingActionCollector != null) { - Map allCachedState = psubPingActionCollector.getAllCachedState(); - for (Map.Entry entry : allCachedState.entrySet()) { - redisStates.put(entry.getKey(), entry.getValue()); - } - } - return redisStates; - } - } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java index c88bd7bb9..533156a23 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/DefaultCheckerConsoleService.java @@ -198,21 +198,10 @@ public Map loadAllClusterCreateTime(String console) { } @Override - public Map loadAllDcOneWayClusterInfo(String console, String dc) { + public Map loadAllActiveDcOneWayClusterInfo(String console, String activeDc) { UriComponents comp = UriComponentsBuilder .fromHttpUrl(console + ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ACTIVE_DC_ONE_WAY_CLUSTERS) - .queryParam("dc", dc).build(); - - ResponseEntity> times = restTemplate.exchange( - comp.toString(), HttpMethod.GET, null, clusterInfoMapTypeDef); - return times.getBody(); - } - - @Override - public Map loadCurrentDcOneWayClusterInfo(String console, String dc) { - UriComponents comp = UriComponentsBuilder - .fromHttpUrl(console + ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ONE_WAY_CLUSTERS) - .queryParam("dc", dc).build(); + .queryParam("activeDc", activeDc).build(); ResponseEntity> times = restTemplate.exchange( comp.toString(), HttpMethod.GET, null, clusterInfoMapTypeDef); diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java index e1df04132..7a85a0601 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java @@ -121,8 +121,8 @@ public CheckerRedisInfoManager redisInfoManager() { } @Bean - public RemoteCheckerManager remoteCheckerManager(CheckerConfig checkerConfig, MetaCache metaCache) { - return new DefaultRemoteCheckerManager(checkerConfig, new GroupCheckerLeaderElector("test"), metaCache); + public RemoteCheckerManager remoteCheckerManager(CheckerConfig checkerConfig) { + return new DefaultRemoteCheckerManager(checkerConfig, new GroupCheckerLeaderElector("test")); } @Bean @@ -210,12 +210,7 @@ public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws } @Override - public Map getAllDcClusters(String dc) { - return null; - } - - @Override - public Map getAllCurrentDcClusters(String dc) { + public Map getAllActiveDcClusters(String activeDc) { return null; } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java index ff4729b79..b5827edee 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusCollectorTest.java @@ -6,6 +6,7 @@ import com.ctrip.xpipe.redis.checker.CheckerService; import com.ctrip.xpipe.redis.checker.OuterClientCache; import com.ctrip.xpipe.redis.checker.RemoteCheckerManager; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatus; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc; @@ -58,7 +59,7 @@ public void setupInstanceHealthStatusCollectorTest() { public void testCollect() throws Exception { Map healthStatus = mockHealthStatusMap(HEALTH_STATE.HEALTHY); Mockito.when(remoteCheckerService.getAllInstanceHealthStatus()).thenReturn(healthStatus); - Mockito.when(outerClientCache.getAllDcClusters("jq")).thenReturn(mockOutClientResp(true)); + Mockito.when(outerClientCache.getAllActiveDcClusters("jq")).thenReturn(mockOutClientResp(true)); healthStatus = mockHealthStatusMap(HEALTH_STATE.HEALTHY); Mockito.when(localCheckerService.getAllInstanceHealthStatus()).thenReturn(healthStatus); Pair result = this.collector.collect(); @@ -112,7 +113,7 @@ public void testCollectXPipeInstanceHealth() throws Exception { HostPort mockInstance = new HostPort("10.0.0.1", 6379); Mockito.when(remoteCheckerService.getInstanceStatus(mockInstance.getHost(), mockInstance.getPort())).thenReturn(HEALTH_STATE.HEALTHY); Mockito.when(localCheckerService.getInstanceStatus(mockInstance.getHost(), mockInstance.getPort())).thenReturn(HEALTH_STATE.HEALTHY); - XPipeInstanceHealthHolder xpipeInstanceHealthHolder = this.collector.collectXPipeInstanceHealth(mockInstance, false); + XPipeInstanceHealthHolder xpipeInstanceHealthHolder = this.collector.collectXPipeInstanceHealth(mockInstance); Assert.assertEquals(Boolean.TRUE, xpipeInstanceHealthHolder.aggregate(mockInstance, 2)); } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java index 8f79ea0eb..c11e1acd9 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java @@ -84,6 +84,7 @@ public void setupInstanceHealthStatusConsistenceCheckerTest() throws Exception { Mockito.when(config.getRedisReplicationHealthCheckInterval()).thenReturn(2000); Mockito.when(config.getDownAfterCheckNums()).thenReturn(5); Mockito.when(siteStability.isSiteStable()).thenReturn(true); + Mockito.when(delayPingActionCollector.getState(any())).thenReturn(HEALTH_STATE.DOWN); } @Override @@ -102,9 +103,9 @@ public void testCheckAndAdjust() throws Exception { Mockito.when(xpipeInstanceHealthHolder.aggregate(ArgumentMatchers.eq(master), anyInt())).thenReturn(true); inspector.inspect(); - Mockito.verify(adjuster).adjustInstances(ArgumentMatchers.eq(Collections.singleton(new HostPort("10.0.0.1", 6379))), anyBoolean(), + Mockito.verify(adjuster).adjustInstances(ArgumentMatchers.eq(Collections.singleton(new HostPort("10.0.0.1", 6379))), ArgumentMatchers.eq(true), anyLong()); - Mockito.verify(adjuster).adjustInstances(ArgumentMatchers.eq(Collections.singleton(new HostPort("10.0.0.2", 6379))), anyBoolean(), + Mockito.verify(adjuster).adjustInstances(ArgumentMatchers.eq(Collections.singleton(new HostPort("10.0.0.2", 6379))), ArgumentMatchers.eq(false), anyLong()); } @@ -117,7 +118,7 @@ public void testNotMarkDownMasterUnhealthyInstances() throws Exception { Mockito.when(xpipeInstanceHealthHolder.aggregate(ArgumentMatchers.eq(master), anyInt())).thenReturn(false); inspector.inspect(); - Mockito.verify(adjuster, Mockito.never()).adjustInstances(any(), anyBoolean(), anyBoolean(), anyLong()); + Mockito.verify(adjuster, Mockito.never()).adjustInstances(any(), anyBoolean(), anyLong()); } @Test diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommandTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommandTest.java index 6ee40cc90..bea8774c2 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommandTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceStatusAdjustCommandTest.java @@ -59,12 +59,12 @@ public void setupInstanceStatusAdjustCommandTest() throws Exception { when(config.getHealthMarkCompensateIntervalMill()).thenReturn(1000L); when(config.getQuorum()).thenReturn(2); when(siteStability.isSiteStable()).thenReturn(true); - when(collector.collectXPipeInstanceHealth(instance.getHostPort(), false)).thenReturn(xpipeInstanceHealth); + when(collector.collectXPipeInstanceHealth(instance.getHostPort())).thenReturn(xpipeInstanceHealth); } @Test public void testAdjust() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(outerClientService.isInstanceUp(instance)).thenReturn(false); when(xpipeInstanceHealth.aggregate(instance.getHostPort(),2)).thenReturn(Boolean.TRUE); @@ -75,21 +75,21 @@ public void testAdjust() throws Exception { @Test(expected = ExecutionException.class) public void testTimeoutAfterCollect() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(outerClientService.isInstanceUp(instance)).thenReturn(false); when(xpipeInstanceHealth.aggregate(instance.getHostPort(),2)).thenReturn(Boolean.TRUE); doAnswer(inv -> { sleep(timeoutMilli + 1); return xpipeInstanceHealth; - }).when(collector).collectXPipeInstanceHealth(instance.getHostPort(), false); + }).when(collector).collectXPipeInstanceHealth(instance.getHostPort()); cmd.execute().get(); } @Test public void testSkipForOuterClientChange() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(outerClientService.isInstanceUp(instance)).thenReturn(true); cmd.execute().get(); @@ -99,7 +99,7 @@ public void testSkipForOuterClientChange() throws Exception { @Test public void testSkipForXPipeChange() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(outerClientService.isInstanceUp(instance)).thenReturn(false); when(xpipeInstanceHealth.aggregate(instance.getHostPort(),2)).thenReturn(null); @@ -110,7 +110,7 @@ public void testSkipForXPipeChange() throws Exception { @Test public void testSiteUnstable() throws Exception { - InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, false, collector, outerClientService, true, + InstanceStatusAdjustCommand cmd = new InstanceStatusAdjustCommand(instance, collector, outerClientService, true, System.currentTimeMillis() + timeoutMilli, siteStability, config, metaCache, alertManager); when(siteStability.isSiteStable()).thenReturn(false); cmd.execute().get(); diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java index 1ac5f33d2..f32334f43 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java @@ -74,7 +74,7 @@ public void beforeDefaultDcMetaChangeManagerTest() throws IOException, SAXExcept when(checkerConfig.getConsoleAddress()).thenReturn("127.0.0.1"); when(checkerConsoleService.getXpipeDcAllMeta(Mockito.anyString(), Mockito.anyString())).thenReturn(getXpipeMeta()); - manager = new DefaultDcMetaChangeManager("oy", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); + manager = new DefaultDcMetaChangeManager("oy", instanceManager, factory, checkerConsoleService, checkerConfig); } private void prepareData(String dc) { @@ -163,7 +163,7 @@ public void testActiveDcOY2JQ() { manager.compare(future, null); Mockito.verify(instanceManager, times(2)).getOrCreate(any(RedisMeta.class)); - Mockito.verify(instanceManager, times(2)).remove(any(HostPort.class)); + Mockito.verify(instanceManager, never()).remove(any(HostPort.class)); Assert.assertEquals(Sets.newHashSet(new HostPort("127.0.0.2", 8100), new HostPort("127.0.0.2", 8101)), addedRedises); } @@ -263,7 +263,7 @@ public void visitRemoved() { @Test public void visitRemovedClusterActiveDc(){ - manager = spy(new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache)); + manager = spy(new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig)); manager.compare(getDcMeta("jq"), null); DcMeta dcMeta = MetaCloneFacade.INSTANCE.clone(getDcMeta("jq")); @@ -308,7 +308,7 @@ public void testSwitchClusterName() throws Exception { Mockito.verify(instanceManager, times(1)).getOrCreate(any(ClusterMeta.class)); Mockito.verify(instanceManager, times(4)).getOrCreate(any(RedisMeta.class)); - Mockito.verify(instanceManager, times(6)).remove(any(HostPort.class)); + Mockito.verify(instanceManager, times(4)).remove(any(HostPort.class)); Mockito.verify(instanceManager, never()).remove(anyString()); } @@ -359,7 +359,7 @@ public void testSwitchClusterShards() throws Exception { @Test public void testHeteroClusterModified() throws Exception { - DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); + DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig); DcMeta dcMeta= getDcMeta("jq"); ClusterMeta cluster1 = dcMeta.findCluster("cluster2"); cluster1.setBackupDcs("ali"); @@ -384,7 +384,7 @@ public void testHeteroClusterModified() throws Exception { @Test public void testBackupDcClusterModified() throws Exception { - DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); + DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig); DcMeta dcMeta= getDcMeta("jq"); ClusterMeta cluster1 = dcMeta.findCluster("cluster2"); cluster1.setBackupDcs("jq,ali"); @@ -399,7 +399,7 @@ public void testBackupDcClusterModified() throws Exception { Mockito.verify(instanceManager, never()).getOrCreate(any(ClusterMeta.class)); Mockito.verify(instanceManager, never()).getOrCreate(any(RedisMeta.class)); - Mockito.verify(instanceManager, times(2)).remove(any(HostPort.class)); + Mockito.verify(instanceManager, never()).remove(any(HostPort.class)); Mockito.verify(instanceManager, times(1)).remove(anyString()); } @@ -491,7 +491,7 @@ public void visitModified1() { @Test public void testKeeperChange() throws Exception { String dcId = FoundationService.DEFAULT.getDataCenter(); - manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); + manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig); prepareData(dcId); DcMeta future = cloneDcMeta(dcId); future.addKeeperContainer(new KeeperContainerMeta().setId(4L).setIp("1.1.1.4").setPort(8080)); @@ -512,7 +512,7 @@ public void testKeeperChange() throws Exception { @Test public void testRedisChange() throws Exception { String dcId = FoundationService.DEFAULT.getDataCenter(); - manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); + manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig); prepareData(dcId); DcMeta future = cloneDcMeta(dcId); future.addKeeperContainer(new KeeperContainerMeta().setId(4L).setIp("1.1.1.4").setPort(8080)); diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManagerTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManagerTest.java index 2f7f94bf0..de9689221 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManagerTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/impl/DefaultRemoteCheckerManagerTest.java @@ -4,7 +4,6 @@ import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; -import com.ctrip.xpipe.redis.core.meta.MetaCache; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; @@ -37,14 +36,11 @@ public class DefaultRemoteCheckerManagerTest extends AbstractCheckerTest { @Mock private GroupCheckerLeaderElector checkerLeaderElector; - @Mock - private MetaCache metaCache; - private MockWebServer webServer; @Before public void setupDefaultRemoteCheckerManagerTest() throws Exception { - this.manager = new DefaultRemoteCheckerManager(checkerConfig, checkerLeaderElector, metaCache); + this.manager = new DefaultRemoteCheckerManager(checkerConfig, checkerLeaderElector); webServer = new MockWebServer(); String port = System.getProperty("server.port", "8080"); webServer.start(InetAddress.getByName("127.0.0.1"), Integer.parseInt(port)); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerApiService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerApiService.java index 1a55033a6..e04b7fe6a 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerApiService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerApiService.java @@ -7,18 +7,10 @@ public interface ConsoleCheckerApiService { String PATH_HEALTH_CHECK_INSTANCE = "/api/health/check/instance/{ip}/{port}"; - String PATH_CROSS_REGION_HEALTH_CHECK_INSTANCE = "/api/health/check/cross/region//instance/{ip}/{port}"; - String PATH_HEALTH_STATUS = "/api/health/{ip}/{port}"; - String PATH_CROSS_REGION_HEALTH_STATUS = "/api/health/cross/region/{ip}/{port}"; - String getHealthCheckInstance(HostPort checker, String ip, int port); - String getCrossRegionHealthCheckInstance(HostPort checker, String ip, int port); - HEALTH_STATE getHealthStates(HostPort checker, String ip, int port); - HEALTH_STATE getCrossRegionHealthStates(HostPort checker, String ip, int port); - } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerGroupService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerGroupService.java index 0b32899fb..b3e0f1098 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerGroupService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/ConsoleCheckerGroupService.java @@ -10,8 +10,8 @@ public interface ConsoleCheckerGroupService { HostPort getCheckerLeader(long clusterDbId); - CommandFuture> getAllHealthCheckInstance(long clusterDbId, String ip, int port, boolean isCrossRegion); + CommandFuture> getAllHealthCheckInstance(long clusterDbId, String ip, int port); - CommandFuture> getAllHealthStates(long clusterDbId, String ip, int port, boolean isCrossRegion); + CommandFuture> getAllHealthStates(long clusterDbId, String ip, int port); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetCommand.java index f2364b55b..368eeafdf 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetCommand.java @@ -15,14 +15,11 @@ public class InstanceHealthCheckGetCommand extends AbstractCommand { int port; - boolean isCrossRegion; - - public InstanceHealthCheckGetCommand(ConsoleCheckerApiService service, HostPort checker, String ip, int port, boolean isCrossRegion) { + public InstanceHealthCheckGetCommand(ConsoleCheckerApiService service, HostPort checker, String ip, int port) { this.service = service; this.checker = checker; this.ip = ip; this.port = port; - this.isCrossRegion = isCrossRegion; } @Override @@ -32,11 +29,7 @@ public String getName() { @Override protected void doExecute() throws Throwable { - if (isCrossRegion) { - future().setSuccess(service.getCrossRegionHealthCheckInstance(checker, ip, port)); - } else { - future().setSuccess(service.getHealthCheckInstance(checker, ip, port)); - } + future().setSuccess(service.getHealthCheckInstance(checker, ip, port)); } @Override diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetGroupCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetGroupCommand.java index dec739ecf..017d569f8 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetGroupCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthCheckGetGroupCommand.java @@ -24,16 +24,13 @@ public class InstanceHealthCheckGetGroupCommand extends AbstractCommand checkers, String ip, int port, boolean isCrossRegion, ExecutorService executor) { + public InstanceHealthCheckGetGroupCommand(ConsoleCheckerApiService service, List checkers, String ip, int port, ExecutorService executor) { this.service = service; this.checkers = checkers; this.ip = ip; this.port = port; - this.isCrossRegion = isCrossRegion; this.executor = executor; } @@ -46,7 +43,7 @@ public String getName() { protected void doExecute() { Map> futureMap = new HashMap<>(); checkers.forEach(checker -> { - CommandFuture future = new InstanceHealthCheckGetCommand(service, checker, ip, port, isCrossRegion).execute(executor); + CommandFuture future = new InstanceHealthCheckGetCommand(service, checker, ip, port).execute(executor); futureMap.put(checker, future); }); Map result = new HashMap<>(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthStatusGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthStatusGetCommand.java index 308025b2d..b78e98836 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthStatusGetCommand.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/command/InstanceHealthStatusGetCommand.java @@ -15,14 +15,11 @@ public class InstanceHealthStatusGetCommand extends AbstractCommand checkers, String ip, int port, boolean isCrossRegion, ExecutorService executor) { + public InstanceHealthStatusGetGroupCommand(ConsoleCheckerApiService service, List checkers, String ip, int port, ExecutorService executor) { this.service = service; this.checkers = checkers; this.ip = ip; this.port = port; - this.isCrossRegion = isCrossRegion; this.executor = executor; } @@ -49,7 +46,7 @@ public String getName() { protected void doExecute() throws Throwable { Map> futureMap = new HashMap<>(); checkers.forEach(checker -> { - CommandFuture future = new InstanceHealthStatusGetCommand(service, checker, ip, port, isCrossRegion).execute(executor); + CommandFuture future = new InstanceHealthStatusGetCommand(service, checker, ip, port).execute(executor); futureMap.put(checker, future); }); Map result = new HashMap<>(); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerApiService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerApiService.java index 6d1010272..f0199a7c2 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerApiService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerApiService.java @@ -14,21 +14,11 @@ public String getHealthCheckInstance(HostPort checker, String ip, int port) { return restTemplate.getForObject(getPath(checker, PATH_HEALTH_CHECK_INSTANCE), String.class, ip, port); } - @Override - public String getCrossRegionHealthCheckInstance(HostPort checker, String ip, int port) { - return restTemplate.getForObject(getPath(checker, PATH_CROSS_REGION_HEALTH_CHECK_INSTANCE), String.class, ip, port); - } - @Override public HEALTH_STATE getHealthStates(HostPort checker, String ip, int port) { return restTemplate.getForObject(getPath(checker, PATH_HEALTH_STATUS), HEALTH_STATE.class, ip, port); } - @Override - public HEALTH_STATE getCrossRegionHealthStates(HostPort checker, String ip, int port) { - return restTemplate.getForObject(getPath(checker, PATH_CROSS_REGION_HEALTH_STATUS), HEALTH_STATE.class, ip, port); - } - private String getPath(HostPort key, String path) { return "http://" + key.getHost() + ":" + key.getPort() + path; } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerGroupService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerGroupService.java index 9c1f40fc8..6932f99b6 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerGroupService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleCheckerGroupService.java @@ -40,13 +40,13 @@ public List getAllChecker(long clusterDbId) { } @Override - public CommandFuture> getAllHealthCheckInstance(long clusterDbId, String ip, int port, boolean isCrossRegion) { - return new InstanceHealthCheckGetGroupCommand(checkerApiService, getAllChecker(clusterDbId), ip, port, isCrossRegion, executor).execute(executor); + public CommandFuture> getAllHealthCheckInstance(long clusterDbId, String ip, int port) { + return new InstanceHealthCheckGetGroupCommand(checkerApiService, getAllChecker(clusterDbId), ip, port, executor).execute(executor); } @Override - public CommandFuture> getAllHealthStates(long clusterDbId, String ip, int port, boolean isCrossRegion) { - return new InstanceHealthStatusGetGroupCommand(checkerApiService, getAllChecker(clusterDbId), ip, port, isCrossRegion, executor).execute(executor); + public CommandFuture> getAllHealthStates(long clusterDbId, String ip, int port) { + return new InstanceHealthStatusGetGroupCommand(checkerApiService, getAllChecker(clusterDbId), ip, port, executor).execute(executor); } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java index 2261d8ca1..9701ea130 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/checker/impl/DefaultConsoleDcCheckerService.java @@ -54,20 +54,15 @@ public class DefaultConsoleDcCheckerService implements ConsoleDcCheckerService { @Override public List getShardAllCheckerGroupHealthCheck(String dcId, String clusterId, String shardId) { - List result = new ArrayList<>(); ClusterTbl clusterTbl = clusterService.find(clusterId); if (clusterTbl == null) { return null; } String activeDc = dcService.getDcName(clusterTbl.getActivedcId()); - if (metaCache.isCrossRegion(dcId, activeDc)) { - result.addAll(consoleManager.getShardAllCheckerGroupHealthCheck(dcId, dcId, clusterId, shardId)); - } if (activeDc.equalsIgnoreCase(currentDc)) { return getLocalDcShardAllCheckerGroupHealthCheck(dcId, clusterId, shardId); } - result.addAll(consoleManager.getShardAllCheckerGroupHealthCheck(activeDc, dcId, clusterId, shardId)); - return result; + return consoleManager.getShardAllCheckerGroupHealthCheck(activeDc, dcId, clusterId, shardId); } @Override @@ -80,9 +75,8 @@ public List getLocalDcShardAllCheckerGroupHealthCh Map>> redisCheckerActionMap = new HashMap<>(); Map>> redisCheckerHealthStateMap = new HashMap<>(); redisOfDcClusterShard.forEach(redisMeta -> { - boolean isCrossRegion = metaCache.isCrossRegion(currentDc, dcService.getDcName(clusterTbl.getActivedcId())); - CommandFuture> allHealthCheckInstance = consoleCheckerGroupService.getAllHealthCheckInstance(clusterTbl.getId(), redisMeta.getIp(), redisMeta.getPort(), isCrossRegion); - CommandFuture> allHealthStates = consoleCheckerGroupService.getAllHealthStates(clusterTbl.getId(), redisMeta.getIp(), redisMeta.getPort(), isCrossRegion); + CommandFuture> allHealthCheckInstance = consoleCheckerGroupService.getAllHealthCheckInstance(clusterTbl.getId(), redisMeta.getIp(), redisMeta.getPort()); + CommandFuture> allHealthStates = consoleCheckerGroupService.getAllHealthStates(clusterTbl.getId(), redisMeta.getIp(), redisMeta.getPort()); redisCheckerActionMap.put(new HostPort(redisMeta.getIp(), redisMeta.getPort()), allHealthCheckInstance); redisCheckerHealthStateMap.put(new HostPort(redisMeta.getIp(), redisMeta.getPort()), allHealthStates); }); @@ -94,7 +88,7 @@ public List getLocalDcShardAllCheckerGroupHealthCh List result = new ArrayList<>(); for (Map.Entry> entry : checkerInstancesMap.entrySet()) { HostPort checker = entry.getKey(); - ShardCheckerHealthCheckModel shardCheckerHealthCheckModel = new ShardCheckerHealthCheckModel(checker.getHost(), checker.getPort(), currentDc); + ShardCheckerHealthCheckModel shardCheckerHealthCheckModel = new ShardCheckerHealthCheckModel(checker.getHost(), checker.getPort()); if (checker.equals(checkerLeader)) { shardCheckerHealthCheckModel.setCheckerRole(CheckerRole.LEADER); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java index 943567a24..dcab438be 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/console/impl/DefaultConsoleService.java @@ -30,12 +30,8 @@ public class DefaultConsoleService extends AbstractService implements ConsoleSer private final String healthStatusUrl; - private final String crossRegionHealthStatusUrl; - private final String allHealthStatusUrl; - private final String allCrossRegionHealthStatusUrl; - private final String pingStatusUrl; private final String innerShardDelayStatusUrl; @@ -73,9 +69,7 @@ public DefaultConsoleService(String address){ this.address = "http://" + this.address; } healthStatusUrl = String.format("%s/api/health/{ip}/{port}", this.address); - crossRegionHealthStatusUrl = String.format("%s/api/cross/region/health/{ip}/{port}", this.address); allHealthStatusUrl = String.format("%s/api/health/check/status/all", this.address); - allCrossRegionHealthStatusUrl = String.format("%s/api/health/cross/region/check/status/all", this.address); pingStatusUrl = String.format("%s/api/redis/ping/{ip}/{port}", this.address); innerShardDelayStatusUrl = String.format("%s/api/shard/inner/delay/{shardId}", this.address); innerDelayStatusUrl = String.format("%s/api/redis/inner/delay/{ip}/{port}", this.address); @@ -95,21 +89,11 @@ public HEALTH_STATE getInstanceStatus(String ip, int port) { return restTemplate.getForObject(healthStatusUrl, HEALTH_STATE.class, ip, port); } - @Override - public HEALTH_STATE getCrossRegionInstanceStatus(String ip, int port) { - return restTemplate.getForObject(crossRegionHealthStatusUrl, HEALTH_STATE.class, ip, port); - } - @Override public Map getAllInstanceHealthStatus() { return restTemplate.getForObject(allHealthStatusUrl, AllInstanceHealthStatus.class); } - @Override - public Map getAllInstanceCrossRegionHealthStatus() { - return restTemplate.getForObject(allCrossRegionHealthStatusUrl, AllInstanceHealthStatus.class); - } - @Override public List getShardAllCheckerGroupHealthCheck(String dcId, String clusterId, String shardId) { return restTemplate.getForObject(shardAllCheckerGroupHealthCheckUrl, ShardCheckerHealthCheckModels.class, dcId, clusterId, shardId); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java index 2b959286f..d40ba7808 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java @@ -214,13 +214,8 @@ public void recordAlert(@RequestBody CheckerConsoleService.AlertMessage alertMes } @GetMapping(value = ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ACTIVE_DC_ONE_WAY_CLUSTERS) - public Map loadAllOuterClientClusters(@RequestParam String dc) { - return outerClientCache.getAllDcClusters(dc); - } - - @GetMapping(value = ConsoleCheckerPath.PATH_GET_ALL_CURRENT_DC_ONE_WAY_CLUSTERS) - public Map loadCurrentDcOuterClientClusters(@RequestParam String dc) { - return outerClientCache.getAllCurrentDcClusters(dc); + public Map loadAllOuterClientClusters(@RequestParam String activeDc) { + return outerClientCache.getAllActiveDcClusters(activeDc); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java index 356e30dc0..5f0cee57a 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/healthcheck/fulllink/model/ShardCheckerHealthCheckModel.java @@ -7,7 +7,6 @@ public class ShardCheckerHealthCheckModel { - private String idc; private String host; private int port; private CheckerRole checkerRole; @@ -16,22 +15,13 @@ public class ShardCheckerHealthCheckModel { public ShardCheckerHealthCheckModel() { } - public ShardCheckerHealthCheckModel(String host, int port, String idc) { + public ShardCheckerHealthCheckModel(String host, int port) { this.host = host; this.port = port; - this.idc = idc; this.checkerRole = CheckerRole.FOLLOWER; this.instances = new ArrayList<>(); } - public String getIdc() { - return idc; - } - - public void setIdc(String idc) { - this.idc = idc; - } - public String getHost() { return host; } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/AbstractGetAllDcCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractGetAllDcCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/AbstractGetAllDcCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractGetAllDcCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/AbstractKeeperCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/AbstractKeeperCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/CheckKeeperActiveCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperActiveCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/CheckKeeperActiveCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperActiveCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/CheckKeeperConnectedCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperConnectedCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/CheckKeeperConnectedCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/CheckKeeperConnectedCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/FullSyncJudgeCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/FullSyncJudgeCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/KeeperContainerInfoGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerInfoGetCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/KeeperContainerInfoGetCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerInfoGetCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/KeeperContainerReplOffsetGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerReplOffsetGetCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/KeeperContainerReplOffsetGetCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperContainerReplOffsetGetCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/KeeperReplIdGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperReplIdGetCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/KeeperReplIdGetCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperReplIdGetCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/KeeperResetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperResetCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/KeeperResetCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/KeeperResetCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/MigrationKeeperContainerDetailInfoGetCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/MigrationKeeperContainerDetailInfoGetCommand.java similarity index 100% rename from redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/command/MigrationKeeperContainerDetailInfoGetCommand.java rename to redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/MigrationKeeperContainerDetailInfoGetCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java index c6123d8eb..01fbaba70 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/CheckerOuterClientCache.java @@ -34,22 +34,9 @@ public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws } @Override - public Map getAllDcClusters(String dc) { + public Map getAllActiveDcClusters(String activeDc) { try { - return service.loadAllDcOneWayClusterInfo(config.getConsoleAddress(), dc); - } catch (RestClientException e) { - logger.warn("[getAllOneWayClusters] rest fail, {}", e.getMessage()); - } catch (Throwable th) { - logger.warn("[getAllOneWayClusters] fail", th); - } - - return Collections.emptyMap(); - } - - @Override - public Map getAllCurrentDcClusters(String dc) { - try { - return service.loadCurrentDcOneWayClusterInfo(config.getConsoleAddress(), dc); + return service.loadAllActiveDcOneWayClusterInfo(config.getConsoleAddress(), activeDc); } catch (RestClientException e) { logger.warn("[getAllOneWayClusters] rest fail, {}", e.getMessage()); } catch (Throwable th) { diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java index ab9675bc6..43f0eab79 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultOuterClientCache.java @@ -12,13 +12,11 @@ import org.springframework.stereotype.Component; import org.springframework.web.client.RestClientException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import com.ctrip.xpipe.api.migration.OuterClientService.*; /** * @author lishanglin @@ -32,28 +30,22 @@ public class DefaultOuterClientCache extends AbstractLifecycle implements OuterC private ConsoleConfig config; - private TimeBoundCache> clustersCache; - - private TimeBoundCache> currentDcClustersCache; + private TimeBoundCache> clustersCache; private ScheduledExecutorService scheduled; private DynamicDelayPeriodTask refreshTask; - private DynamicDelayPeriodTask refreshCurrentDcTask; - public DefaultOuterClientCache(ConsoleConfig config) { this.outerClientService = OuterClientService.DEFAULT; this.config = config; this.clustersCache = new TimeBoundCache<>(() -> 10000 + config.getRedisConfCheckIntervalMilli(), - () -> loadClusters(FoundationService.DEFAULT.getDataCenter())); - this.currentDcClustersCache = new TimeBoundCache<>(() -> 10000 + config.getRedisConfCheckIntervalMilli(), - () -> loadCurrentDcClusters(FoundationService.DEFAULT.getDataCenter())); + () -> loadActiveDcClusters(FoundationService.DEFAULT.getDataCenter())); } @Override - public ClusterInfo getClusterInfo(String clusterName) throws Exception { - ClusterInfo clusterInfo = clustersCache.getData(false).get(clusterName.toLowerCase()); + public OuterClientService.ClusterInfo getClusterInfo(String clusterName) throws Exception { + OuterClientService.ClusterInfo clusterInfo = clustersCache.getData(false).get(clusterName.toLowerCase()); if (null == clusterInfo) { return outerClientService.getClusterInfo(clusterName); } @@ -62,22 +54,16 @@ public ClusterInfo getClusterInfo(String clusterName) throws Exception { } @Override - public Map getAllDcClusters(String dc) { - if (FoundationService.DEFAULT.getDataCenter().equalsIgnoreCase(dc)) return clustersCache.getData(false); - else return loadClusters(dc); + public Map getAllActiveDcClusters(String activeDc) { + if (FoundationService.DEFAULT.getDataCenter().equalsIgnoreCase(activeDc)) return clustersCache.getData(false); + else return loadActiveDcClusters(activeDc); } - @Override - public Map getAllCurrentDcClusters(String dc) { - if (FoundationService.DEFAULT.getDataCenter().equalsIgnoreCase(dc)) return currentDcClustersCache.getData(false); - else return loadCurrentDcClusters(dc); - } - - private Map loadClusters(String dc) { - Map clusters = new HashMap<>(); + private Map loadActiveDcClusters(String activeDc) { + Map clusters = new HashMap<>(); try { - List clusterInfos = outerClientService.getActiveDcClusters(dc); - for (ClusterInfo cluster: clusterInfos) { + List clusterInfos = outerClientService.getActiveDcClusters(activeDc); + for (OuterClientService.ClusterInfo cluster: clusterInfos) { clusters.put(cluster.getName().toLowerCase(), cluster); } } catch (RestClientException e) { @@ -89,69 +75,6 @@ private Map loadClusters(String dc) { return clusters; } - private Map loadCurrentDcClusters(String dc) { - Map clusters = new HashMap<>(); - try { - DcMeta currentDcClusterInfos = outerClientService.getOutClientDcMeta(dc); - for (ClusterMeta clusterMeta: currentDcClusterInfos.getClusters().values()) { - if (!ClusterType.XPIPE_ONE_WAY.equals(clusterMeta.getClusterType())) continue; - ClusterInfo clusterInfo = buildClusterModel(clusterMeta); - if (!clusters.containsKey(clusterInfo.getName().toLowerCase())) { - clusters.put(clusterInfo.getName().toLowerCase(), clusterInfo); - } - } - } catch (RestClientException e) { - logger.warn("[refresh] rest fail, {}", e.getMessage()); - } catch (Throwable th) { - logger.warn("[refresh] fail", th); - } - - return clusters; - } - - public ClusterInfo buildClusterModel(ClusterMeta clusterMeta) { - ClusterInfo cluster = clusterMetaToClusterModel(clusterMeta); - List groupModels = new ArrayList<>(); - for (GroupMeta group : clusterMeta.getGroups().values()) { - List instanceModels = new ArrayList<>(); - GroupInfo groupModel = groupMetaToGroupModel(group); - for (RedisMeta instance : group.getRedises()) { - instanceModels.add(redisMetaToInstanceModel(instance)); - } - groupModel.setInstances(instanceModels); - groupModels.add(groupModel); - } - cluster.setGroups(groupModels); - return cluster; - } - - public ClusterInfo clusterMetaToClusterModel(ClusterMeta clusterMeta) { - ClusterInfo cluster = new ClusterInfo(); - cluster.setName(clusterMeta.getName()); - if (clusterMeta.getClusterType().equals(ClusterType.XPIPE_ONE_WAY)) { - cluster.setIsXpipe(true); - cluster.setMasterIDC(clusterMeta.getActiveIDC()); - } - return cluster; - } - - public GroupInfo groupMetaToGroupModel(GroupMeta groupMeta) { - GroupInfo groupModel = new GroupInfo(); - groupModel.setName(groupMeta.getGroupName()); - return groupModel; - } - - public InstanceInfo redisMetaToInstanceModel(RedisMeta redisMeta) { - InstanceInfo instance = new InstanceInfo(); - instance.setIPAddress(redisMeta.getHost()); - instance.setPort(redisMeta.getPort()); - instance.setIsMaster(redisMeta.isMaster()); - instance.setStatus(InstanceStatus.ACTIVE.equals(redisMeta.getStatus())); - instance.setEnv(redisMeta.getIdc()); - instance.setCanRead(!InstanceStatus.INACTIVE.equals(redisMeta.getStatus())); - return instance; - } - @Override protected void doInitialize() throws Exception { super.doInitialize(); @@ -159,22 +82,18 @@ protected void doInitialize() throws Exception { XpipeThreadFactory.create("OuterClientCacheRefreshScheduled")); this.refreshTask = new DynamicDelayPeriodTask("OuterClientCacheRefresh", clustersCache::refresh, config::getRedisConfCheckIntervalMilli, scheduled); - this.refreshCurrentDcTask = new DynamicDelayPeriodTask("OuterClientCurrentDcCacheRefresh", currentDcClustersCache::refresh, - config::getRedisConfCheckIntervalMilli, scheduled); } @Override protected void doStart() throws Exception { super.doStart(); this.refreshTask.start(); - this.refreshCurrentDcTask.start(); } @Override protected void doStop() throws Exception { super.doStop(); this.refreshTask.stop(); - this.refreshCurrentDcTask.stop(); } @Override @@ -183,7 +102,6 @@ protected void doDispose() throws Exception { this.scheduled.shutdown(); this.scheduled = null; this.refreshTask = null; - this.refreshCurrentDcTask = null; } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java index 3130905f1..daac3caa7 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java @@ -43,7 +43,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; -import java.util.List; import java.util.concurrent.ExecutorService; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; @@ -176,8 +175,8 @@ public CheckerCrossMasterDelayManager checkerCrossMasterDelayManager(FoundationS } @Bean - public RemoteCheckerManager remoteCheckerManager(CheckerConfig checkerConfig, GroupCheckerLeaderElector checkerLeaderElector, MetaCache metaCache) { - return new DefaultRemoteCheckerManager(checkerConfig, checkerLeaderElector, metaCache); + public RemoteCheckerManager remoteCheckerManager(CheckerConfig checkerConfig, GroupCheckerLeaderElector checkerLeaderElector) { + return new DefaultRemoteCheckerManager(checkerConfig, checkerLeaderElector); } @Bean @@ -191,9 +190,9 @@ public SentinelManager sentinelManager() { public HealthCheckReporter healthCheckReporter(CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, GroupCheckerLeaderElector clusterServer, AllCheckerLeaderElector allCheckerLeaderElector, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, - ClusterHealthManager clusterHealthManager, List healthStateServices, + ClusterHealthManager clusterHealthManager, HealthStateService healthStateService, @Value("${server.port}") int serverPort) { - return new HealthCheckReporter(healthStateServices, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, + return new HealthCheckReporter(healthStateService, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, crossMasterDelayManager, pingService, clusterHealthManager, serverPort); } diff --git a/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html b/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html index 9fa21087b..caffec215 100644 --- a/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html +++ b/redis/redis-console/src/main/resources/static/views/index/full_link_health_check.html @@ -146,7 +146,7 @@ Checker Groups - Show Actions(err) + Show Actions 刷新 @@ -156,8 +156,7 @@

- {{checker.idc}} - {{checker.host}}:{{checker.port}} + {{checker.host}}:{{checker.port}} {{checker.checkerRole}}
diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/checker/DefaultConsoleDcCheckerServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/checker/DefaultConsoleDcCheckerServiceTest.java index ebed647e2..e18db8c64 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/checker/DefaultConsoleDcCheckerServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/checker/DefaultConsoleDcCheckerServiceTest.java @@ -74,8 +74,8 @@ public void testGetLocalDcShardAllCheckerGroupHealthCheck() throws Exception { Mockito.when(metaCache.getRedisOfDcClusterShard(DC, CLUSTER, SHARD)).thenReturn(redisMetas); CommandFuture future1 = Mockito.mock(CommandFuture.class); CommandFuture future2 = Mockito.mock(CommandFuture.class); - Mockito.when(consoleCheckerGroupService.getAllHealthCheckInstance(clusterDbId, IP, PORT, false)).thenReturn(future1); - Mockito.when(consoleCheckerGroupService.getAllHealthStates(clusterDbId, IP, PORT, false)).thenReturn(future2); + Mockito.when(consoleCheckerGroupService.getAllHealthCheckInstance(clusterDbId, IP, PORT)).thenReturn(future1); + Mockito.when(consoleCheckerGroupService.getAllHealthStates(clusterDbId, IP, PORT)).thenReturn(future2); HostPort checker = new HostPort("127.0.0.2", 8080); Map checkerActionMap = new HashMap<>(); Map checkerHealthStateMap = new HashMap<>();; diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/console/ConsoleCheckerPath.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/console/ConsoleCheckerPath.java index 4ea9e6aad..16f01e071 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/console/ConsoleCheckerPath.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/console/ConsoleCheckerPath.java @@ -52,8 +52,6 @@ private ConsoleCheckerPath() {} public static final String PATH_GET_ALL_CURRENT_DC_ACTIVE_DC_ONE_WAY_CLUSTERS = "/api/outclient/clusters/one_way"; - public static final String PATH_GET_ALL_CURRENT_DC_ONE_WAY_CLUSTERS = "/api/outclient/current/dc/clusters/one_way"; - public static final String PATH_PUT_CHECKER_LEADER_MERGE_ALERT = "/api/mail/{alertType}"; } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java index 823b433ee..8d427e185 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/AbstractSubscribe.java @@ -204,12 +204,8 @@ protected void handleMessage(Object response) { if(!(response instanceof Object[])) { throw new RedisRuntimeException(String.format("Subscribe subscribeChannel response incorrect: %s", response)); } - SubscribeMessageHandler handler; - if (this.getName().equals(PSUBSCRIBE)) { - handler = getPSubscribeMessageHandler(); - } else { - handler = getSubscribeMessageHandler(); - } + + SubscribeMessageHandler handler = getSubscribeMessageHandler(); Pair channelAndMessage = handler.handle(payloadToStringArray(response)); if(channelAndMessage != null) { @@ -221,10 +217,6 @@ protected SubscribeMessageHandler getSubscribeMessageHandler() { return new DefaultSubscribeMessageHandler(); } - protected SubscribeMessageHandler getPSubscribeMessageHandler() { - return new PsubscribeMessageHandler(); - } - private void notifyListeners(Pair channelAndMessage) { for(SubscribeListener listener : listeners) { try { diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java deleted file mode 100644 index f47a7e705..000000000 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.ctrip.xpipe.redis.core.protocal.cmd.pubsub; - -import com.ctrip.xpipe.api.pool.SimpleObjectPool; -import com.ctrip.xpipe.netty.commands.NettyClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ScheduledExecutorService; - -public class PsubscribeCommand extends AbstractSubscribe{ - - private static final Logger logger = LoggerFactory.getLogger(PsubscribeCommand.class); - - protected PsubscribeCommand(String host, int port, ScheduledExecutorService scheduled, MESSAGE_TYPE messageType, String... subscribeChannel) { - super(host, port, scheduled, messageType, subscribeChannel); - } - - public PsubscribeCommand(SimpleObjectPool clientPool, ScheduledExecutorService scheduled, int commandTimeoutMilli, String... channel) { - super(clientPool, scheduled, commandTimeoutMilli, MESSAGE_TYPE.PMESSAGE, channel); - } - - - @Override - public String getName() { - return PSUBSCRIBE; - } - - @Override - protected Logger getLogger() { - return logger; - } - -} diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java index b4fcda175..e2109c834 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/SubscribeCommand.java @@ -38,7 +38,7 @@ public SubscribeCommand(SimpleObjectPool clientPool, ScheduledExecu @Override public String getName() { - return SUBSCRIBE; + return "subscribe"; } @Override diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java index f18fc975f..6ef5ff7f6 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java @@ -44,7 +44,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; -import java.util.List; import java.util.concurrent.ExecutorService; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; @@ -170,8 +169,8 @@ public CheckerCrossMasterDelayManager checkerCrossMasterDelayManager(FoundationS } @Bean - public RemoteCheckerManager remoteCheckerManager(CheckerConfig checkerConfig,GroupCheckerLeaderElector clusterServer, MetaCache metaCache) { - return new DefaultRemoteCheckerManager(checkerConfig, clusterServer, metaCache); + public RemoteCheckerManager remoteCheckerManager(CheckerConfig checkerConfig,GroupCheckerLeaderElector clusterServer) { + return new DefaultRemoteCheckerManager(checkerConfig, clusterServer); } @Bean @@ -185,9 +184,9 @@ public SentinelManager sentinelManager() { public HealthCheckReporter healthCheckReporter(CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, GroupCheckerLeaderElector clusterServer, AllCheckerLeaderElector allCheckerLeaderElector, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, - ClusterHealthManager clusterHealthManager, List healthStateServices, + ClusterHealthManager clusterHealthManager, HealthStateService healthStateService, @Value("${server.port}") int serverPort) { - return new HealthCheckReporter(healthStateServices, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, + return new HealthCheckReporter(healthStateService, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, crossMasterDelayManager, pingService, clusterHealthManager, serverPort); }