diff --git a/core/src/main/java/com/ctrip/xpipe/api/migration/OuterClientService.java b/core/src/main/java/com/ctrip/xpipe/api/migration/OuterClientService.java index 76be680e0..a84a85794 100644 --- a/core/src/main/java/com/ctrip/xpipe/api/migration/OuterClientService.java +++ b/core/src/main/java/com/ctrip/xpipe/api/migration/OuterClientService.java @@ -764,6 +764,11 @@ public String getActiveDc() { public void setActiveDc(String activeDc) { this.activeDc = activeDc; } + + @Override + public String toString() { + return String.format("[%s:%s]%s", clusterName, activeDc, hostPortDcStatuses); + } } @JsonIgnoreProperties(ignoreUnknown = true) @@ -815,6 +820,11 @@ public void setDc(String dc) { public void setCanRead(boolean canRead) { this.canRead = canRead; } + + @Override + public String toString() { + return String.format("{%s:%d-%s|%s}", host, port, dc, canRead); + } } enum ClusterType { diff --git a/core/src/main/java/com/ctrip/xpipe/concurrent/AggregatorStateSetterManager.java b/core/src/main/java/com/ctrip/xpipe/concurrent/AggregatorStateSetterManager.java deleted file mode 100644 index 0b4508787..000000000 --- a/core/src/main/java/com/ctrip/xpipe/concurrent/AggregatorStateSetterManager.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.ctrip.xpipe.concurrent; - -import com.ctrip.xpipe.command.AbstractCommand; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.concurrent.Executor; -import java.util.function.BiConsumer; -import java.util.function.Function; - -public class AggregatorStateSetterManager> { - - private Logger logger = LoggerFactory.getLogger(getClass()); - - private KeyedOneThreadTaskExecutor keyedOneThreadTaskExecutor; - - private Function getter; - private BiConsumer setter; - - public AggregatorStateSetterManager(Executor executors, Function getter, BiConsumer setter){ - this.getter = getter; - this.setter = setter; - keyedOneThreadTaskExecutor = new KeyedOneThreadTaskExecutor(executors); - } - - public void set(K k){ - logger.debug("[aggregator set]{}", k); - keyedOneThreadTaskExecutor.execute(k, new AggregatorCheckAndSetTask(k)); - } - - public class AggregatorCheckAndSetTask extends AbstractCommand { - - private int retry = 3; - private K k; - - public AggregatorCheckAndSetTask(K k){ - this.k = k; - } - - @Override - protected void doExecute() throws Exception { - - Exception exception = null; - - S newValue = null; - - if(getter != null){ - try{ - newValue = getter.apply(k); - }catch(Exception e){ - logger.error("[doRun][aggregator]" + k, e); - } - } - - if (null == newValue) { - logger.info("[doRun] unexpected null newValue"); - future().setFailure(new IllegalArgumentException("new value null")); - return; - } - - if(!newValue.isEmpty()) { - - for(int i=0; i < retry ;i++){ - try{ - logger.debug("[doRun][aggregator][begin]{}", k); - setter.accept(k, newValue); - logger.debug("[doRun][aggregator][end]{}", k); - exception = null; - break; - }catch (Exception e){ - exception = e; - logger.error("[setter error][aggregator]" + k, e); - } - } - }else{ - logger.info("[doRun][aggregator][newValue empty, skip] {}", k); - } - - if(exception != null){ - future().setFailure(exception); - }else{ - future().setSuccess(); - } - } - - @Override - public String getName() { - return "[AggregatorCheckAndSetTask]" + k; - } - - @Override - protected void doReset() { - } - } - -} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/CheckerConfig.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/CheckerConfig.java index 8ed38b528..8ddad1162 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/CheckerConfig.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/CheckerConfig.java @@ -108,8 +108,8 @@ public interface CheckerConfig { int getKeeperCheckerIntervalMilli(); - int getInstancePullIntervalSeconds(); + int getMarkInstanceBaseDelayMilli(); - int getInstancePullRandomSeconds(); + int getMarkInstanceMaxDelayMilli(); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/CheckConfigBean.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/CheckConfigBean.java index f8239b256..7fe21425a 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/CheckConfigBean.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/CheckConfigBean.java @@ -113,6 +113,10 @@ public class CheckConfigBean extends AbstractConfigBean { public static final String KEY_KEEPER_CHECKER_INTERVAL = "keeper.checker.interval"; + public static final String KEY_CHECKER_MARK_DELAY_BASE = "checker.health.mark.delay.base.milli"; + + public static final String KEY_CHECKER_MARK_DELAY_MAX = "checker.health.mark.delay.max.milli"; + public static final String KEY_CHECKER_INSTANCE_PULL_INTERVAL = "checker.instance.pull.interval"; public static final String KEY_CHECKER_INSTANCE_PULL_RANDOM = "checker.instance.pull.random"; @@ -346,6 +350,14 @@ public int getKeeperCheckerIntervalMilli() { return getIntProperty(KEY_KEEPER_CHECKER_INTERVAL, 60 * 1000); } + public int getMarkInstanceBaseDelayMilli() { + return getIntProperty(KEY_CHECKER_MARK_DELAY_BASE, 3000); + } + + public int getMarkInstanceMaxDelayMilli() { + return getIntProperty(KEY_CHECKER_MARK_DELAY_MAX, 10000); + } + public int getInstancePullIntervalSeconds() { return getIntProperty(KEY_CHECKER_INSTANCE_PULL_INTERVAL, 5); } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AggregatorPullService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AggregatorPullService.java index 8663fbeb0..e56013520 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AggregatorPullService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/AggregatorPullService.java @@ -8,7 +8,7 @@ public interface AggregatorPullService { - Set getNeedAdjustInstances(Set instances) throws Exception; + Set getNeedAdjustInstances(String cluster, Set instances) throws Exception; void doMarkInstances(String clusterName, Set instances) throws OuterClientException; diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullService.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullService.java index 585d27b40..29c08b4b8 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullService.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/DefaultAggregatorPullService.java @@ -1,9 +1,10 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction; +import com.ctrip.xpipe.api.command.CommandFuture; import com.ctrip.xpipe.api.migration.OuterClientException; import com.ctrip.xpipe.api.migration.OuterClientService; import com.ctrip.xpipe.api.migration.OuterClientService.*; -import com.ctrip.xpipe.endpoint.ClusterShardHostPort; +import com.ctrip.xpipe.command.AbstractCommand; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.CheckerService; import com.ctrip.xpipe.redis.checker.RemoteCheckerManager; @@ -12,12 +13,15 @@ import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.compensator.data.XPipeInstanceHealthHolder; import com.ctrip.xpipe.redis.core.meta.MetaCache; +import com.ctrip.xpipe.utils.XpipeThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; import java.util.*; +import java.util.concurrent.*; @Component @@ -34,15 +38,31 @@ public class DefaultAggregatorPullService implements AggregatorPullService{ private OuterClientService outerClientService = OuterClientService.DEFAULT; private static final Logger logger = LoggerFactory.getLogger(DefaultAggregatorPullService.class); + private ExecutorService executors; + + @PostConstruct + public void postConstruct() { + this.executors = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(256), XpipeThreadFactory.create("DefaultAggregatorPullService"), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + @Override - public Set getNeedAdjustInstances(Set instances) throws Exception{ + public Set getNeedAdjustInstances(String cluster, Set instances) throws Exception { Set instanceNeedAdjust = new HashSet<>(); - Map xpipeAllHealthStatus = getXpipeAllHealthStatus(instances); - logger.debug("[DefaultAggregatorPullService][getNeedAdjustInstances]xpipeAllHealthStatus:{}", xpipeAllHealthStatus); - Map outerClientAllHealthStatus = getOuterClientAllHealthStatus(instances); - logger.debug("[DefaultAggregatorPullService][getNeedAdjustInstances]outerClientAllHealthStatus:{}", outerClientAllHealthStatus); + + QueryXPipeInstanceStatusCmd queryXPipeInstanceStatusCmd = new QueryXPipeInstanceStatusCmd(cluster, instances); + QueryOuterClintInstanceStatusCmd queryOuterClintInstanceStatusCmd = new QueryOuterClintInstanceStatusCmd(cluster, instances); + + CommandFuture> xpipeQueryFuture = queryXPipeInstanceStatusCmd.execute(executors); + CommandFuture> outerClientQueryFuture = queryOuterClintInstanceStatusCmd.execute(executors); + + Map xpipeAllHealthStatus = xpipeQueryFuture.get(); + Map outerClientAllHealthStatus = outerClientQueryFuture.get(); + for (Map.Entry entry : xpipeAllHealthStatus.entrySet()) { - if (!outerClientAllHealthStatus.containsKey(entry.getKey()) || !entry.getValue().equals(outerClientAllHealthStatus.get(entry.getKey()))) { + if (!outerClientAllHealthStatus.containsKey(entry.getKey()) + || !entry.getValue().equals(outerClientAllHealthStatus.get(entry.getKey()))) { instanceNeedAdjust.add( new HostPortDcStatus(entry.getKey().getHost(), entry.getKey().getPort(), metaCache.getDc(entry.getKey()), entry.getValue())); } @@ -57,29 +77,10 @@ public void doMarkInstances(String clusterName, Set instances) outerClientService.batchMarkInstance(markInstanceRequest); } - public Map getXpipeAllHealthStatus(Set instances) { - XPipeInstanceHealthHolder xPipeInstanceHealthHolder = new XPipeInstanceHealthHolder(); - for (CheckerService checkerService : remoteCheckerManager.getAllCheckerServices()) { - xPipeInstanceHealthHolder.add(checkerService.getAllClusterInstanceHealthStatus(instances)); - } - return xPipeInstanceHealthHolder.getAllHealthStatus(checkerConfig.getQuorum()); - } - - public Map getOuterClientAllHealthStatus(Set hostPorts) throws Exception { - Map instancesUp = new HashMap<>(); - for (HostPort hostPort : hostPorts) { - ClusterShardHostPort clusterShardHostPort = new ClusterShardHostPort(null, null, hostPort); - instancesUp.put(clusterShardHostPort, outerClientService.isInstanceUp(clusterShardHostPort)); - } - Map result = new HashMap<>(); - for (Map.Entry entry : instancesUp.entrySet()) { - result.put(entry.getKey().getHostPort(), entry.getValue()); - } - return result; - } + private void alertMarkInstance(String clusterName, Set instances) { + if (instances.isEmpty()) return; - public void alertMarkInstance(String clusterName, Set instances) { - if (!instances.isEmpty()) { + try { for (HostPortDcStatus instance : instances) { if (instance.isCanRead()) { alertManager.alert(clusterName, null, @@ -89,6 +90,65 @@ public void alertMarkInstance(String clusterName, Set instance new HostPort(instance.getHost(), instance.getPort()), ALERT_TYPE.MARK_INSTANCE_DOWN, "Mark Instance Down"); } } + } catch (Throwable th) { + logger.info("[alertMarkInstance][{}] fail", clusterName, th); } } + + private class QueryXPipeInstanceStatusCmd extends AbstractCommand> { + + private String cluster; + + private Set instances; + + public QueryXPipeInstanceStatusCmd(String cluster, Set instances) { + this.cluster = cluster; + this.instances = instances; + } + + @Override + protected void doExecute() throws Throwable { + XPipeInstanceHealthHolder xpipeInstanceHealthHolder = new XPipeInstanceHealthHolder(); + for (CheckerService checkerService : remoteCheckerManager.getAllCheckerServices()) { + xpipeInstanceHealthHolder.add(checkerService.getAllClusterInstanceHealthStatus(instances)); + } + future().setSuccess(xpipeInstanceHealthHolder.getAllHealthStatus(checkerConfig.getQuorum())); + } + + @Override + protected void doReset() { + } + + @Override + public String getName() { + return "QueryXPipeInstanceStatusCmd"; + } + } + + private class QueryOuterClintInstanceStatusCmd extends AbstractCommand> { + + private String cluster; + + private Set instances; + + public QueryOuterClintInstanceStatusCmd(String cluster, Set instances) { + this.cluster = cluster; + this.instances = instances; + } + + @Override + protected void doExecute() throws Throwable { + future().setSuccess(outerClientService.batchQueryInstanceStatus(cluster, instances)); + } + + @Override + protected void doReset() { + } + + @Override + public String getName() { + return "[QueryOuterClintInstanceStatusCmd]" + cluster; + } + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java index 0b5f5df30..cf21b1f00 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/AbstractHealthEventHandler.java @@ -142,11 +142,13 @@ private boolean stateUpNow(AbstractInstanceEvent event) { protected boolean masterUp(AbstractInstanceEvent instanceEvent) { RedisInstanceInfo info = instanceEvent.getInstance().getCheckInfo(); HostPort redisMaster = metaCache.findMasterInSameShard(info.getHostPort()); - boolean masterUp = defaultDelayPingActionCollector.getState(redisMaster) == HEALTH_STATE.HEALTHY; - if (!masterUp) { - logger.info("[masterUp][master down instance:{}, master:{}]", info, redisMaster); + HEALTH_STATE masterState = defaultDelayPingActionCollector.getState(redisMaster); + + if (HEALTH_STATE.UNHEALTHY.equals(masterState) || HEALTH_STATE.DOWN.equals(masterState) || HEALTH_STATE.SICK.equals(masterState)) { + logger.info("[masterUp][master down instance:{}, master:{}] {}", info, redisMaster, masterState); + return false; } - return masterUp; + return true; } protected boolean quorumState(List healthStates, HostPort hostPort) { diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregator.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregator.java index a191a57ba..e0f85f4c2 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregator.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregator.java @@ -1,12 +1,14 @@ package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.handler; import com.ctrip.xpipe.api.migration.OuterClientService.*; +import com.ctrip.xpipe.command.AbstractCommand; import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; -import com.ctrip.xpipe.concurrent.AggregatorStateSetterManager; +import com.ctrip.xpipe.concurrent.KeyedOneThreadTaskExecutor; import com.ctrip.xpipe.endpoint.ClusterShardHostPort; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.AggregatorPullService; +import com.ctrip.xpipe.utils.MapUtils; import com.ctrip.xpipe.utils.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,38 +26,56 @@ @Component public class DefaultOuterClientAggregator implements OuterClientAggregator{ - private final ConcurrentHashMap> registry = new ConcurrentHashMap<>(); + private HashMap> clusterAggregators = new HashMap<>(); + @Resource(name = SCHEDULED_EXECUTOR) private ScheduledExecutorService scheduled; + @Resource(name = GLOBAL_EXECUTOR) private ExecutorService executors; + @Autowired private CheckerConfig checkerConfig; + @Autowired private AggregatorPullService aggregatorPullService; - private final Random rand = new Random(); - private AggregatorStateSetterManager> finalStateSetterManager; + + private Random rand = new Random(); + + private KeyedOneThreadTaskExecutor clusterOneThreadTaskExecutor; + private static final Logger logger = LoggerFactory.getLogger(DefaultOuterClientAggregator.class); + @PostConstruct public void postConstruct() { - finalStateSetterManager = new AggregatorStateSetterManager<>(executors, this::getNeedMarkInstances, this::doMarkInstance); + clusterOneThreadTaskExecutor = new KeyedOneThreadTaskExecutor<>(executors); } @Override public void markInstance(ClusterShardHostPort info) { - synchronized (info.getClusterName().intern()) { - Set instances = registry.computeIfAbsent(info.getClusterName(), k -> new HashSet<>()); - if (instances.add(info.getHostPort()) && instances.size() == 1) { + Set aggregator = MapUtils.getOrCreate(clusterAggregators, info.getClusterName(), HashSet::new); + synchronized (aggregator) { + boolean emptyBeforeAdd = aggregator.isEmpty(); + if (aggregator.add(info.getHostPort()) && emptyBeforeAdd) { scheduled.schedule(new AbstractExceptionLogTask() { @Override protected void doRun() throws Exception { - Set instancesToRemove; - synchronized (info.getClusterName().intern()) { - instancesToRemove = registry.remove(info.getClusterName()); + Set batch; + Set innerAggregator = clusterAggregators.get(info.getClusterName()); + if (null == innerAggregator) { + logger.warn("[markInstance][aggregate][unexpected null aggregator] {}", info.getClusterName()); + return; } - if (instancesToRemove != null) { - handleInstances(info.getClusterName(), instancesToRemove); + + synchronized (innerAggregator) { + if (innerAggregator.isEmpty()) { + logger.warn("[markInstance][aggregate][unexpected empty aggregator] {}", info.getClusterName()); + return; + } + batch = new HashSet<>(innerAggregator); + innerAggregator.clear(); } + handleInstances(info.getClusterName(), batch); } }, randomMill(), TimeUnit.MILLISECONDS); } @@ -63,70 +83,84 @@ protected void doRun() throws Exception { } private void handleInstances(String clusterName, Set instances) { - logger.info("[DefaultOuterClientAggregator][handleInstances]cluster:{}, instances:{}", clusterName, instances); - finalStateSetterManager.set(new ClusterHostPorts(clusterName, instances)); + logger.info("[handleInstances][{}] {}", clusterName, instances); + clusterOneThreadTaskExecutor.execute(clusterName, new AggregatorCheckAndSetTask(clusterName, instances)); } @VisibleForTesting - public long randomMill () { - double randomNum = checkerConfig.getInstancePullIntervalSeconds() + rand.nextDouble() * checkerConfig.getInstancePullRandomSeconds(); - return (long) (randomNum * 1000); + public int randomMill() { + int delayBase = Math.max(checkerConfig.getMarkInstanceBaseDelayMilli(), 0); + int delayMax = Math.max(checkerConfig.getMarkInstanceMaxDelayMilli(), delayBase); + return delayBase + rand.nextInt(delayMax - delayBase); } - private Set getNeedMarkInstances(ClusterHostPorts clusterHostPorts) { - try { - logger.info("[DefaultOuterClientAggregator][getNeedMarkInstances]clusterHostPorts:{}", clusterHostPorts); - return aggregatorPullService.getNeedAdjustInstances(clusterHostPorts.getHostPorts()); - } catch (Exception e) { - throw new RuntimeException(e); - } + private Set getNeedMarkInstances(String cluster, Set clusterHostPorts) throws Exception { + logger.info("[getNeedMarkInstances][{}]", cluster); + return aggregatorPullService.getNeedAdjustInstances(cluster, clusterHostPorts); } - private void doMarkInstance(ClusterHostPorts clusterHostPorts, Set needMarkInstances) { - try { - logger.info("[DefaultOuterClientAggregator][handleInstances]clusterHostPorts:{}, needMarkInstances:{}", clusterHostPorts, needMarkInstances); - aggregatorPullService.doMarkInstances(clusterHostPorts.getClusterName(), needMarkInstances); - } catch (Exception e) { - throw new IllegalStateException("set error:" + needMarkInstances, e); - } + private void doMarkInstance(String cluster, Set needMarkInstances) throws Exception { + logger.info("[doMarkInstance][{}]", cluster); + aggregatorPullService.doMarkInstances(cluster, needMarkInstances); } - private static class ClusterHostPorts { - private String clusterName; - private Set hostPorts; + public class AggregatorCheckAndSetTask extends AbstractCommand { - public ClusterHostPorts(String clusterName, Set hostPorts) { - this.clusterName = clusterName; - this.hostPorts = hostPorts; - } + private int retry; + + private String cluster; - public String getClusterName() { - return clusterName; + private Set instances; + + public AggregatorCheckAndSetTask(String cluster, Set instances) { + this(cluster, instances, 3); } - public Set getHostPorts() { - return hostPorts; + public AggregatorCheckAndSetTask(String cluster, Set instances, int retry){ + this.cluster = cluster; + this.instances = instances; + this.retry = retry; } @Override - public String toString() { - return "ClusterHostPorts{" + - "clusterName='" + clusterName + '\'' + - ", hostPorts=" + hostPorts + - '}'; + protected void doExecute() throws Exception { + + Set instancesToUpdate; + + try { + instancesToUpdate = getNeedMarkInstances(cluster, instances); + } catch (Throwable th) { + logger.info("[aggregator]][getFail[{}]", cluster, th); + future().setFailure(th); + return; + } + + if (instancesToUpdate.isEmpty()) { + logger.info("[aggregator][getEmpty] skip"); + future().setSuccess(); + } else { + for(int i=0; i < retry ;i++){ + try{ + logger.debug("[aggregator][begin] {}", cluster); + doMarkInstance(cluster, instancesToUpdate); + future().setSuccess(); + logger.debug("[aggregator][end] {}", cluster); + return; + } catch (Throwable th){ + logger.error("[aggregator][setFail] " + cluster, th); + } + } + future().setFailure(new IllegalStateException("[aggregator][fail]" + cluster)); + } } @Override - public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof ClusterHostPorts)) return false; - ClusterHostPorts that = (ClusterHostPorts) o; - return Objects.equals(getClusterName(), that.getClusterName()) && Objects.equals(getHostPorts(), that.getHostPorts()); + public String getName() { + return "[AggregatorCheckAndSetTask]" + cluster; } @Override - public int hashCode() { - return Objects.hash(getClusterName(), getHostPorts()); + protected void doReset() { } } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/TestConfig.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/TestConfig.java index d35e4d339..e62fc6f68 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/TestConfig.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/TestConfig.java @@ -306,13 +306,13 @@ public int getKeeperCheckerIntervalMilli() { } @Override - public int getInstancePullIntervalSeconds() { - return 5; + public int getMarkInstanceBaseDelayMilli() { + return 1000; } @Override - public int getInstancePullRandomSeconds() { - return 5; + public int getMarkInstanceMaxDelayMilli() { + return 2000; } } diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregatorTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregatorTest.java index ddc952b04..72df8e05f 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregatorTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/interaction/handler/DefaultOuterClientAggregatorTest.java @@ -53,8 +53,8 @@ public void beforeDefaultOuterClientAggregatorTest() { info1 = new ClusterShardHostPort(cluster1, null, hostPort1); info2 = new ClusterShardHostPort(cluster1, null, hostPort2); info3 = new ClusterShardHostPort(cluster1, null, hostPort3); - when(checkerConfig.getInstancePullIntervalSeconds()).thenReturn(pullIntervalSeconds); - when(checkerConfig.getInstancePullRandomSeconds()).thenReturn(pullRandomSeconds); + when(checkerConfig.getMarkInstanceBaseDelayMilli()).thenReturn(pullIntervalSeconds * 1000); + when(checkerConfig.getMarkInstanceMaxDelayMilli()).thenReturn(pullRandomSeconds * 1000); outerClientAggregator.setScheduled(MoreExecutors.getExitingScheduledExecutorService( new ScheduledThreadPoolExecutor(1, XpipeThreadFactory.create("DefaultOuterClientAggregatorTest")), 5, TimeUnit.SECONDS diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/CombConsoleConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/CombConsoleConfig.java deleted file mode 100644 index 31d415941..000000000 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/CombConsoleConfig.java +++ /dev/null @@ -1,616 +0,0 @@ -package com.ctrip.xpipe.redis.console.config.impl; - -import com.ctrip.xpipe.api.codec.GenericTypeReference; -import com.ctrip.xpipe.api.config.ConfigChangeListener; -import com.ctrip.xpipe.cluster.ClusterType; -import com.ctrip.xpipe.codec.JsonCodec; -import com.ctrip.xpipe.config.AbstractConfigBean; -import com.ctrip.xpipe.config.ConfigKeyListener; -import com.ctrip.xpipe.redis.checker.config.impl.CheckConfigBean; -import com.ctrip.xpipe.redis.checker.config.impl.CommonConfigBean; -import com.ctrip.xpipe.redis.checker.config.impl.ConsoleConfigBean; -import com.ctrip.xpipe.redis.checker.config.impl.DataCenterConfigBean; -import com.ctrip.xpipe.redis.console.config.ConsoleConfig; -import com.ctrip.xpipe.redis.console.config.model.BeaconOrgRoute; -import com.ctrip.xpipe.redis.console.util.HickwallMetricInfo; -import com.ctrip.xpipe.redis.core.meta.QuorumConfig; -import com.ctrip.xpipe.tuple.Pair; -import com.ctrip.xpipe.utils.StringUtil; -import com.google.common.collect.Maps; -import io.netty.util.internal.ConcurrentSet; - -import java.util.*; - - -public class CombConsoleConfig implements ConsoleConfig, ConfigChangeListener { - - private CheckConfigBean checkConfigBean; - - private ConsoleConfigBean consoleConfigBean; - - private DataCenterConfigBean dataCenterConfigBean; - - private CommonConfigBean commonConfigBean; - - private List configBeans; - - public CombConsoleConfig(CheckConfigBean checkConfigBean, - ConsoleConfigBean consoleConfigBean, - DataCenterConfigBean dataCenterConfigBean, - CommonConfigBean commonConfigBean) { - this.checkConfigBean = checkConfigBean; - this.consoleConfigBean = consoleConfigBean; - this.dataCenterConfigBean = dataCenterConfigBean; - this.commonConfigBean = commonConfigBean; - configBeans = new ArrayList<>(); - configBeans.add(consoleConfigBean); - configBeans.add(dataCenterConfigBean); - configBeans.add(commonConfigBean); - configBeans.add(checkConfigBean); - for (AbstractConfigBean configBean : configBeans) { - configBean.addConfigChangeListener(this); - } - } - - private Map> listeners = Maps.newConcurrentMap(); - - private Set listenersSet = new ConcurrentSet<>(); - - private String hickwallInfo; - - private HickwallMetricInfo info; - - @Override - public String getServerMode() { - return checkConfigBean.getServerMode(); - } - - @Override - public String getDatasource() { - return commonConfigBean.getDatasource(); - } - - @Override - public int getConsoleNotifyRetryTimes() { - return consoleConfigBean.getConsoleNotifyRetryTimes(); - } - - @Override - public int getConsoleNotifyRetryInterval() { - return consoleConfigBean.getConsoleNotifyRetryInterval(); - } - - @Override - public Map getMetaservers() { - return dataCenterConfigBean.getMetaservers(); - } - - @Override - public int getConsoleNotifyThreads() { - return consoleConfigBean.getConsoleNotifyThreads(); - } - - @Override - public Set getConsoleUserAccessWhiteList() { - return commonConfigBean.getConsoleUserAccessWhiteList(); - } - - @Override - public int getRedisReplicationHealthCheckInterval() { - return checkConfigBean.getRedisReplicationHealthCheckInterval(); - } - - @Override - public int getCheckerCurrentDcAllMetaRefreshIntervalMilli() { - return checkConfigBean.getCheckerCurrentDcAllMetaRefreshIntervalMilli(); - } - - @Override - public int getClusterHealthCheckInterval() { - return checkConfigBean.getClusterHealthCheckInterval(); - } - - @Override - public Map getHickwallClusterMetricFormat() { - return commonConfigBean.getHickwallClusterMetricFormat(); - } - - @Override - public HickwallMetricInfo getHickwallMetricInfo() { - String localInfo = commonConfigBean.getHickwallMetricInfo(); - if(StringUtil.isEmpty(hickwallInfo) || !localInfo.equals(hickwallInfo)) { - hickwallInfo = localInfo; - info = JsonCodec.INSTANCE.decode(hickwallInfo, HickwallMetricInfo.class); - } - return info; - } - - @Override - public int getHealthyDelayMilli() { - return checkConfigBean.getHealthyDelayMilli(); - } - - @Override - public long getHealthMarkCompensateIntervalMill() { - return checkConfigBean.getHealthMarkCompensateIntervalMill(); - } - - @Override - public int getHealthMarkCompensateThreads() { - return checkConfigBean.getHealthMarkCompensateThreads(); - } - - @Override - public int getHealthyDelayMilliThroughProxy() { - return checkConfigBean.getHealthyDelayMilliThroughProxy(); - } - - @Override - public int getInstanceLongDelayMilli() { - return checkConfigBean.getInstanceLongDelayMilli(); - } - - @Override - public int getDownAfterCheckNums() { - return checkConfigBean.getDownAfterCheckNums(); - } - - @Override - public int getDownAfterCheckNumsThroughProxy() { - return checkConfigBean.getDownAfterCheckNumsThroughProxy(); - } - - @Override - public int getCacheRefreshInterval() { - return consoleConfigBean.getCacheRefreshInterval(); - } - - @Override - public Set getAlertWhileList() { - return commonConfigBean.getAlertWhileList(); - } - - @Override - public int getQuorum() { - return checkConfigBean.getQuorum(); - } - - @Override - public int getRedisConfCheckIntervalMilli() { - return checkConfigBean.getRedisConfCheckIntervalMilli(); - } - - @Override - public int getSentinelCheckIntervalMilli() { - return checkConfigBean.getSentinelCheckIntervalMilli(); - } - - @Override - public String getConsoleDomain() { - return commonConfigBean.getConsoleDomain(); - } - - @Override - public String getClusterExcludedRegex() { - return commonConfigBean.getClusterExcludedRegex(); - } - - @Override - public QuorumConfig getDefaultSentinelQuorumConfig() { - return checkConfigBean.getDefaultSentinelQuorumConfig(); - } - - @Override - public int getStableLossAfterRounds() { - return checkConfigBean.getStableLossAfterRounds(); - } - - @Override - public int getStableRecoverAfterRounds() { - return checkConfigBean.getStableRecoverAfterRounds(); - } - - @Override - public int getStableResetAfterRounds() { - return checkConfigBean.getStableResetAfterRounds(); - } - - @Override - public float getSiteStableThreshold() { - return checkConfigBean.getSiteStableThreshold(); - } - - @Override - public float getSiteUnstableThreshold() { - return checkConfigBean.getSiteUnstableThreshold(); - } - - @Override - public Boolean getSiteStable() { - return checkConfigBean.getSiteStable(); - } - - @Override - public String getReplDisklessMinRedisVersion() { - return checkConfigBean.getReplDisklessMinRedisVersion(); - } - - @Override - public String getXRedisMinimumRequestVersion() { - return checkConfigBean.getXRedisMinimumRequestVersion(); - } - - @Override - public String getXpipeRuntimeEnvironment() { - return commonConfigBean.getXpipeRuntimeEnvironment(); - } - - @Override - public String getDBAEmails() { - return commonConfigBean.getDBAEmails(); - } - - @Override - public String getRedisAlertSenderEmail() { - return commonConfigBean.getRedisAlertSenderEmail(); - } - - @Override - public String getXPipeAdminEmails() { - return commonConfigBean.getXPipeAdminEmails(); - } - - @Override - public int getAlertSystemSuspendMinute() { - return commonConfigBean.getAlertSystemSuspendMinute(); - } - - @Override - public int getAlertSystemRecoverMinute() { - return commonConfigBean.getAlertSystemRecoverMinute(); - } - - @Override - public int getConfigDefaultRestoreHours() { - return consoleConfigBean.getConfigDefaultRestoreHours(); - } - - @Override - public int getRebalanceSentinelInterval() { - return consoleConfigBean.getRebalanceSentinelInterval(); - } - - @Override - public int getRebalanceSentinelMaxNumOnce() { - return consoleConfigBean.getRebalanceSentinelMaxNumOnce(); - } - - @Override - public int getNoAlarmMinutesForClusterUpdate() { - return commonConfigBean.getNoAlarmMinutesForClusterUpdate(); - } - - @Override - public int getHealthCheckSuspendMinutes() { - return consoleConfigBean.getHealthCheckSuspendMinutes(); - } - - @Override - public Set getIgnoredHealthCheckDc() { - return checkConfigBean.getIgnoredHealthCheckDc(); - } - - @Override - public int getClustersPartIndex() { - return checkConfigBean.getClustersPartIndex(); - } - - @Override - public int getCheckerReportIntervalMilli() { - return checkConfigBean.getCheckerReportIntervalMilli(); - } - - @Override - public int getCheckerMetaRefreshIntervalMilli() { - return checkConfigBean.getCheckerMetaRefreshIntervalMilli(); - } - - @Override - public String getConsoleAddress() { - return checkConfigBean.getConsoleAddress(); - } - - @Override - public int getCheckerAckIntervalMilli() { - return checkConfigBean.getCheckerAckIntervalMilli(); - } - - @Override - public long getConfigCacheTimeoutMilli() { - return checkConfigBean.getConfigCacheTimeoutMilli(); - } - - @Override - public int getProxyCheckUpRetryTimes() { - return checkConfigBean.getProxyCheckUpRetryTimes(); - } - - @Override - public int getProxyCheckDownRetryTimes() { - return checkConfigBean.getProxyCheckDownRetryTimes(); - } - - private Set sentinelCheckOuterClientClusters() { - return checkConfigBean.sentinelCheckOuterClientClusters(); - } - - @Override - public boolean supportSentinelHealthCheck(ClusterType clusterType, String clusterName) { - return clusterType.supportHealthCheck() - || checkConfigBean.shouldSentinelCheckOuterClientClusters() - || sentinelCheckOuterClientClusters().contains(clusterName.toLowerCase()); - } - - @Override - public void register(List keys, ConfigChangeListener configListener) { - for(String key : keys) { - listeners.putIfAbsent(key, new LinkedList<>()); - listeners.get(key).add(configListener); - } - } - - @Override - public String sentinelCheckDowngradeStrategy() { - return checkConfigBean.sentinelCheckDowngradeStrategy(); - } - - @Override - public String crossDcSentinelMonitorNameSuffix() { - return checkConfigBean.crossDcSentinelMonitorNameSuffix(); - } - - @Override - public int getNonCoreCheckIntervalMilli() { - return checkConfigBean.getNonCoreCheckIntervalMilli(); - } - - @Override - public Set getOuterClusterTypes() { - return consoleConfigBean.getOuterClusterTypes(); - } - - @Override - public Map sentinelMasterConfig() { - return checkConfigBean.sentinelMasterConfig(); - } - - @Override - public long subscribeTimeoutMilli() { - return checkConfigBean.subscribeTimeoutMilli(); - } - - @Override - public String getDcsRelations() { - return commonConfigBean.getDcsRelations(); - } - - @Override - public int maxRemovedDcsCnt() { - return checkConfigBean.maxRemovedDcsCnt(); - } - - @Override - public int maxRemovedClustersPercent() { - return checkConfigBean.maxRemovedClustersPercent(); - } - - @Override - public int getKeeperCheckerIntervalMilli() { - return checkConfigBean.getKeeperCheckerIntervalMilli(); - } - - @Override - public int getInstancePullIntervalSeconds() { - return checkConfigBean.getInstancePullIntervalSeconds(); - } - - @Override - public int getInstancePullRandomSeconds() { - return checkConfigBean.getInstancePullRandomSeconds(); - } - - @Override - public int getPingDownAfterMilli() { - return checkConfigBean.getPingDownAfterMilli(); - } - - @Override - public int getPingDownAfterMilliThroughProxy() { - return checkConfigBean.getPingDownAfterMilliThroughProxy(); - } - - @Override - public Pair getClusterShardForMigrationSysCheck() { - return consoleConfigBean.getClusterShardForMigrationSysCheck(); - } - - @Override - public int getProxyInfoCollectInterval() { - return consoleConfigBean.getProxyInfoCollectInterval(); - } - - @Override - public int getOutterClientCheckInterval() { - return checkConfigBean.getOutterClientCheckInterval(); - } - - @Override - public int getOuterClientSyncInterval() { - return consoleConfigBean.getOuterClientSyncInterval(); - } - - @Override - public String getOuterClientToken() { - return commonConfigBean.getOuterClientToken(); - } - - @Override - public Map getConsoleDomains() { - return dataCenterConfigBean.getConsoleDomains(); - } - - @Override - public boolean isSentinelRateLimitOpen() { - return checkConfigBean.isSentinelRateLimitOpen(); - } - - @Override - public int getSentinelRateLimitSize() { - return checkConfigBean.getSentinelRateLimitSize(); - } - - @Override - public Set getVariablesCheckDataSources() { - return consoleConfigBean.getVariablesCheckDataSources(); - } - - @Override - public Set getOwnClusterType() { - return consoleConfigBean.getOwnClusterType(); - } - - @Override - public Set shouldNotifyClusterTypes() { - return commonConfigBean.shouldNotifyClusterTypes(); - } - - @Override - public String getCrossDcLeaderLeaseName() { - return dataCenterConfigBean.getCrossDcLeaderLeaseName(); - } - - @Override - public List getBeaconOrgRoutes() { - String property = dataCenterConfigBean.getBeaconOrgRoutes(); - return JsonCodec.INSTANCE.decode(property, new GenericTypeReference>() {}); - } - - @Override - public int getClusterDividedParts() { - List groupList = checkConfigBean.getClustersList(); - return groupList.size(); - } - - @Override - public int getCheckerAckTimeoutMilli() { - return dataCenterConfigBean.getCheckerAckTimeoutMilli(); - } - - @Override - public long getMigrationTimeoutMilli() { - return consoleConfigBean.getMigrationTimeoutMilli(); - } - - @Override - public long getServletMethodTimeoutMilli() { - return consoleConfigBean.getServletMethodTimeoutMilli(); - } - - @Override - public boolean isRedisConfigCheckMonitorOpen() { - return checkConfigBean.isRedisConfigCheckMonitorOpen(); - } - - @Override - public String getRedisConfigCheckRules() { - return checkConfigBean.getRedisConfigCheckRules(); - } - - @Override - public String getChooseRouteStrategyType() { - return commonConfigBean.getChooseRouteStrategyType(); - } - - @Override - public boolean isAutoMigrateOverloadKeeperContainerOpen() { - return consoleConfigBean.isAutoMigrateOverloadKeeperContainerOpen(); - } - - @Override - public long getAutoMigrateOverloadKeeperContainerIntervalMilli() { - return consoleConfigBean.getAutoMigrateOverloadKeeperContainerIntervalMilli(); - } - - @Override - public double getKeeperPairOverLoadFactor() { - return consoleConfigBean.getKeeperPairOverLoadFactor(); - } - - @Override - public double getKeeperContainerDiskOverLoadFactor() { - return consoleConfigBean.getKeeperContainerDiskOverLoadFactor(); - } - - @Override - public double getKeeperContainerIoRate() { - return consoleConfigBean.getKeeperContainerIoRate(); - } - - @Override - public long getMetaServerSlotClusterMapCacheTimeOutMilli() { - return consoleConfigBean.getMetaServerSlotClusterMapCacheTimeOutMilli(); - } - - @Override - public boolean autoSetKeeperSyncLimit() { - return consoleConfigBean.autoSetKeeperSyncLimit(); - } - - @Override - public void addListener(ConfigKeyListener listener) { - this.listenersSet.add(listener); - } - - @Override - public String getZkConnectionString() { - return dataCenterConfigBean.getZkConnectionString(); - } - - @Override - public String getZkNameSpace() { - return dataCenterConfigBean.getZkNameSpace(); - } - - public void onChange(String key, String oldValue, String newValue) { - - for(ConfigKeyListener listener : listenersSet) { - listener.onChange(key, newValue); - } - - if(!listeners.containsKey(key)) { - return; - } - for(ConfigChangeListener listener : listeners.get(key)) { - listener.onChange(key, oldValue, newValue); - } - } - - protected String getProperty(String key) { - for(AbstractConfigBean configBean : configBeans) { - String val = configBean.getProperty(key); - if(val != null) { - return val; - } - } - return null; - } - - protected String getProperty(String key, String def) { - String value = this.getProperty(key); - if(value == null) { - return def; - } else { - return value; - } - } -} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java index f6027ff86..700841513 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java @@ -523,13 +523,13 @@ public String getChooseRouteStrategyType() { } @Override - public int getInstancePullIntervalSeconds() { - return getIntProperty(KEY_CHECKER_INSTANCE_PULL_INTERVAL, 5); + public int getMarkInstanceBaseDelayMilli() { + return checkConfigBean.getMarkInstanceBaseDelayMilli(); } @Override - public int getInstancePullRandomSeconds() { - return getIntProperty(KEY_CHECKER_INSTANCE_PULL_RANDOM, 5); + public int getMarkInstanceMaxDelayMilli() { + return checkConfigBean.getMarkInstanceMaxDelayMilli(); } @Override diff --git a/services/ctrip-service/src/main/java/com/ctrip/xpipe/service/migration/CRedisService.java b/services/ctrip-service/src/main/java/com/ctrip/xpipe/service/migration/CRedisService.java index 87a0bb6b7..70610d4af 100644 --- a/services/ctrip-service/src/main/java/com/ctrip/xpipe/service/migration/CRedisService.java +++ b/services/ctrip-service/src/main/java/com/ctrip/xpipe/service/migration/CRedisService.java @@ -13,7 +13,6 @@ import com.ctrip.xpipe.migration.AbstractOuterClientService; import com.ctrip.xpipe.monitor.CatEventMonitor; import com.ctrip.xpipe.monitor.CatTransactionMonitor; -import com.ctrip.xpipe.service.beacon.data.BeaconResp; import com.ctrip.xpipe.spring.RestTemplateFactory; import com.ctrip.xpipe.utils.DateTimeUtils; import com.ctrip.xpipe.utils.StringUtil; @@ -211,8 +210,8 @@ public void go() throws Exception { String address = CREDIS_SERVICE.BATCH_SWITCH_STATUS.getRealPath(credisConfig.getCredisServiceAddress()); String reqType = "batchMarkInstance"; - MarkInstanceResponse response = doRequest(reqType, markInstanceRequest.getClusterName(), () -> restOperations.postForObject(address, markInstanceRequest, MarkInstanceResponse.class) - ); + MarkInstanceResponse response = doRequest(reqType, markInstanceRequest.getClusterName(), + () -> restOperations.postForObject(address, markInstanceRequest, MarkInstanceResponse.class)); logger.info("[doBatchMarkInstance][end]{},{}", markInstanceRequest, response); if(!response.isSuccess()){ throw new IllegalStateException(String.format("%s, response:%s", markInstanceRequest, response));