From 3bfc03be3693b0de2684d866f0203a56d367ad9a Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Fri, 29 Mar 2024 14:41:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E5=BC=82=E5=B8=B8case?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E6=97=A5=E5=BF=97=EF=BC=8C=E9=99=90=E5=88=B6?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=B1=A0=E7=BA=BF=E7=A8=8B=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../redis/checker/resource/Resource.java | 2 + .../model/impl/ShardModelServiceImpl.java | 93 +++++++++++-------- .../redis/console/spring/ResourceConfig.java | 10 ++ 3 files changed, 64 insertions(+), 41 deletions(-) diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java index f16d65397..51a3e4da4 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java @@ -12,6 +12,8 @@ public class Resource { public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool"; + public static final String MIGRATE_KEEPER_CLIENT_POOL = "migrateKeeperClientPool"; + public static final String PING_DELAY_INFO_EXECUTORS = "pingDelayInfoExecutors"; public static final String PING_DELAY_INFO_SCHEDULED = "pingDelayInfoScheduled"; diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java index ae46e90bf..8d29248e9 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java @@ -20,7 +20,6 @@ import com.ctrip.xpipe.redis.console.service.*; import com.ctrip.xpipe.redis.console.service.model.ShardModelService; import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperMeta; import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand; import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand; @@ -40,8 +39,7 @@ import java.util.*; import java.util.concurrent.*; -import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_COMMAND_EXECUTOR; -import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_SESSION_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.*; import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.KEEPER_MIGRATION_ACTIVE_FAIL; import static com.ctrip.xpipe.redis.console.keeper.AutoMigrateOverloadKeeperContainerAction.KEEPER_MIGRATION_ACTIVE_SUCCESS; @@ -78,6 +76,14 @@ public class ShardModelServiceImpl implements ShardModelService{ private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(20); + @Resource(name = REDIS_COMMAND_EXECUTOR) + private ScheduledExecutorService scheduled; + + @Resource(name = MIGRATE_KEEPER_CLIENT_POOL) + private XpipeNettyClientKeyedObjectPool keyedObjectPool; + + private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI))); + private final long SWITCH_MASTER_CHECK_INTERVAL = 1000; private final int SWITCH_MASTER_CHECK_TIMES = 10; @@ -265,31 +271,45 @@ public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel public boolean switchMaster(String activeIp, String backupIp, ShardModel shardModel) { try { List keepers = shardModel.getKeepers(); - int srcKeeperPort = keepers.stream() - .filter(r -> r.getRedisIp().equals(activeIp)) - .findFirst() - .map(RedisTbl::getRedisPort) - .orElseThrow(() -> new RuntimeException("No source keeper found")); - - String targetKeeperIp = keepers.stream() - .filter(r -> !r.getRedisIp().equals(activeIp)) - .findFirst() - .map(RedisTbl::getRedisIp) - .orElseThrow(() -> new RuntimeException("No target keeper found")); - - if (!targetKeeperIp.equals(backupIp)) { + if (keepers.size() != 2) { + logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModel: {}", activeIp, backupIp, shardModel); return false; } + int activeKeeperPort = -1; + String backUpKeeperIp = null; + for (RedisTbl keeper : keepers) { + if (keeper.getRedisIp().equals(activeIp)) { + activeKeeperPort = keeper.getRedisPort(); + } else { + backUpKeeperIp = keeper.getRedisIp(); + } + } - KeeperTransMeta keeperInstanceMeta = keeperContainerService.getAllKeepers(activeIp).stream() - .filter(k -> k.getKeeperMeta().getPort() == srcKeeperPort) - .findFirst() - .orElseThrow(() -> new RuntimeException("No keeper instance found")); + if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) { + logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModel: {}, activeKeeperPort: {}, backUpKeeperIp: {}" + , activeIp, backupIp, shardModel, activeKeeperPort, backUpKeeperIp); + return false; + } + + KeeperTransMeta keeperInstanceMeta = null; + List allKeepers = keeperContainerService.getAllKeepers(activeIp); + for (KeeperInstanceMeta keeper : allKeepers) { + if (keeper.getKeeperMeta().getPort() == activeKeeperPort) { + keeperInstanceMeta = keeper; + break; + } + } + + if (keeperInstanceMeta == null) { + logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort); + return false; + } keeperContainerService.resetKeepers(keeperInstanceMeta); - return checkKeeperActive(activeIp, srcKeeperPort, false, SWITCH_MASTER_CHECK_INTERVAL, SWITCH_MASTER_CHECK_TIMES); + return checkKeeperActive(activeIp, activeKeeperPort, false, SWITCH_MASTER_CHECK_INTERVAL, SWITCH_MASTER_CHECK_TIMES); } catch (Exception e) { + logger.error("[switchMaster] switch master failed", e); return false; } } @@ -310,39 +330,25 @@ public void success(String message) { @Override public void fail(Throwable throwable) { - logger.error("[switchMaster] ", throwable); + logger.error("[switchMaster] keeper: {}:{}", ip, port, throwable); } }); if (isMaster[0] == expectActive) break; Thread.sleep(interval); } - return !expectActive ^ isMaster[0]; + return isMaster[0] == expectActive; } catch (Exception e) { - logger.error("[switchMaster] check keeper active error", e); + logger.error("[switchMaster] check keeper active error, keeper: {}:{}", ip, port, e); return false; } finally { try { keyedObjectPool.clear(activeKey); } catch (ObjectPoolException e) { - logger.error("[clear] clear keyed object pool error", e); + logger.error("[clear] clear keyed object pool error, keeper: {}:{}", ip, port, e); } } } - @Resource(name = REDIS_COMMAND_EXECUTOR) - private ScheduledExecutorService scheduled; - - @Resource(name = REDIS_SESSION_NETTY_CLIENT_POOL) - private XpipeNettyClientKeyedObjectPool keyedObjectPool; - - private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI))); - - @VisibleForTesting - public void setKeyedObjectPool(XpipeNettyClientKeyedObjectPool pool) { - this.keyedObjectPool = pool; - } - - @VisibleForTesting public InfoCommand generteInfoCommand(Endpoint key) { if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) { commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI; @@ -420,6 +426,7 @@ private boolean doMigrateKeepers(String dcName, String clusterName, ShardModel s protected class FullSyncJudgeTask implements Runnable{ private String activeIp; + private String backUpIp; private final InfoCommand activeInfoCommand; private final InfoCommand backupInfoCommand; @@ -433,7 +440,6 @@ protected class FullSyncJudgeTask implements Runnable{ private ShardModel shardModel; private long startTime = 0; private ScheduledFuture scheduledFuture; - public FullSyncJudgeTask(String activeIp, String backUpIp, InfoCommand activeInfoCommand, InfoCommand backupInfoCommand, long expireTime, long intervalTime, String dcName, String clusterName, ShardModel shardModel) { this.activeIp = activeIp; @@ -512,6 +518,7 @@ public void setBackupMasterReplOffset(long offset) { this.backupMasterReplOffset = offset; } + } private void addHookAndExecute(AbstractRedisCommand command, Callbackable callback) { silentCommand(command); @@ -532,7 +539,6 @@ public void operationComplete(CommandFuture commandFuture) throws Exception { throw new RuntimeException(e); } } - private void silentCommand(LoggableRedisCommand command) { command.logRequest(false); command.logResponse(false); @@ -544,5 +550,10 @@ public void setExecutor(ScheduledThreadPoolExecutor executor) { this.executor = executor; } + @VisibleForTesting + public void setKeyedObjectPool(XpipeNettyClientKeyedObjectPool pool) { + this.keyedObjectPool = pool; + } + } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java index 605e11494..6c802fa63 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java @@ -26,6 +26,8 @@ public class ResourceConfig extends AbstractRedisConfigContext { private final static int KEYED_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("KEYED_CLIENT_POOL_SIZE", "8")); + private final static int MIGRATE_KEEPER_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("MIGRATE_KEEPER_CLIENT_POOL_SIZE", "1")); + @Bean(name = REDIS_COMMAND_EXECUTOR) public ScheduledExecutorService getRedisCommandExecutor() { int corePoolSize = OsUtils.getCpuCount(); @@ -54,6 +56,14 @@ public XpipeNettyClientKeyedObjectPool getRedisSessionNettyClientPool() throws E return keyedObjectPool; } + @Bean(name = MIGRATE_KEEPER_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getMigrateKeeperClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(MIGRATE_KEEPER_CLIENT_POOL_SIZE)); + LifecycleHelper.initializeIfPossible(keyedObjectPool); + LifecycleHelper.startIfPossible(keyedObjectPool); + return keyedObjectPool; + } + @Bean(name = PING_DELAY_INFO_EXECUTORS) public ExecutorService getDelayPingExecturos() { return DefaultExecutorFactory.createAllowCoreTimeoutAbortPolicy("RedisHealthCheckInstance-").createExecutorService();