diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java index 2e4593862..a1da342a7 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/controller/CheckerHealthController.java @@ -97,6 +97,16 @@ public String getHealthCheckRedisInstanceForAssignedAction(@PathVariable String return Codec.DEFAULT.encode(model); } + @RequestMapping(value = "/health/check/redis-for-ping-action/{ip}/{port}", method = RequestMethod.GET) + public String getHealthCheckRedisInstanceForPingAction(@PathVariable String ip, @PathVariable int port) { + RedisHealthCheckInstance instance = instanceManager.findRedisInstanceForPingAction(new HostPort(ip, port)); + if(instance == null) { + return "Not found"; + } + HealthCheckInstanceModel model = buildHealthCheckInfo(instance); + return Codec.DEFAULT.encode(model); + } + @RequestMapping(value = "/health/redis/info/{ip}/{port}", method = RequestMethod.GET) public ActionContextRetMessage> getRedisInfo(@PathVariable String ip, @PathVariable int port) { return ActionContextRetMessage.from(redisInfoManager.getInfoByHostPort(new HostPort(ip, port))); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java index 30f1e0aa8..220bbd4f1 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/HealthCheckInstanceManager.java @@ -18,6 +18,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisMeta redis); + RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis); + KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper); ClusterHealthCheckInstance getOrCreate(ClusterMeta cluster); @@ -26,6 +28,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort); + RedisHealthCheckInstance findRedisInstanceForPingAction(HostPort hostPort); + KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort); ClusterHealthCheckInstance findClusterHealthCheckInstance(String clusterId); @@ -36,6 +40,8 @@ public interface HealthCheckInstanceManager { RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort); + RedisHealthCheckInstance removeRedisInstanceForPingAction(HostPort hostPort); + ClusterHealthCheckInstance remove(String cluster); List getAllRedisInstance(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java new file mode 100644 index 000000000..33ba4ea6f --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AbstractPsubPingActionCollector.java @@ -0,0 +1,94 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionListener; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionListener; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector; +import com.google.common.collect.Maps; + +import java.util.Map; + +public abstract class AbstractPsubPingActionCollector implements PsubPingActionCollector { + + protected Map allHealthStatus = Maps.newConcurrentMap(); + + protected PingActionListener pingActionListener = new AbstractPsubPingActionCollector.CollectorPingActionListener(); + + protected PsubActionListener psubActionListener = new AbstractPsubPingActionCollector.CollectorPsubActionListener(); + + protected abstract HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance); + + protected void removeHealthStatus(HealthCheckAction action) { + HealthStatus healthStatus = allHealthStatus.remove(action.getActionInstance()); + if(healthStatus != null) { + healthStatus.stop(); + } + } + + @Override + public boolean supportInstance(RedisHealthCheckInstance instance) { + return true; + } + + @Override + public PingActionListener createPingActionListener() { + return pingActionListener; + } + + @Override + public PsubActionListener createPsubActionListener() { + return psubActionListener; + } + + protected class CollectorPingActionListener implements PingActionListener { + + @Override + public void onAction(PingActionContext pingActionContext) { + HealthStatus healthStatus = createOrGetHealthStatus(pingActionContext.instance()); + if (!pingActionContext.isSuccess()) { + if (pingActionContext.getCause().getMessage().contains("LOADING")) { + healthStatus.loading(); + } + return; + } + + if (pingActionContext.getResult()) { + healthStatus.pong(); + } else { + if(healthStatus.getState() == HEALTH_STATE.UNKNOWN) { + healthStatus.pongInit(); + } + } + } + + @Override + public boolean worksfor(ActionContext t) { + return t instanceof PingActionContext; + } + + @Override + public void stopWatch(HealthCheckAction action) { + removeHealthStatus(action); + } + } + + protected class CollectorPsubActionListener implements PsubActionListener { + + @Override + public void onAction(PsubActionContext psubActionContext) { + HealthStatus healthStatus = createOrGetHealthStatus(psubActionContext.instance()); + if (!psubActionContext.getResult().isEmpty()) { + healthStatus.subSuccess(); + } + } + + @Override + public void stopWatch(HealthCheckAction action) { + removeHealthStatus(action); + } + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java new file mode 100644 index 000000000..e614cd190 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/CrossRegionRedisHealthStatus.java @@ -0,0 +1,91 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceDown; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceUp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * UNKNOWN + * pingSuccess -> INSTANCEUP + start subAction + * pingFail -> DOWN + markDown + * subSuccess -> throw exception + *

+ * INSTANCEUP + * pingSuccess,do nothing + * pingFail -> DOWN + markDown + * subSuccess -> HEALTHY + markUp + stop subAction + *

+ * HEALTHY + * pingSuccess,do nothing + * pingFail -> DOWN + markDown + * subSuccess -> throw exception + *

+ * DOWN + * pingSuccess -> INSTANCEUP + start subAction + * pingFail,do nothing + * subSuccess -> throw exception + */ +public class CrossRegionRedisHealthStatus extends HealthStatus { + + protected static final Logger logger = LoggerFactory.getLogger(CrossRegionRedisHealthStatus.class); + + public CrossRegionRedisHealthStatus(RedisHealthCheckInstance instance, ScheduledExecutorService scheduled) { + super(instance, scheduled); + } + + @Override + protected void loading() { + doMarkDown(); + } + + @Override + protected void pong() { + HEALTH_STATE preState = state.get(); + if (preState.equals(HEALTH_STATE.UNKNOWN) || preState.equals(HEALTH_STATE.DOWN)) { + if(state.compareAndSet(preState, HEALTH_STATE.INSTANCEUP)) { + logStateChange(preState, state.get()); + } + } + } + + @Override + protected void subSuccess() { + HEALTH_STATE preState = state.get(); + if (preState.equals(HEALTH_STATE.INSTANCEUP)) { + if(state.compareAndSet(preState, HEALTH_STATE.HEALTHY)) { + logStateChange(preState, state.get()); + } + logger.info("[setUp] {}", this); + notifyObservers(new InstanceUp(instance)); + } + } + + @Override + protected void healthStatusUpdate() { + long currentTime = System.currentTimeMillis(); + + if(lastPongTime.get() != UNSET_TIME) { + long pingDownTime = currentTime - lastPongTime.get(); + final int pingDownAfter = pingDownAfterMilli.getAsInt(); + if (pingDownTime > pingDownAfter) { + doMarkDown(); + } + } + } + + protected void doMarkDown() { + HEALTH_STATE preState = state.get(); + if(state.compareAndSet(preState, HEALTH_STATE.DOWN)) { + logStateChange(preState, state.get()); + } + if (!preState.equals(HEALTH_STATE.DOWN)) { + logger.info("[setDown] {}", this); + notifyObservers(new InstanceDown(instance)); + } + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java new file mode 100644 index 000000000..efbb73d75 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultPsubPingActionCollector.java @@ -0,0 +1,106 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; + +import com.ctrip.xpipe.api.factory.ObjectFactory; +import com.ctrip.xpipe.api.observer.Observable; +import com.ctrip.xpipe.api.observer.Observer; +import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; +import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.AbstractInstanceEvent; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.processor.HealthEventProcessor; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector; +import com.ctrip.xpipe.utils.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; +import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; + +@Component +public class DefaultPsubPingActionCollector extends AbstractPsubPingActionCollector implements PsubPingActionCollector, HealthStateService, OneWaySupport { + + private static final Logger logger = LoggerFactory.getLogger(DefaultPsubPingActionCollector.class); + + @Autowired + private List healthEventProcessors; + + @Resource(name = SCHEDULED_EXECUTOR) + private ScheduledExecutorService scheduled; + + @Resource(name = GLOBAL_EXECUTOR) + private ExecutorService executors; + + @Override + public HEALTH_STATE getHealthState(HostPort hostPort) { + RedisHealthCheckInstance key = allHealthStatus.keySet().stream() + .filter(instance -> instance.getCheckInfo().getHostPort().equals(hostPort)) + .findFirst().orElse(null); + + if (null != key) return allHealthStatus.get(key).getState(); + return null; + } + + @Override + public Map getAllCachedState() { + Map cachedHealthStatus = new HashMap<>(); + allHealthStatus.forEach(((instance, healthStatus) -> { + RedisInstanceInfo info = instance.getCheckInfo(); + cachedHealthStatus.put(info.getHostPort(), healthStatus.getState()); + })); + + return cachedHealthStatus; + } + + @Override + public void updateHealthState(Map redisStates) { + throw new UnsupportedOperationException(); + } + + @Override + protected HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance) { + return MapUtils.getOrCreate(allHealthStatus, instance, new ObjectFactory() { + @Override + public HealthStatus create() { + + HealthStatus healthStatus = new CrossRegionRedisHealthStatus(instance, scheduled); + + healthStatus.addObserver(new Observer() { + @Override + public void update(Object args, Observable observable) { + onInstanceStateChange(args); + } + }); + healthStatus.start(); + return healthStatus; + } + }); + } + + private void onInstanceStateChange(Object args) { + + logger.info("[onInstanceStateChange]{}", args); + for (HealthEventProcessor processor : healthEventProcessors) { + + if (processor instanceof OneWaySupport) { + executors.execute(new AbstractExceptionLogTask() { + @Override + protected void doRun() throws Exception { + processor.onEvent((AbstractInstanceEvent) args); + } + }); + } + } + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java index 7ad76d41b..be0421bfa 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/HealthStatus.java @@ -31,10 +31,10 @@ public class HealthStatus extends AbstractObservable implements Startable, Stopp public static long UNSET_TIME = -1L; - private AtomicLong lastPongTime = new AtomicLong(UNSET_TIME); + protected AtomicLong lastPongTime = new AtomicLong(UNSET_TIME); private AtomicLong lastHealthDelayTime = new AtomicLong(UNSET_TIME); - private AtomicReference state = new AtomicReference<>(HEALTH_STATE.UNKNOWN); + protected AtomicReference state = new AtomicReference<>(HEALTH_STATE.UNKNOWN); protected RedisHealthCheckInstance instance; protected final IntSupplier delayDownAfterMilli; @@ -110,7 +110,7 @@ protected boolean shouldNotRun() { return lastHealthDelayTime.get() < 0 && lastPongTime.get() < 0; } - void loading() { + protected void loading() { HEALTH_STATE preState = state.get(); if(preState.equals(preState.afterPingFail())) { return; @@ -124,7 +124,7 @@ void loading() { } } - void pong(){ + protected void pong(){ lastPongTime.set(System.currentTimeMillis()); setPingUp(); } @@ -135,6 +135,8 @@ void pongInit() { } } + protected void subSuccess(){} + void delay(long delayMilli, long...srcShardDbId){ //first time diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCurrentDcHealthStatusConsistenceInspector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCurrentDcHealthStatusConsistenceInspector.java new file mode 100644 index 000000000..cecb37ea8 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceCurrentDcHealthStatusConsistenceInspector.java @@ -0,0 +1,229 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.api.lifecycle.Ordered; +import com.ctrip.xpipe.api.lifecycle.TopElement; +import com.ctrip.xpipe.api.monitor.Task; +import com.ctrip.xpipe.api.monitor.TransactionMonitor; +import com.ctrip.xpipe.cluster.ClusterType; +import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; +import com.ctrip.xpipe.endpoint.HostPort; +import com.ctrip.xpipe.lifecycle.AbstractLifecycle; +import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; +import com.ctrip.xpipe.redis.checker.config.CheckerConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.OutClientInstanceHealthHolder; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.UpDownInstances; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder; +import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder; +import com.ctrip.xpipe.redis.core.entity.ClusterMeta; +import com.ctrip.xpipe.redis.core.entity.DcMeta; +import com.ctrip.xpipe.redis.core.entity.ShardMeta; +import com.ctrip.xpipe.redis.core.entity.XpipeMeta; +import com.ctrip.xpipe.redis.core.exception.MasterNotFoundException; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import com.ctrip.xpipe.tuple.Pair; +import com.ctrip.xpipe.utils.MapUtils; +import com.ctrip.xpipe.utils.XpipeThreadFactory; +import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.lang.Nullable; +import org.springframework.stereotype.Service; + +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; + +@Service +public class InstanceCurrentDcHealthStatusConsistenceInspector extends AbstractLifecycle implements TopElement { + private InstanceHealthStatusCollector collector; + + private InstanceStatusAdjuster adjuster; + + private StabilityHolder siteStability; + + private CheckerConfig config; + + private MetaCache metaCache; + + private GroupCheckerLeaderElector leaderElector; + + private DynamicDelayPeriodTask task; + + private ScheduledExecutorService scheduled; + + private static final Logger logger = LoggerFactory.getLogger(InstanceCurrentDcHealthStatusConsistenceInspector.class); + + private static final String currentDc = FoundationService.DEFAULT.getDataCenter(); + + private static final String TYPE = "HealthCheck"; + + @Autowired + public InstanceCurrentDcHealthStatusConsistenceInspector(InstanceHealthStatusCollector instanceHealthStatusCollector, + InstanceStatusAdjuster instanceStatusAdjuster, + @Nullable GroupCheckerLeaderElector groupCheckerLeaderElector, + StabilityHolder stabilityHolder, CheckerConfig checkerConfig, + MetaCache metaCache) { + this.collector = instanceHealthStatusCollector; + this.adjuster = instanceStatusAdjuster; + this.leaderElector = groupCheckerLeaderElector; + this.siteStability = stabilityHolder; + this.config = checkerConfig; + this.metaCache = metaCache; + } + + protected void inspectCurrentDc() { + if(!siteStability.isSiteStable()) { + logger.debug("[inspectCurrentDc][skip] unstable"); + return; + } + if (null == leaderElector || !leaderElector.amILeader()) { + logger.debug("[inspectCurrentDc][skip] not leader"); + return; + } + + logger.debug("[inspectCurrentDc] begin"); + final long timeoutMill = System.currentTimeMillis() + Math.min(config.getPingDownAfterMilli() / 2, + config.getDownAfterCheckNums() * config.getRedisReplicationHealthCheckInterval() / 2); + TransactionMonitor.DEFAULT.logTransactionSwallowException(TYPE, "compensator.inspect.currentDc", new Task() { + @Override + public void go() throws Exception { + Map> interestedCurrentDc = fetchInterestedCurrentDcClusterInstances(); + if (interestedCurrentDc.isEmpty()) { + logger.debug("[inspectCurrentDc][skip] no interested instance"); + return; + } + + Pair instanceHealth = collector.collect(); + checkTimeout(timeoutMill, "after collect"); + + XPipeInstanceHealthHolder xpipeInstanceHealth = instanceHealth.getKey(); + OutClientInstanceHealthHolder outClientInstanceHealth = instanceHealth.getValue(); + UpDownInstances hostPortNeedAdjustForPingAction = findHostPortNeedAdjust(xpipeInstanceHealth, outClientInstanceHealth, interestedCurrentDc); + + checkTimeout(timeoutMill, "after compare"); + if (!hostPortNeedAdjustForPingAction.getHealthyInstances().isEmpty()) + adjuster.adjustInstances(hostPortNeedAdjustForPingAction.getHealthyInstances(), true, timeoutMill); + + checkTimeout(timeoutMill, "after adjust up"); + if (!hostPortNeedAdjustForPingAction.getUnhealthyInstances().isEmpty()) + adjuster.adjustInstances(hostPortNeedAdjustForPingAction.getUnhealthyInstances(), false, timeoutMill); + } + + @Override + public Map getData() { + return Collections.singletonMap("timeoutMilli", timeoutMill); + } + }); + } + + private void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { + if (System.currentTimeMillis() > timeoutAtMilli) { + logger.info("[timeout] {}", msg); + throw new TimeoutException(msg); + } + } + + protected Map> fetchInterestedCurrentDcClusterInstances() { + Map> interestedCurrentDcClusterInstances = new HashMap<>(); + XpipeMeta xpipeMeta = metaCache.getXpipeMeta(); + if (null == xpipeMeta) return interestedCurrentDcClusterInstances; + + for (DcMeta dcMeta: xpipeMeta.getDcs().values()) { + if (!dcMeta.getId().equalsIgnoreCase(currentDc)) continue; + + for (ClusterMeta clusterMeta: dcMeta.getClusters().values()) { + if (!ClusterType.isSameClusterType(clusterMeta.getType(), ClusterType.ONE_WAY)) continue; + if (!metaCache.isCrossRegion(dcMeta.getId(), clusterMeta.getActiveDc())) continue; + + Set interestedInstances = MapUtils.getOrCreate(interestedCurrentDcClusterInstances, clusterMeta.getId(), HashSet::new); + for (ShardMeta shardMeta: clusterMeta.getShards().values()) { + shardMeta.getRedises().forEach(redis -> interestedInstances.add(new HostPort(redis.getIp(), redis.getPort()))); + } + } + } + + return interestedCurrentDcClusterInstances; + } + + protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipeInstanceHealthHolder, + OutClientInstanceHealthHolder outClientInstanceHealthHolder, + Map> interested) { + int quorum = config.getQuorum(); + UpDownInstances xpipeInstances = xpipeInstanceHealthHolder.aggregate(interested, quorum); + UpDownInstances outClientInstances = outClientInstanceHealthHolder.extractReadable(interested); + + Set needMarkUpInstances = xpipeInstances.getHealthyInstances(); + Set needMarkDownInstances = xpipeInstances.getUnhealthyInstances(); + needMarkUpInstances.retainAll(outClientInstances.getUnhealthyInstances()); + needMarkDownInstances.retainAll(outClientInstances.getHealthyInstances()); + needMarkDownInstances = filterMasterHealthyInstances(xpipeInstanceHealthHolder, needMarkDownInstances, quorum); + return new UpDownInstances(needMarkUpInstances, needMarkDownInstances); + } + + protected Set filterMasterHealthyInstances(XPipeInstanceHealthHolder xpipeInstanceHealthHolder, + Set instances, int quorum) { + Map, Boolean> masterHealthStatusMap = new HashMap<>(); + Set masterHealthyInstances = new HashSet<>(); + for (HostPort instance: instances) { + Pair clusterShard = metaCache.findClusterShard(instance); + if (null == clusterShard) continue; + + if (!masterHealthStatusMap.containsKey(clusterShard)) { + try { + HostPort master = metaCache.findMaster(clusterShard.getKey(), clusterShard.getValue()); + Boolean healthy = xpipeInstanceHealthHolder.aggregate(master, quorum); + masterHealthStatusMap.put(clusterShard, healthy); + } catch (MasterNotFoundException e) { + masterHealthStatusMap.put(clusterShard, null); + } + } + + if (Boolean.TRUE.equals(masterHealthStatusMap.get(clusterShard))) { + masterHealthyInstances.add(instance); + } + } + + + return masterHealthyInstances; + } + + @Override + protected void doInitialize() throws Exception { + super.doInitialize(); + this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("InstanceCurrentDcHealthStatusConsistenceInspector")); + this.task = new DynamicDelayPeriodTask("inspectCurrentDc", new AbstractExceptionLogTask() { + @Override + protected void doRun() throws Exception { + inspectCurrentDc(); + } + }, config::getHealthMarkCompensateIntervalMill, scheduled); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + this.task.start(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + this.task.stop(); + } + + @Override + protected void doDispose() throws Exception { + super.doDispose(); + this.scheduled.shutdown(); + this.scheduled = null; + this.task = null; + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java index 1dfa71b06..92cc5a50a 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspector.java @@ -49,17 +49,17 @@ @Service public class InstanceHealthStatusConsistenceInspector extends AbstractLifecycle implements TopElement { - private InstanceHealthStatusCollector collector; + protected InstanceHealthStatusCollector collector; - private InstanceStatusAdjuster adjuster; + protected InstanceStatusAdjuster adjuster; - private StabilityHolder siteStability; + protected StabilityHolder siteStability; - private CheckerConfig config; + protected CheckerConfig config; - private MetaCache metaCache; + protected MetaCache metaCache; - private GroupCheckerLeaderElector leaderElector; + protected GroupCheckerLeaderElector leaderElector; private DynamicDelayPeriodTask task; @@ -138,7 +138,7 @@ public Map getData() { }); } - private void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { + protected void checkTimeout(long timeoutAtMilli, String msg) throws TimeoutException { if (System.currentTimeMillis() > timeoutAtMilli) { logger.info("[timeout] {}", msg); throw new TimeoutException(msg); @@ -157,6 +157,7 @@ protected Map> fetchInterestedClusterInstances() { for (ClusterMeta clusterMeta: dcMeta.getClusters().values()) { if (!ClusterType.isSameClusterType(clusterMeta.getType(), ClusterType.ONE_WAY)) continue; + if (metaCache.isCrossRegion(currentDc, clusterMeta.getActiveDc())) continue;; if (!clusterMeta.getActiveDc().equalsIgnoreCase(currentDc)) continue; Set interestedInstances = MapUtils.getOrCreate(interestedClusterInstances, clusterMeta.getId(), HashSet::new); @@ -181,7 +182,6 @@ protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipe needMarkUpInstances.retainAll(outClientInstances.getUnhealthyInstances()); needMarkDownInstances.retainAll(outClientInstances.getHealthyInstances()); needMarkDownInstances = filterMasterHealthyInstances(xpipeInstanceHealthHolder, needMarkDownInstances, quorum); - needMarkDownInstances = filterMarkDowUnsupportedInstances(needMarkDownInstances); return new UpDownInstances(needMarkUpInstances, needMarkDownInstances); } @@ -212,33 +212,6 @@ protected Set filterMasterHealthyInstances(XPipeInstanceHealthHolder x return masterHealthyInstances; } - protected Set filterMarkDowUnsupportedInstances(Set instances) { - Set wontMarkdownInstances = new HashSet<>(); - for (HostPort instance : instances) { - HEALTH_STATE healthState = defaultDelayPingActionCollector.getState(instance); - if (healthState.equals(HEALTH_STATE.SICK)) { - if (!shouldMarkdownDcClusterSickInstances(healthCheckInstanceManager.findRedisHealthCheckInstance(instance))) - wontMarkdownInstances.add(instance); - } - } - instances.removeAll(wontMarkdownInstances); - return instances; - } - - boolean shouldMarkdownDcClusterSickInstances(RedisHealthCheckInstance healthCheckInstance) { - RedisInstanceInfo info = healthCheckInstance.getCheckInfo(); - if (info.isCrossRegion()) { - logger.info("[markdown][{} is cross region, do not call client service ]", info.getHostPort()); - return false; - } - if (healthCheckInstance.getHealthCheckConfig().getDelayConfig(info.getClusterId(), currentDc, info.getDcId()).getClusterLevelHealthyDelayMilli() < 0) { - logger.info("[markdown][cluster {} dcs {}->{} distance is -1, do not call client service ]", info.getClusterId(), currentDc, info.getDcId()); - return false; - } - return true; - } - - @Override protected void doInitialize() throws Exception { super.doInitialize(); diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java index 0d6942e4c..54520ad8d 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java @@ -1,5 +1,6 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.handler; +import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.concurrent.FinalStateSetterManager; import com.ctrip.xpipe.endpoint.ClusterShardHostPort; import com.ctrip.xpipe.endpoint.HostPort; @@ -48,6 +49,8 @@ public abstract class AbstractHealthEventHandler delayPingCollectors; + @Autowired + private List psubPingActionCollectors; + private Map> controllersByClusterType; private Map> listenerByClusterType; private Map> delayPingCollectorsByClusterType; + private Map> psubPingActionCollectorsByClusterType; + @PostConstruct public void postConstruct() { controllersByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(controllers); listenerByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(listeners); delayPingCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(delayPingCollectors); + psubPingActionCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(psubPingActionCollectors); } @Override @@ -73,6 +80,10 @@ public PingAction create(RedisHealthCheckInstance instance) { if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); }); + psubPingActionCollectorsByClusterType.get(clusterType).forEach(collector -> { + if (collector.supportInstance(instance)) pingAction.addListener(collector.createPingActionListener()); + }); + return pingAction; } } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java new file mode 100644 index 000000000..e5d32397a --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/OneWayPsubActionController.java @@ -0,0 +1,24 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; +import com.ctrip.xpipe.redis.core.meta.MetaCache; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class OneWayPsubActionController implements PsubActionController, OneWaySupport { + + @Autowired + private MetaCache metaCache; + + private static final String currentDcId = FoundationService.DEFAULT.getDataCenter(); + + @Override + public boolean shouldCheck(RedisHealthCheckInstance instance) { + RedisInstanceInfo info = instance.getCheckInfo(); + return metaCache.isCrossRegion(currentDcId, info.getActiveDc()) && currentDcId.equalsIgnoreCase(info.getDcId()); + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java new file mode 100644 index 000000000..eec84424f --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubAction.java @@ -0,0 +1,50 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.AbstractHealthCheckAction; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +public class PsubAction extends AbstractHealthCheckAction { + + protected static final Logger logger = LoggerFactory.getLogger(PsubAction.class); + + private String[] pubSubChannelPrefix; + + public PsubAction(ScheduledExecutorService scheduled, RedisHealthCheckInstance instance, ExecutorService executors) { + super(scheduled, instance, executors); + this.pubSubChannelPrefix = new String[]{"xpipe"}; + } + + @Override + protected void doTask() { + RedisSession session = instance.getRedisSession(); + doPSubscribe(session, new RedisSession.SubscribeCallback() { + @Override + public void message(String channel, String message) { + logger.info("[PsubAction] success, message : {}", message); + notifyListeners(new PsubActionContext(instance, message)); + } + + @Override + public void fail(Throwable e) { + logger.info("[PsubAction] fail"); + //ignore psub fail + } + }, pubSubChannelPrefix); + } + + @Override + protected Logger getHealthCheckLogger() { + return logger; + } + + protected void doPSubscribe(RedisSession session, RedisSession.SubscribeCallback callback, String... channel) { + session.psubscribeIfAbsent(callback, channel); + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java new file mode 100644 index 000000000..017aa6d4d --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionContext.java @@ -0,0 +1,10 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.AbstractActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; + +public class PsubActionContext extends AbstractActionContext { + public PsubActionContext(RedisHealthCheckInstance instance, String s) { + super(instance, s); + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java new file mode 100644 index 000000000..c5e38e723 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionController.java @@ -0,0 +1,7 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckActionController; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; + +public interface PsubActionController extends HealthCheckActionController { +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java new file mode 100644 index 000000000..b6b4f913c --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionFactory.java @@ -0,0 +1,57 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.cluster.ClusterType; +import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckActionFactory; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.util.ClusterTypeSupporterSeparator; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static com.ctrip.xpipe.redis.checker.resource.Resource.PING_DELAY_INFO_EXECUTORS; +import static com.ctrip.xpipe.redis.checker.resource.Resource.PING_DELAY_INFO_SCHEDULED; + +@Component +public class PsubActionFactory implements RedisHealthCheckActionFactory, OneWaySupport { + + @Resource(name = PING_DELAY_INFO_SCHEDULED) + private ScheduledExecutorService scheduled; + + @Resource(name = PING_DELAY_INFO_EXECUTORS) + private ExecutorService executors; + + @Autowired + private List controllers; + + @Autowired + private List psubPingActionCollectors; + + private Map> controllersByClusterType; + + private Map> psubPingActionCollectorsByClusterType; + + @PostConstruct + public void postConstruct() { + controllersByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(controllers); + psubPingActionCollectorsByClusterType = ClusterTypeSupporterSeparator.divideByClusterType(psubPingActionCollectors); + } + + @Override + public PsubAction create(RedisHealthCheckInstance instance) { + PsubAction psubAction = new PsubAction(scheduled, instance, executors); + ClusterType clusterType = instance.getCheckInfo().getClusterType(); + + psubAction.addControllers(controllersByClusterType.get(clusterType)); + psubPingActionCollectorsByClusterType.get(clusterType).forEach(collector -> { + if (collector.supportInstance(instance)) psubAction.addListener(collector.createPsubActionListener()); + }); + return psubAction; + } +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java new file mode 100644 index 000000000..59a19d494 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubActionListener.java @@ -0,0 +1,15 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext; +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction; +import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckActionListener; +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; + +public interface PsubActionListener extends HealthCheckActionListener> { + + @Override + default boolean worksfor(ActionContext t) { + return t instanceof PsubActionContext; + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java new file mode 100644 index 000000000..17cf8a6e6 --- /dev/null +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/psubscribe/PsubPingActionCollector.java @@ -0,0 +1,14 @@ +package com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe; + +import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionListener; + +public interface PsubPingActionCollector { + + boolean supportInstance(RedisHealthCheckInstance instance); + + PingActionListener createPingActionListener(); + + PsubActionListener createPsubActionListener(); + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java index 9bfe46e78..53b8c2ee2 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/config/CompositeHealthCheckConfig.java @@ -6,6 +6,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.KeeperInstanceInfo; import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo; import com.ctrip.xpipe.redis.checker.healthcheck.actions.delay.DelayConfig; +import com.ctrip.xpipe.redis.core.meta.MetaCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,9 +21,9 @@ public class CompositeHealthCheckConfig implements HealthCheckConfig { private HealthCheckConfig config; - public CompositeHealthCheckConfig(RedisInstanceInfo instanceInfo, CheckerConfig checkerConfig, DcRelationsService dcRelationsService) { + public CompositeHealthCheckConfig(RedisInstanceInfo instanceInfo, CheckerConfig checkerConfig, DcRelationsService dcRelationsService, boolean isCrossRegion) { logger.info("[CompositeHealthCheckConfig] {}", instanceInfo); - if(instanceInfo.isCrossRegion()) { + if(isCrossRegion) { config = new ProxyEnabledHealthCheckConfig(checkerConfig, dcRelationsService); logger.info("[CompositeHealthCheckConfig][proxied] ping down time: {}", config.pingDownAfterMilli()); } else { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java index 665b0270a..de78bc6ca 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceFactory.java @@ -10,6 +10,8 @@ import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.*; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionFactory; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubAction; import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.RedisCheckRule; import com.ctrip.xpipe.redis.checker.healthcheck.config.CompositeHealthCheckConfig; import com.ctrip.xpipe.redis.checker.healthcheck.config.DefaultHealthCheckConfig; @@ -124,7 +126,7 @@ public RedisHealthCheckInstance create(RedisMeta redisMeta) { RedisInstanceInfo info = createRedisInstanceInfo(redisMeta); Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redisMeta); - HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); instance.setEndpoint(endpoint) .setSession(redisSessionManager.findOrCreateSession(endpoint)) @@ -228,7 +230,7 @@ public RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta r DefaultRedisHealthCheckInstance instance = new DefaultRedisHealthCheckInstance(); RedisInstanceInfo info = createRedisInstanceInfo(redis); - HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redis); instance.setEndpoint(endpoint) @@ -242,6 +244,25 @@ public RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta r return instance; } + @Override + public RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis) { + DefaultRedisHealthCheckInstance instance = new DefaultRedisHealthCheckInstance(); + + RedisInstanceInfo info = createRedisInstanceInfo(redis); + HealthCheckConfig config = new CompositeHealthCheckConfig(info, checkerConfig, dcRelationsService, metaCache.isCrossRegion(currentDcId, info.getDcId())); + Endpoint endpoint = endpointFactory.getOrCreateEndpoint(redis); + + instance.setEndpoint(endpoint) + .setSession(redisSessionManager.findOrCreateSession(endpoint)) + .setInstanceInfo(info) + .setHealthCheckConfig(config); + + initActionsForRedisForPsubPingAction(instance); + startCheck(instance); + + return instance; + } + private void initActionsForRedisForAssignedAction(DefaultRedisHealthCheckInstance instance) { for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { if (factory instanceof KeeperSupport) @@ -249,6 +270,13 @@ private void initActionsForRedisForAssignedAction(DefaultRedisHealthCheckInstanc } } + private void initActionsForRedisForPsubPingAction(DefaultRedisHealthCheckInstance instance) { + for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { + if (factory instanceof PingActionFactory || factory instanceof PsubAction) + initActions(instance, factory); + } + } + @SuppressWarnings("unchecked") private void initActions(DefaultRedisHealthCheckInstance instance) { for(RedisHealthCheckActionFactory factory : factoriesByClusterType.get(instance.getCheckInfo().getClusterType())) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java index 390ecbc75..14eeefa56 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthCheckInstanceManager.java @@ -35,6 +35,8 @@ public class DefaultHealthCheckInstanceManager implements HealthCheckInstanceMan private ConcurrentMap redisInstanceForAssignedAction = Maps.newConcurrentMap(); + private ConcurrentMap redisInstanceForPingAction = Maps.newConcurrentMap(); + @Autowired private HealthCheckInstanceFactory instanceFactory; @@ -61,6 +63,18 @@ public RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisM return null; } + @Override + public RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis) { + try { + HostPort key = new HostPort(redis.getIp(), redis.getPort()); + return MapUtils.getOrCreate(redisInstanceForPingAction, key, + () -> instanceFactory.getOrCreateRedisInstanceForPsubPingAction(redis)); + } catch (Throwable th) { + logger.error("getOrCreate ping action health check redis instance:{}:{}", redis.getIp(), redis.getPort()); + } + return null; + } + @Override public KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper) { try { @@ -93,6 +107,11 @@ public RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort host return redisInstanceForAssignedAction.get(hostPort); } + @Override + public RedisHealthCheckInstance findRedisInstanceForPingAction(HostPort hostPort) { + return redisInstanceForPingAction.get(hostPort); + } + @Override public KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort) { return keeperInstances.get(hostPort); @@ -125,6 +144,13 @@ public RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort ho return instance; } + @Override + public RedisHealthCheckInstance removeRedisInstanceForPingAction(HostPort hostPort) { + RedisHealthCheckInstance instance = redisInstanceForPingAction.remove(hostPort); + if (null != instance) instanceFactory.remove(instance); + return instance; + } + @Override public ClusterHealthCheckInstance remove(String cluster) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java index 4f0b23adb..ea8f4a7b3 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/DefaultHealthChecker.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import static com.ctrip.xpipe.redis.core.meta.comparator.KeeperContainerMetaComparator.getAllKeeperContainerDetailInfoFromDcMeta; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; @@ -146,6 +145,10 @@ void generateHealthCheckInstances() { generateHealthCheckInstances(cluster); } + if (clusterType == ClusterType.ONE_WAY && isClusterActiveDcCrossRegion(cluster) && clusterDcIsCurrentDc(cluster)) { + generatePsubPingActionHealthCheckInstances(cluster); + } + } } } @@ -168,6 +171,14 @@ void generateHealthCheckInstances(ClusterMeta clusterMeta){ instanceManager.getOrCreate(clusterMeta); } + void generatePsubPingActionHealthCheckInstances(ClusterMeta clusterMeta){ + for(ShardMeta shard : clusterMeta.getShards().values()) { + for(RedisMeta redis : shard.getRedises()) { + instanceManager.getOrCreateRedisInstanceForPsubPingAction(redis); + } + } + } + private boolean isClusterActiveIdcCurrentIdc(ClusterMeta cluster) { return cluster.getActiveDc().equalsIgnoreCase(currentDcId); @@ -206,4 +217,8 @@ private boolean hasMultipleActiveDcs(ClusterType clusterType) { return clusterType.supportMultiActiveDC() && !clusterType.isCrossDc(); } + private boolean isClusterActiveDcCrossRegion(ClusterMeta clusterMeta) { + return metaCache.isCrossRegion(currentDcId, clusterMeta.getActiveDc()); + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java index 0b82cb610..4a89b794e 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/HealthCheckInstanceFactory.java @@ -27,4 +27,6 @@ public interface HealthCheckInstanceFactory { void remove(ClusterHealthCheckInstance instance); RedisHealthCheckInstance createRedisInstanceForAssignedAction(RedisMeta redis); + + RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java index 796505132..9853f7c22 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManager.java @@ -9,6 +9,7 @@ import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckInstanceManager; import com.ctrip.xpipe.redis.checker.healthcheck.impl.HealthCheckEndpointFactory; import com.ctrip.xpipe.redis.core.entity.*; +import com.ctrip.xpipe.redis.core.meta.MetaCache; import com.ctrip.xpipe.redis.core.meta.MetaComparator; import com.ctrip.xpipe.redis.core.meta.MetaComparatorVisitor; import com.ctrip.xpipe.redis.core.meta.comparator.ClusterMetaComparator; @@ -48,6 +49,8 @@ public class DefaultDcMetaChangeManager extends AbstractStartStoppable implement private CheckerConfig checkerConfig; + private MetaCache metaCache; + private final String dcId; private final List clustersToDelete = new ArrayList<>(); @@ -58,12 +61,14 @@ public class DefaultDcMetaChangeManager extends AbstractStartStoppable implement public DefaultDcMetaChangeManager(String dcId, HealthCheckInstanceManager instanceManager, HealthCheckEndpointFactory healthCheckEndpointFactory, CheckerConsoleService checkerConsoleService, - CheckerConfig checkerConfig) { + CheckerConfig checkerConfig, + MetaCache metaCache) { this.dcId = dcId; this.instanceManager = instanceManager; this.healthCheckEndpointFactory = healthCheckEndpointFactory; this.checkerConsoleService = checkerConsoleService; this.checkerConfig = checkerConfig; + this.metaCache = metaCache; } @Override @@ -104,10 +109,12 @@ public void compare(DcMeta future, DcMeta allFutureDcMeta) { private void removeAndAdd() { this.redisListToDelete.forEach(this::removeRedis); + this.redisListToDelete.forEach(this::removeRedisOnlyForPingAction); this.clustersToDelete.forEach(this::removeCluster); this.clustersToAdd.forEach(this::addCluster); this.redisListToAdd.forEach(this::addRedis); + this.redisListToAdd.forEach(this::addRedisOnlyForPingAction); } private void clearUp() { @@ -125,19 +132,24 @@ private void removeCluster(ClusterMeta removed) { logger.info("[removeCluster][{}][{}] remove health check", dcId, removed.getId()); ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(removeConsumer))); + ClusterMetaVisitor clusterMetaPingActionVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(removePingActionConsumer))); clusterMetaVisitor.accept(removed); + clusterMetaPingActionVisitor.accept(removed); } private void addCluster(ClusterMeta added) { - if (!isInterestedInCluster(added)) { - logger.info("[addCluster][{}][skip] cluster not interested", added.getId()); - return; + if (isInterestedInCluster(added)) { + logger.info("[addCluster][{}][{}] add health check", dcId, added.getId()); + instanceManager.getOrCreate(added); + ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addConsumer))); + clusterMetaVisitor.accept(added); + } + + if (isOneWayClusterActiveDcCrossRegionAndCurrentDc(added)) { + ClusterMetaVisitor clusterMetaPingActionVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addPingActionConsumer))); + clusterMetaPingActionVisitor.accept(added); } - logger.info("[addCluster][{}][{}] add health check", dcId, added.getId()); - instanceManager.getOrCreate(added); - ClusterMetaVisitor clusterMetaVisitor = new ClusterMetaVisitor(new ShardMetaVisitor(new RedisMetaVisitor(addConsumer))); - clusterMetaVisitor.accept(added); } private void removeRedis(RedisMeta removed) { @@ -199,6 +211,11 @@ protected boolean isInterestedInCluster(ClusterMeta cluster) { return result; } + private boolean isOneWayClusterActiveDcCrossRegionAndCurrentDc(ClusterMeta cluster) { + ClusterType clusterType = ClusterType.lookup(cluster.getType()); + return clusterType == ClusterType.ONE_WAY && isClusterActiveDcCrossRegion(cluster) && clusterDcIsCurrentDc(cluster); + } + private boolean clusterDcIsCurrentDc(ClusterMeta clusterMeta) { return clusterMeta.parent().getId().equalsIgnoreCase(currentDcId); } @@ -220,6 +237,10 @@ private boolean hasMultipleActiveDcs(ClusterType clusterType) { return clusterType.supportMultiActiveDC() && !clusterType.isCrossDc(); } + private boolean isClusterActiveDcCrossRegion(ClusterMeta clusterMeta) { + return metaCache.isCrossRegion(currentDcId, clusterMeta.getActiveDc()); + } + private Consumer removeConsumer = new Consumer() { @Override public void accept(RedisMeta redisMeta) { @@ -234,6 +255,20 @@ public void accept(RedisMeta redisMeta) { } }; + private Consumer removePingActionConsumer = new Consumer() { + @Override + public void accept(RedisMeta redisMeta) { + removeRedisOnlyForPingAction(redisMeta); + } + }; + + private Consumer addPingActionConsumer = new Consumer() { + @Override + public void accept(RedisMeta redisMeta) { + addRedisOnlyForPingAction(redisMeta); + } + }; + @Override protected void doStart() { if (current == null) { @@ -276,6 +311,20 @@ private void addRedisOnlyForUsedMemory(RedisMeta added) { instanceManager.getOrCreateRedisInstanceForAssignedAction(added); } + private void removeRedisOnlyForPingAction(RedisMeta removed) { + if (null != instanceManager.removeRedisInstanceForPingAction(new HostPort(removed.getIp(), removed.getPort()))) { + logger.info("[removeRedisOnlyForPingAction][{}:{}] {}", removed.getIp(), removed.getPort(), removed); + } + } + + private void addRedisOnlyForPingAction(RedisMeta added) { + if (!isOneWayClusterActiveDcCrossRegionAndCurrentDc(added.parent().parent())) { + return; + } + logger.info("[addRedisOnlyForPingAction][{}:{}] {}", added.getIp(), added.getPort(), added); + instanceManager.getOrCreateRedisInstanceForPsubPingAction(added); + } + private class KeeperContainerMetaComparatorVisitor implements MetaComparatorVisitor { @Override diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java index 82a349e89..82655e604 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultMetaChangeManager.java @@ -97,7 +97,7 @@ public DcMetaChangeManager getOrCreate(String dcId) { @Override public DcMetaChangeManager create() { return new DefaultDcMetaChangeManager(dcId, instanceManager, healthCheckEndpointFactory, - checkerConsoleService, checkerConfig); + checkerConsoleService, checkerConfig, metaCache); } }); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java index 2cad344d3..dde343247 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/session/RedisSession.java @@ -109,6 +109,10 @@ public synchronized void crdtsubscribeIfAbsent(SubscribeCallback callback, Strin subscribeIfAbsent(callback, () -> new CRDTSubscribeCommand(clientPool, scheduled, commandTimeOut, channel), channel); } + public synchronized void psubscribeIfAbsent(SubscribeCallback callback, String... channel) { + subscribeIfAbsent(callback, () -> new PsubscribeCommand(clientPool, scheduled, commandTimeOut, channel), channel); + } + private synchronized void subscribeIfAbsent(SubscribeCallback callback, Supplier subCommandSupplier, String... channel) { PubSubConnectionWrapper pubSubConnectionWrapper = subscribConns.get(Sets.newHashSet(channel)); if (pubSubConnectionWrapper == null || pubSubConnectionWrapper.shouldCreateNewSession()) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java index 5c02c2a1d..563d7e3d7 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/HealthCheckReporter.java @@ -9,6 +9,7 @@ import com.ctrip.xpipe.redis.checker.RedisDelayManager; import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderAware; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; +import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStateService; import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingService; import com.ctrip.xpipe.redis.checker.model.CheckerRole; @@ -21,8 +22,12 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; /** * @author lishanglin @@ -30,7 +35,7 @@ */ public class HealthCheckReporter implements GroupCheckerLeaderAware { - private HealthStateService healthStateService; + private List healthStateServices; private RedisDelayManager redisDelayManager; @@ -60,11 +65,11 @@ public class HealthCheckReporter implements GroupCheckerLeaderAware { private static final Logger logger = LoggerFactory.getLogger(HealthCheckReporter.class); - public HealthCheckReporter(HealthStateService healthStateService, CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, + public HealthCheckReporter(List healthStateServices, CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, ClusterServer clusterServer, ClusterServer allCheckerServer, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, ClusterHealthManager clusterHealthManager, int serverPort) { - this.healthStateService = healthStateService; + this.healthStateServices = healthStateServices; this.serverPort = serverPort; this.config = checkerConfig; this.checkerConsoleService = checkerConsoleService; @@ -142,7 +147,9 @@ private void reportCheckResult() { result.encodeCrossMasterDelays(crossMasterDelayManager.getAllCrossMasterDelays()); result.encodeRedisAlives(pingService.getAllRedisAlives()); result.setWarningClusterShards(clusterHealthManager.getAllClusterWarningShards()); - result.encodeRedisStates(healthStateService.getAllCachedState()); + result.encodeRedisStates(healthStateServices.stream() + .flatMap(healthStateService -> healthStateService.getAllCachedState().entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); result.setHeteroShardsDelay(redisDelayManager.getAllHeteroShardsDelays()); checkerConsoleService.report(config.getConsoleAddress(), result); diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java index c11e1acd9..7e2167b69 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/compensator/InstanceHealthStatusConsistenceInspectorTest.java @@ -84,7 +84,6 @@ public void setupInstanceHealthStatusConsistenceCheckerTest() throws Exception { Mockito.when(config.getRedisReplicationHealthCheckInterval()).thenReturn(2000); Mockito.when(config.getDownAfterCheckNums()).thenReturn(5); Mockito.when(siteStability.isSiteStable()).thenReturn(true); - Mockito.when(delayPingActionCollector.getState(any())).thenReturn(HEALTH_STATE.DOWN); } @Override diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java index f32334f43..fe73ce780 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/meta/DefaultDcMetaChangeManagerTest.java @@ -74,7 +74,7 @@ public void beforeDefaultDcMetaChangeManagerTest() throws IOException, SAXExcept when(checkerConfig.getConsoleAddress()).thenReturn("127.0.0.1"); when(checkerConsoleService.getXpipeDcAllMeta(Mockito.anyString(), Mockito.anyString())).thenReturn(getXpipeMeta()); - manager = new DefaultDcMetaChangeManager("oy", instanceManager, factory, checkerConsoleService, checkerConfig); + manager = new DefaultDcMetaChangeManager("oy", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); } private void prepareData(String dc) { @@ -263,7 +263,7 @@ public void visitRemoved() { @Test public void visitRemovedClusterActiveDc(){ - manager = spy(new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig)); + manager = spy(new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache)); manager.compare(getDcMeta("jq"), null); DcMeta dcMeta = MetaCloneFacade.INSTANCE.clone(getDcMeta("jq")); @@ -359,7 +359,7 @@ public void testSwitchClusterShards() throws Exception { @Test public void testHeteroClusterModified() throws Exception { - DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig); + DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); DcMeta dcMeta= getDcMeta("jq"); ClusterMeta cluster1 = dcMeta.findCluster("cluster2"); cluster1.setBackupDcs("ali"); @@ -384,7 +384,7 @@ public void testHeteroClusterModified() throws Exception { @Test public void testBackupDcClusterModified() throws Exception { - DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig); + DefaultDcMetaChangeManager manager = new DefaultDcMetaChangeManager("jq", instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); DcMeta dcMeta= getDcMeta("jq"); ClusterMeta cluster1 = dcMeta.findCluster("cluster2"); cluster1.setBackupDcs("jq,ali"); @@ -491,7 +491,7 @@ public void visitModified1() { @Test public void testKeeperChange() throws Exception { String dcId = FoundationService.DEFAULT.getDataCenter(); - manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig); + manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); prepareData(dcId); DcMeta future = cloneDcMeta(dcId); future.addKeeperContainer(new KeeperContainerMeta().setId(4L).setIp("1.1.1.4").setPort(8080)); @@ -512,7 +512,7 @@ public void testKeeperChange() throws Exception { @Test public void testRedisChange() throws Exception { String dcId = FoundationService.DEFAULT.getDataCenter(); - manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig); + manager = new DefaultDcMetaChangeManager(dcId, instanceManager, factory, checkerConsoleService, checkerConfig, metaCache); prepareData(dcId); DcMeta future = cloneDcMeta(dcId); future.addKeeperContainer(new KeeperContainerMeta().setId(4L).setIp("1.1.1.4").setPort(8080)); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java index 595743375..cf3b112fa 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/CheckerContextConfig.java @@ -42,6 +42,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; +import java.util.List; import java.util.concurrent.ExecutorService; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; @@ -189,9 +190,9 @@ public SentinelManager sentinelManager() { public HealthCheckReporter healthCheckReporter(CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, GroupCheckerLeaderElector clusterServer, AllCheckerLeaderElector allCheckerLeaderElector, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, - ClusterHealthManager clusterHealthManager, HealthStateService healthStateService, + ClusterHealthManager clusterHealthManager, List healthStateServices, @Value("${server.port}") int serverPort) { - return new HealthCheckReporter(healthStateService, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, + return new HealthCheckReporter(healthStateServices, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, crossMasterDelayManager, pingService, clusterHealthManager, serverPort); } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java new file mode 100644 index 000000000..36986f44c --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/pubsub/PsubscribeCommand.java @@ -0,0 +1,33 @@ +package com.ctrip.xpipe.redis.core.protocal.cmd.pubsub; + +import com.ctrip.xpipe.api.pool.SimpleObjectPool; +import com.ctrip.xpipe.netty.commands.NettyClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; + +public class PsubscribeCommand extends AbstractSubscribe{ + + private static final Logger logger = LoggerFactory.getLogger(PsubscribeCommand.class); + + protected PsubscribeCommand(String host, int port, ScheduledExecutorService scheduled, MESSAGE_TYPE messageType, String... subscribeChannel) { + super(host, port, scheduled, messageType, subscribeChannel); + } + + public PsubscribeCommand(SimpleObjectPool clientPool, ScheduledExecutorService scheduled, int commandTimeoutMilli, String... channel) { + super(clientPool, scheduled, commandTimeoutMilli, MESSAGE_TYPE.PMESSAGE, channel); + } + + + @Override + public String getName() { + return "psubscribe"; + } + + @Override + protected Logger getLogger() { + return logger; + } + +} diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java index 6ef5ff7f6..aa984d3f4 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/console/spring/console/TestCheckerContextConfig.java @@ -44,6 +44,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.*; +import java.util.List; import java.util.concurrent.ExecutorService; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR; @@ -184,9 +185,9 @@ public SentinelManager sentinelManager() { public HealthCheckReporter healthCheckReporter(CheckerConfig checkerConfig, CheckerConsoleService checkerConsoleService, GroupCheckerLeaderElector clusterServer, AllCheckerLeaderElector allCheckerLeaderElector, RedisDelayManager redisDelayManager, CrossMasterDelayManager crossMasterDelayManager, PingService pingService, - ClusterHealthManager clusterHealthManager, HealthStateService healthStateService, + ClusterHealthManager clusterHealthManager, List healthStateServices, @Value("${server.port}") int serverPort) { - return new HealthCheckReporter(healthStateService, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, + return new HealthCheckReporter(healthStateServices, checkerConfig, checkerConsoleService, clusterServer, allCheckerLeaderElector, redisDelayManager, crossMasterDelayManager, pingService, clusterHealthManager, serverPort); }