From 97953ddbd873e9e54a975c525dddc97483d10a78 Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Tue, 2 Apr 2024 19:24:01 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E8=BF=81=E7=A7=BB=E8=AE=A1?= =?UTF-8?q?=E5=88=92command=E7=BB=93=E6=9E=84=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../keeper/Command/AbstractKeeperCommand.java | 69 +++++ .../keeper/Command/FullSyncJudgeCommand.java | 94 ++++++ .../keeper/Command/SwitchMasterCommand.java | 115 ++++++++ .../service/KeeperAdvancedService.java | 2 - .../impl/DefaultKeeperAdvancedService.java | 11 +- ...efaultKeeperContainerMigrationService.java | 3 + .../model/impl/ShardModelServiceImpl.java | 276 ++---------------- .../service/ShardModelServiceTest.java | 25 +- 8 files changed, 312 insertions(+), 283 deletions(-) create mode 100644 redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java create mode 100644 redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java create mode 100644 redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java new file mode 100644 index 000000000..341d5835f --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/AbstractKeeperCommand.java @@ -0,0 +1,69 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.framework.xpipe.redis.ProxyRegistry; +import com.ctrip.xpipe.api.command.Command; +import com.ctrip.xpipe.api.command.CommandFuture; +import com.ctrip.xpipe.api.command.CommandFutureListener; +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.api.pool.SimpleObjectPool; +import com.ctrip.xpipe.command.AbstractCommand; +import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; +import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand; +import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ScheduledExecutorService; + +public abstract class AbstractKeeperCommand extends AbstractCommand { + + protected XpipeNettyClientKeyedObjectPool keyedObjectPool; + + protected ScheduledExecutorService scheduled; + + protected static final Logger logger = LoggerFactory.getLogger(AbstractKeeperCommand.class); + + private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI))); + + protected AbstractKeeperCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled) { + this.keyedObjectPool = keyedObjectPool; + this.scheduled = scheduled; + } + + protected InfoCommand generteInfoCommand(Endpoint key) { + if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) { + commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI; + } + SimpleObjectPool keyPool = keyedObjectPool.getKeyPool(key); + return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut); + } + + protected void addHookAndExecute(Command command, Callbackable callback) { + logger.info("[zyfTest][addHookAndExecute] start execute"); + CommandFuture future = command.execute(); + logger.info("[zyfTest][addHookAndExecute] start addListener"); + future.addListener(new CommandFutureListener() { + @Override + public void operationComplete(CommandFuture commandFuture) throws Exception { + if(!commandFuture.isSuccess()) { + logger.info("[zyfTest][addHookAndExecute] listener fail"); + callback.fail(commandFuture.cause()); + } else { + logger.info("[zyfTest][addHookAndExecute] listener success"); + callback.success(commandFuture.get()); + } + } + }); + try { + logger.info("[zyfTest][addHookAndExecute] before get"); + future.get(); + logger.info("[zyfTest][addHookAndExecute] get over"); + } catch (Exception e){ + throw new RuntimeException(e); + } + } + +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java new file mode 100644 index 000000000..01f540717 --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/FullSyncJudgeCommand.java @@ -0,0 +1,94 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.api.command.Command; +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.api.pool.ObjectPoolException; +import com.ctrip.xpipe.command.DefaultRetryCommandFactory; +import com.ctrip.xpipe.command.RetryCommandFactory; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; + +import java.util.concurrent.ScheduledExecutorService; + +public class FullSyncJudgeCommand extends AbstractKeeperCommand { + + private Endpoint active; + + private Endpoint backUp; + + private long intervalTime; + + private long activeMasterReplOffset; + + private long backupMasterReplOffset; + + public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) { + super(keyedObjectPool, scheduled); + this.active = active; + this.backUp = backUp; + this.intervalTime = intervalTime; + } + + @Override + public String getName() { + return "FullSyncJudgeCommand"; + } + + @Override + protected void doExecute() throws Throwable { + try { + RetryCommandFactory commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000); + Command activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active)); + Command backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp)); + addHookAndExecute(activeRetryInfoCommand, new Callbackable() { + @Override + public void success(String message) { + activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); + } + + @Override + public void fail(Throwable throwable) { + logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable); + } + }); + + try { + Thread.sleep(intervalTime); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + addHookAndExecute(backUpRetryInfoCommand, new Callbackable() { + @Override + public void success(String message) { + backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); + } + + @Override + public void fail(Throwable throwable) { + logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable); + } + }); + + if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) { + this.future().setSuccess(); + } + } finally { + try { + keyedObjectPool.clear(active); + keyedObjectPool.clear(backUp); + } catch (ObjectPoolException e) { + logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e); + } + } + } + + @Override + protected void doReset() { + + } + + + +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java new file mode 100644 index 000000000..a2478a9a2 --- /dev/null +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/Command/SwitchMasterCommand.java @@ -0,0 +1,115 @@ +package com.ctrip.xpipe.redis.console.keeper.Command; + +import com.ctrip.xpipe.api.command.Command; +import com.ctrip.xpipe.command.DefaultRetryCommandFactory; +import com.ctrip.xpipe.command.RetryCommandFactory; +import com.ctrip.xpipe.endpoint.DefaultEndPoint; +import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable; +import com.ctrip.xpipe.redis.console.model.RedisTbl; +import com.ctrip.xpipe.redis.console.service.KeeperContainerService; +import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; +import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; +import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +public class SwitchMasterCommand extends AbstractKeeperCommand{ + + private String activeIp; + + private String backupIp; + + private List keepers; + + private KeeperContainerService keeperContainerService; + + public SwitchMasterCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, String activeIp, String backupIp, List keepers, KeeperContainerService keeperContainerService) { + super(keyedObjectPool, scheduled); + this.activeIp = activeIp; + this.backupIp = backupIp; + this.keepers = keepers; + this.keeperContainerService = keeperContainerService; + } + + @Override + public String getName() { + return "SwitchMasterCommand"; + } + + @Override + protected void doExecute() throws Throwable { + try { + logger.info("[zyfTest][SwitchMasterCommand] start"); + if (keepers.size() != 2) { + logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModelKeepers: {}", activeIp, backupIp, keepers); + return; + } + int activeKeeperPort = -1; + String backUpKeeperIp = null; + for (RedisTbl keeper : keepers) { + if (keeper.getRedisIp().equals(activeIp)) { + activeKeeperPort = keeper.getRedisPort(); + } else { + backUpKeeperIp = keeper.getRedisIp(); + } + } + + if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) { + logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModelKeepers: {}, activeKeeperPort: {}, backUpKeeperIp: {}" + , activeIp, backupIp, keepers, activeKeeperPort, backUpKeeperIp); + return; + } + + KeeperTransMeta keeperInstanceMeta = null; + logger.info("[zyfTest][SwitchMasterCommand] start getAllKeepers"); + List allKeepers = keeperContainerService.getAllKeepers(activeIp); + logger.info("[zyfTest][SwitchMasterCommand] over getAllKeepers"); + for (KeeperInstanceMeta keeper : allKeepers) { + if (keeper.getKeeperMeta().getPort() == activeKeeperPort) { + keeperInstanceMeta = keeper; + break; + } + } + + if (keeperInstanceMeta == null) { + logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort); + return; + } + logger.info("[zyfTest][SwitchMasterCommand] start resetKeepers"); + keeperContainerService.resetKeepers(keeperInstanceMeta); + logger.info("[zyfTest][SwitchMasterCommand] over resetKeepers"); + RetryCommandFactory commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000); + Command retryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(new DefaultEndPoint(activeIp, activeKeeperPort))); + logger.info("[zyfTest][SwitchMasterCommand] get retryInfoCommand"); + int finalActiveKeeperPort = activeKeeperPort; + addHookAndExecute(retryInfoCommand, new Callbackable() { + @Override + public void success(String message) { + logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand success"); + if (!new InfoResultExtractor(message).getKeeperActive()) { + future().setSuccess(); + } + } + + @Override + public void fail(Throwable throwable) { + logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand fail"); + logger.error("[SwitchMasterCommand] info keeper: {}:{}", activeIp, finalActiveKeeperPort, throwable); + } + }); + if (retryInfoCommand.future().isSuccess()) { + future().setSuccess(); + logger.info("[zyfTest][SwitchMasterCommand] over success"); + } + } catch (Exception e) { + logger.error("[SwitchMasterCommand] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e); + } + } + + @Override + protected void doReset() { + + } +} diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperAdvancedService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperAdvancedService.java index 664fec2f7..14f4170ee 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperAdvancedService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/KeeperAdvancedService.java @@ -20,8 +20,6 @@ List findBestKeepers(String dcName, int beginPort, BiPredicate< List getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp); - List getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp, boolean isAutoRebalance); - List getSwitchMaterNewKeepers(ShardModel shardModel); List findBestKeepersByKeeperContainer(String targetKeeperContainerIp, int beginPort, diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperAdvancedService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperAdvancedService.java index fd48b562e..000bed110 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperAdvancedService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperAdvancedService.java @@ -72,18 +72,10 @@ public List findBestKeepers(String dcName, int beginPort, BiPre @Override public List getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { - return getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp, false); - } - - @Override - public List getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp, boolean isAutoRebalance) { List newKeepers = new ArrayList<>(); logger.debug("[migrateKeepers] origin keepers {} from cluster:{}, dc:{}, shard:{}",shardModel.getKeepers(), clusterName, dcName, shardModel.getShardTbl().getShardName()); for (RedisTbl keeper : shardModel.getKeepers()) { if (!ObjectUtils.equals(keeper.getRedisIp(), srcKeeperContainerIp)) { - if (isAutoRebalance) { - keeper.setMaster(true); - } newKeepers.add(keeper); } } @@ -110,8 +102,7 @@ && isDifferentAz(keeperSelected, alreadyUsedAzId, dcName)) { newKeepers.add(new RedisTbl().setKeepercontainerId(keeperSelected.getKeeperContainerId()) .setRedisIp(keeperSelected.getHost()) .setRedisPort(keeperSelected.getPort()) - .setRedisRole(XPipeConsoleConstant.ROLE_KEEPER) - .setMaster(!newKeepers.get(0).isMaster())); + .setRedisRole(XPipeConsoleConstant.ROLE_KEEPER)); break; } } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java index 6ce24aed4..1f116dde4 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultKeeperContainerMigrationService.java @@ -57,10 +57,13 @@ public void beginMigrateKeeperContainers(List switchMasterCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 3, 1000); - private final long KEEPER_BALANCE_FULL_SYNC_EXPIRE_TIME = 10L * 60 * 1000; - - private final long KEEPER_BALANCE_FULL_SYNC_INTERVAL_TIME = 1000; + private RetryCommandFactory fullSyncCommandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000); @Override public List getAllShardModel(String dcName, String clusterName) { @@ -269,99 +264,24 @@ public boolean migrateShardKeepers(String dcName, String clusterName, ShardModel @Override public boolean switchMaster(String activeIp, String backupIp, ShardModel shardModel) { + Command switchMasterCommand = switchMasterCommandFactory.createRetryCommand(new SwitchMasterCommand<>(keyedObjectPool, scheduled, activeIp, backupIp, shardModel.getKeepers(), keeperContainerService)); try { - List keepers = shardModel.getKeepers(); - if (keepers.size() != 2) { - logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModel: {}", activeIp, backupIp, shardModel); - return false; - } - int activeKeeperPort = -1; - String backUpKeeperIp = null; - for (RedisTbl keeper : keepers) { - if (keeper.getRedisIp().equals(activeIp)) { - activeKeeperPort = keeper.getRedisPort(); - } else { - backUpKeeperIp = keeper.getRedisIp(); - } - } - - if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) { - logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModel: {}, activeKeeperPort: {}, backUpKeeperIp: {}" - , activeIp, backupIp, shardModel, activeKeeperPort, backUpKeeperIp); - return false; - } - - KeeperTransMeta keeperInstanceMeta = null; - List allKeepers = keeperContainerService.getAllKeepers(activeIp); - for (KeeperInstanceMeta keeper : allKeepers) { - if (keeper.getKeeperMeta().getPort() == activeKeeperPort) { - keeperInstanceMeta = keeper; - break; - } - } - - if (keeperInstanceMeta == null) { - logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort); - return false; - } - - keeperContainerService.resetKeepers(keeperInstanceMeta); - return checkKeeperActive(activeIp, activeKeeperPort, false, SWITCH_MASTER_CHECK_INTERVAL, SWITCH_MASTER_CHECK_TIMES); - + logger.info("[zyfTest] start switchMasterCommand execute"); + switchMasterCommand.execute().get(); + logger.info("[zyfTest] start switchMasterCommand execute over"); + logger.info("[zyfTest] start switchMasterCommand execute success?:{}",switchMasterCommand.future().isSuccess()); + return switchMasterCommand.future().isSuccess(); } catch (Exception e) { - logger.error("[switchMaster] switch master failed", e); + logger.error("[switchMaster] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e); return false; } } - private boolean checkKeeperActive(String ip, int port, boolean expectActive, long interval, int maxRetryTimes) { - DefaultEndPoint activeKey = new DefaultEndPoint(ip, port); - InfoCommand infoCommand = generteInfoCommand(activeKey); - final boolean[] isMaster = new boolean[1]; - try { - int time = 0; - while (time < maxRetryTimes){ - time ++; - addHookAndExecute(infoCommand, new Callbackable() { - @Override - public void success(String message) { - isMaster[0] = new InfoResultExtractor(message).getKeeperActive(); - } - - @Override - public void fail(Throwable throwable) { - logger.error("[switchMaster] keeper: {}:{}", ip, port, throwable); - } - }); - if (isMaster[0] == expectActive) break; - Thread.sleep(interval); - } - return isMaster[0] == expectActive; - } catch (Exception e) { - logger.error("[switchMaster] check keeper active error, keeper: {}:{}", ip, port, e); - return false; - } finally { - try { - keyedObjectPool.clear(activeKey); - } catch (ObjectPoolException e) { - logger.error("[clear] clear keyed object pool error, keeper: {}:{}", ip, port, e); - } - } - } - - public InfoCommand generteInfoCommand(Endpoint key) { - if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) { - commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI; - } - SimpleObjectPool keyPool = keyedObjectPool.getKeyPool(key); - return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut); - } - @Override public boolean migrateAutoBalanceKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) { List oldKeepers = shardModel.getKeepers(); List newKeepers = keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, - srcKeeperContainerIp, targetKeeperContainerIp, true); + srcKeeperContainerIp, targetKeeperContainerIp); if (!doMigrateKeepers(dcName, clusterName, shardModel, newKeepers)) { throw new RuntimeException(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel)); } @@ -369,36 +289,25 @@ public boolean migrateAutoBalanceKeepers(String dcName, String clusterName, Shar RedisTbl backup = newKeepers.get(1); DefaultEndPoint activeKey = new DefaultEndPoint(active.getRedisIp(), active.getRedisPort()); DefaultEndPoint backupKey = new DefaultEndPoint(backup.getRedisIp(), backup.getRedisPort()); - InfoCommand activeInfoCommand = generteInfoCommand(activeKey); - InfoCommand backupInfoCommand = generteInfoCommand(backupKey); - - FullSyncJudgeTask task = new FullSyncJudgeTask(active.getRedisIp(), backup.getRedisIp(), activeInfoCommand, backupInfoCommand, KEEPER_BALANCE_FULL_SYNC_EXPIRE_TIME, KEEPER_BALANCE_FULL_SYNC_INTERVAL_TIME, - dcName, clusterName, shardModel); + Command fullSyncJudgeRetryCommand = fullSyncCommandFactory.createRetryCommand(new FullSyncJudgeCommand<>(keyedObjectPool, scheduled, activeKey, backupKey, 1000)); + Command switchmasterCommand = switchMasterCommandFactory.createRetryCommand(new SwitchMasterCommand<>(keyedObjectPool, scheduled, activeKey.getHost(), backupKey.getHost(), newKeepers, keeperContainerService)); + SequenceCommandChain chain = new SequenceCommandChain(false, false); + chain.add(fullSyncJudgeRetryCommand); + chain.add(switchmasterCommand); try { - ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(task, 0,1000, TimeUnit.MILLISECONDS); - task.setScheduledFuture(scheduledFuture); - scheduledFuture.get(); - return getAutoBalanceResult(task, dcName, clusterName, shardModel, oldKeepers); - } catch (Exception e) { - logger.error("[clear] clear keyed object pool error", e); - return getAutoBalanceResult(task, dcName, clusterName, shardModel, oldKeepers); - } finally { - try { - keyedObjectPool.clear(activeKey); - keyedObjectPool.clear(backupKey); - } catch (ObjectPoolException e) { - logger.error("[clear] clear keyed object pool error", e); - } + chain.execute().get(); + return getAutoBalanceResult(chain.future().isSuccess(), dcName, clusterName, shardModel, oldKeepers); + } catch (InterruptedException | ExecutionException e) { + logger.error("[fullSyncJudge] execute fullSyncJudgeRetryCommand fail", e); + return getAutoBalanceResult(chain.future().isSuccess(), dcName, clusterName, shardModel, oldKeepers); } } - private boolean getAutoBalanceResult(FullSyncJudgeTask task, String dcName, String clusterName, ShardModel shardModel, List oldKeepers) { - if (task.getResult()) { + private boolean getAutoBalanceResult(boolean taskSuccess, String dcName, String clusterName, ShardModel shardModel, List oldKeepers) { + if (taskSuccess) { return true; } - if (!doMigrateKeepers(dcName, clusterName, shardModel, oldKeepers)) { - throw new RuntimeException(String.format("migrate auto balance Keepers fail dc:%s, cluster:%s, shard:%S", dcName, clusterName, shardModel)); - } + doMigrateKeepers(dcName, clusterName, shardModel, oldKeepers); return false; } @@ -423,133 +332,6 @@ private boolean doMigrateKeepers(String dcName, String clusterName, ShardModel s } } - protected class FullSyncJudgeTask implements Runnable{ - - private String activeIp; - - private String backUpIp; - private final InfoCommand activeInfoCommand; - private final InfoCommand backupInfoCommand; - private long activeMasterReplOffset; - private long backupMasterReplOffset; - private final long expireTime; - private final long intervalTime; - private boolean isSuccess; - private String dcName; - private String clusterName; - private ShardModel shardModel; - private long startTime = 0; - private ScheduledFuture scheduledFuture; - public FullSyncJudgeTask(String activeIp, String backUpIp, InfoCommand activeInfoCommand, InfoCommand backupInfoCommand, long expireTime, long intervalTime, - String dcName, String clusterName, ShardModel shardModel) { - this.activeIp = activeIp; - this.backUpIp = backUpIp; - this.activeInfoCommand = activeInfoCommand; - this.backupInfoCommand = backupInfoCommand; - this.expireTime = expireTime; - this.intervalTime = intervalTime; - this.dcName = dcName; - this.clusterName = clusterName; - this.shardModel = shardModel; - } - - public void setScheduledFuture(ScheduledFuture scheduledFuture) { - this.scheduledFuture = scheduledFuture; - } - - @Override - public void run() { - if (startTime == 0) { - startTime = System.currentTimeMillis(); - } - addHookAndExecute(activeInfoCommand, new Callbackable() { - @Override - public void success(String message) { - activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); - } - - @Override - public void fail(Throwable throwable) { - backupMasterReplOffset = -1; - } - }); - - try { - Thread.sleep(intervalTime); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - addHookAndExecute(backupInfoCommand, new Callbackable() { - @Override - public void success(String message) { - backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset(); - } - - @Override - public void fail(Throwable throwable) { - backupMasterReplOffset = -1; - } - }); - if (backupMasterReplOffset > activeMasterReplOffset) { - isSuccess = true; - switchMaster(activeIp, backUpIp, shardModel); - CatEventMonitor.DEFAULT.logEvent(KEEPER_MIGRATION_ACTIVE_SUCCESS, - String.format("activeKeeper:%s, backupKeeper:%s, dc:%s, cluster:%s, shard:%s", - activeInfoCommand, backupInfoCommand, dcName, clusterName, shardModel.getShardTbl().getShardName())); - scheduledFuture.cancel(true); - Thread.currentThread().interrupt(); - } - if (System.currentTimeMillis() - startTime > expireTime && !isSuccess) { - CatEventMonitor.DEFAULT.logEvent(KEEPER_MIGRATION_ACTIVE_FAIL, - String.format("activeKeeper:%s, backupKeeper:%s, dc:%s, cluster:%s, shard:%s", - activeInfoCommand, backupInfoCommand, dcName, clusterName, shardModel.getShardTbl().getShardName())); - scheduledFuture.cancel(true); - } - - } - - public boolean getResult() { - return isSuccess; - } - - @VisibleForTesting - public void setBackupMasterReplOffset(long offset) { - this.backupMasterReplOffset = offset; - } - - - } - private void addHookAndExecute(AbstractRedisCommand command, Callbackable callback) { - silentCommand(command); - CommandFuture future = command.execute(); - future.addListener(new CommandFutureListener() { - @Override - public void operationComplete(CommandFuture commandFuture) throws Exception { - if(!commandFuture.isSuccess()) { - callback.fail(commandFuture.cause()); - } else { - callback.success(commandFuture.get()); - } - } - }); - try { - future.get(); - } catch (Exception e){ - throw new RuntimeException(e); - } - } - private void silentCommand(LoggableRedisCommand command) { - command.logRequest(false); - command.logResponse(false); - - } - - @VisibleForTesting - public void setExecutor(ScheduledThreadPoolExecutor executor) { - this.executor = executor; - } - @VisibleForTesting public void setKeyedObjectPool(XpipeNettyClientKeyedObjectPool pool) { this.keyedObjectPool = pool; diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java index 7fb25f935..fb0e58500 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/service/ShardModelServiceTest.java @@ -62,14 +62,13 @@ public void initMockData() { List newKeepers = new ArrayList<>(); newKeepers.add(new RedisTbl().setRedisIp("ip1").setRedisPort(6380)); newKeepers.add(new RedisTbl().setRedisIp("ip2").setRedisPort(6381)); - when(keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcIp, targetIp, true)).thenReturn(newKeepers); + when(keeperAdvancedService.getNewKeepers(dcName, clusterName, shardModel, srcIp, targetIp)).thenReturn(newKeepers); shardModelService.setKeyedObjectPool(new XpipeNettyClientKeyedObjectPool()); } @Test public void testMigrateAutoBalanceKeepers() throws Exception { ScheduledThreadPoolExecutor executor = Mockito.mock(ScheduledThreadPoolExecutor.class); - shardModelService.setExecutor(executor); ScheduledFuture future = Mockito.mock(ScheduledFuture.class); Mockito.when(executor.scheduleWithFixedDelay(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.anyLong(), Mockito.any(TimeUnit.class))).thenReturn(future); Mockito.when(future.get()).thenReturn(null); @@ -80,28 +79,6 @@ public void testMigrateAutoBalanceKeepers() throws Exception { } } - @Test - public void testFullSyncJudgeTask() throws Exception { - InfoCommand infoCommand1 = Mockito.mock(InfoCommand.class); - InfoCommand infoCommand2 = Mockito.mock(InfoCommand.class); - DefaultCommandFuture future1 = Mockito.mock(DefaultCommandFuture.class); - DefaultCommandFuture future2 = Mockito.mock(DefaultCommandFuture.class); - Mockito.when(infoCommand1.execute()).thenReturn(future1); - Mockito.when(infoCommand2.execute()).thenReturn(future2); - Mockito.when(future1.get()).thenReturn(null); - Mockito.when(future2.get()).thenReturn(null); - FullSyncJudgeTask task = new FullSyncJudgeTask("1", "2", infoCommand1, infoCommand2, 1000, 1000, dcName, clusterName, shardModel); - ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(task, 1000, 1000, TimeUnit.MILLISECONDS); - task.setScheduledFuture(scheduledFuture); - task.run(); - Assert.assertFalse(task.getResult()); - task.setBackupMasterReplOffset(10L); - task.run(); - Assert.assertTrue(task.getResult()); - } - - @Test public void testGetSwitchMaterNewKeepers() { ShardModel model = new ShardModel();