Optimize flow control for migration plan execution
yifuzhou committed May 21, 2024
1 parent a6c5302 commit 5a6ada8
Showing 27 changed files with 454 additions and 342 deletions.
@@ -29,7 +29,7 @@ public void onAction(KeeperInfoStatsActionContext context) {
long keeperFlow = extractor.getKeeperInstantaneousInputKbps().longValue();
deleteKeeper(info);
Map<DcClusterShardKeeper, Long> keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new);
- keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow);
+ keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.isKeeperActive(), info.getHostPort().getPort()), keeperFlow);
} catch (Throwable throwable) {
logger.error("get instantaneous input kbps of keeper:{} error: ", context.instance().getCheckInfo().getHostPort(), throwable);
}
@@ -92,16 +92,25 @@ public List<MigrationKeeperContainerDetailModel> getOverloadKeeperContainerMigra

@RequestMapping(value = "/keepercontainer/overload/migration/begin", method = RequestMethod.POST)
public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List<MigrationKeeperContainerDetailModel> keeperContainerDetailModels) {
logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels);
try {
- keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels);
+ if (!keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels)) {
+ return RetMessage.createFailMessage("[beginToMigrateOverloadKeeperContainers][fail] has unfinished migration task!");
+ }
} catch (Throwable th) {
- logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage());
+ logger.warn("[beginToMigrateOverloadKeeperContainers][fail] {}", keeperContainerDetailModels, th);
return RetMessage.createFailMessage(th.getMessage());
}
return RetMessage.createSuccessMessage();
}

@RequestMapping(value = "/keepercontainer/overload/migration/terminate", method = RequestMethod.POST)
public RetMessage migrateKeeperTaskTerminate() {
if(keeperContainerMigrationService.stopMigrate()){
return RetMessage.createSuccessMessage();
}
return RetMessage.createFailMessage("Migrate tasks has finished");
}

@RequestMapping(value = "/keepercontainer/overload/info/lasted", method = RequestMethod.GET)
public List<KeeperContainerUsedInfoModel> getLastedAllReadyMigrateKeeperContainers() {
return analyzer.getAllDcKeeperContainerUsedInfoModelsList();
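The controller above now treats beginMigrateKeeperContainers as a boolean guard and pairs it with a terminate endpoint. The service itself is not part of this diff; below is a minimal sketch of the contract the controller appears to rely on, assuming a single in-flight flag (class and field names hypothetical):

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch only; the real KeeperContainerMigrationService is not in this commit.
// (xpipe model imports elided)
public class KeeperContainerMigrationServiceSketch {

    // assumed guard: at most one migration batch runs at a time
    private final AtomicBoolean migrating = new AtomicBoolean(false);

    public boolean beginMigrateKeeperContainers(List<MigrationKeeperContainerDetailModel> models) {
        // refuse to start while a previous batch is unfinished
        if (!migrating.compareAndSet(false, true)) {
            return false;
        }
        // ... submit migration tasks, clearing the flag when the batch completes ...
        return true;
    }

    public boolean stopMigrate() {
        // succeeds only if there was an unfinished batch to stop
        return migrating.compareAndSet(true, false);
    }
}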
@@ -59,7 +59,7 @@ public void migrateKeepers(@RequestBody MigrationKeeperModel model) {
.getAllShardModel(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName());

for (ShardModel shardModel : allShardModel) {
- if (!shardModelService.migrateShardKeepers(model.getSrcKeeperContainer().getDcName(),
+ if (!shardModelService.migrateBackupKeeper(model.getSrcKeeperContainer().getDcName(),
clusterTbl.getClusterName(), shardModel, model.getSrcKeeperContainer().getAddr().getHost(),
(model.getTargetKeeperContainer() == null || model.getTargetKeeperContainer().getAddr() == null)
? null : model.getTargetKeeperContainer().getAddr().getHost())) {
@@ -39,6 +39,8 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt

private final List<ALERT_TYPE> alertType = Lists.newArrayList(ALERT_TYPE.KEEPER_MIGRATION_FAIL, ALERT_TYPE.KEEPER_MIGRATION_SUCCESS);

+ public final static String KEEPER_MIGRATION = "keeper_migration";

public final static String KEEPER_MIGRATION_SUCCESS = "keeper_migration_success";

public final static String KEEPER_MIGRATION_FAIL = "keeper_migration_fail";
@@ -47,10 +49,6 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt

public final static String KEEPER_SWITCH_MASTER_FAIL = "keeper_switch_master_fail";

- public final static String KEEPER_MIGRATION_ACTIVE_START_SUCCESS = "keeper_migration_active_start_success";

- public final static String KEEPER_MIGRATION_ACTIVE_START_FAIL = "keeper_migration_active_start_fail";

public final static String KEEPER_MIGRATION_ACTIVE_SUCCESS = "keeper_migration_active_success";

public final static String KEEPER_MIGRATION_ACTIVE_FAIL = "keeper_migration_active_fail";
@@ -87,7 +85,7 @@ void migrateAllKeepers(List<MigrationKeeperContainerDetailModel> readyToMigratio
ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(),
migrateShard.getClusterId(), migrateShard.getShardId(), false, null);

- if (!shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
+ if (!shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
migrationKeeperContainerDetailModel.getTargetKeeperContainer().getKeeperIp(), srcKeeperContainerIp)) {
logger.warn("[migrateAllKeepers] migrate shard keepers failed, shard: {}", migrateShard);
alertForKeeperMigrationFail(migrateShard, srcKeeperContainerIp,
@@ -1,18 +1,14 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.framework.xpipe.redis.ProxyRegistry;
import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.api.command.CommandFuture;
import com.ctrip.xpipe.api.command.CommandFutureListener;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.pool.SimpleObjectPool;
import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.netty.commands.NettyClient;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -33,37 +29,9 @@ protected AbstractKeeperCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool,
this.scheduled = scheduled;
}

- protected InfoCommand generteInfoCommand(Endpoint key) {
- if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) {
- commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI;
- }
+ protected InfoCommand generateInfoReplicationCommand(Endpoint key) {
SimpleObjectPool<NettyClient> keyPool = keyedObjectPool.getKeyPool(key);
return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut);
}

- protected <V> void addHookAndExecute(Command<V> command, Callbackable<V> callback) {
- logger.info("[zyfTest][addHookAndExecute] start execute");
- CommandFuture<V> future = command.execute();
- logger.info("[zyfTest][addHookAndExecute] start addListener");
- future.addListener(new CommandFutureListener<V>() {
- @Override
- public void operationComplete(CommandFuture<V> commandFuture) throws Exception {
- if(!commandFuture.isSuccess()) {
- logger.info("[zyfTest][addHookAndExecute] listener fail");
- callback.fail(commandFuture.cause());
- } else {
- logger.info("[zyfTest][addHookAndExecute] listener success");
- callback.success(commandFuture.get());
- }
- }
- });
- try {
- logger.info("[zyfTest][addHookAndExecute] before get");
- future.get();
- logger.info("[zyfTest][addHookAndExecute] get over");
- } catch (Exception e){
- throw new RuntimeException(e);
- }
- }

}
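With the proxy-timeout branch and the addHookAndExecute helper removed, subclasses now run the INFO command directly and parse the reply themselves. A minimal usage sketch of the new helper (the call site is assumed, not shown in this diff):

// Sketch: issue INFO replication against a keeper endpoint and read its offset.
InfoCommand infoCommand = generateInfoReplicationCommand(keeper);
String reply = infoCommand.execute().get(); // blocks until the reply arrives (or fails)
long masterReplOffset = new InfoResultExtractor(reply).getMasterReplOffset();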
@@ -0,0 +1,41 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
import com.ctrip.xpipe.utils.VisibleForTesting;

import java.util.concurrent.ScheduledExecutorService;

public class CheckKeeperActiveCommand<T> extends AbstractKeeperCommand<T>{

private Endpoint keeper;

private boolean expectedActive;

public CheckKeeperActiveCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper, boolean expectedActive) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
this.expectedActive = expectedActive;
}

@Override
public String getName() {
return "CheckKeeperActiveCommand";
}

@Override
protected void doExecute() throws Throwable {
InfoCommand infoCommand = generateInfoReplicationCommand(keeper);
if (new InfoResultExtractor(infoCommand.execute().get()).isKeeperActive() == expectedActive) {
this.future().setSuccess();
// return so the future is not completed a second time below
return;
}
this.future().setFailure(new Exception(String.format("keeper: %s active state is not %s", keeper, expectedActive)));
}

@Override
protected void doReset() {

}
}
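doExecute fails whenever the observed role has not yet flipped, so a caller that needs to wait presumably wraps the command in the retry factory already used in this codebase (see the code removed from FullSyncJudgeCommand below). A sketch under that assumption; the real call site is not in this commit:

// Sketch: poll until the keeper reports the expected active state, retrying up to 600 times.
RetryCommandFactory<Object> factory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000);
Command<Object> waitActive = factory.createRetryCommand(
        new CheckKeeperActiveCommand<>(keyedObjectPool, scheduled, keeper, true));
waitActive.execute().get(); // throws if the keeper never reaches the expected state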
@@ -0,0 +1,39 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand;
import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole;

import java.util.concurrent.ScheduledExecutorService;

import static com.ctrip.xpipe.redis.core.protocal.MASTER_STATE.REDIS_REPL_CONNECTED;

public class CheckKeeperConnectedCommand<T> extends AbstractKeeperCommand<T> {

private Endpoint keeper;

public CheckKeeperConnectedCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
}

@Override
public String getName() {
return "CheckKeeperConnectedCommand";
}

@Override
protected void doExecute() throws Throwable {
SlaveRole role = (SlaveRole)new RoleCommand(keyedObjectPool.getKeyPool(keeper), scheduled).execute().get();
if (REDIS_REPL_CONNECTED == role.getMasterState()) {
this.future().setSuccess();
// return so the future is not completed a second time below
return;
}
this.future().setFailure(new Exception(String.format("keeper %s is not connected to its master, state: %s", keeper, role.getMasterState())));
}

@Override
protected void doReset() {

}
}
@@ -1,33 +1,24 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.pool.ObjectPoolException;
import com.ctrip.xpipe.command.DefaultRetryCommandFactory;
import com.ctrip.xpipe.command.RetryCommandFactory;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.concurrent.ScheduledExecutorService;

public class FullSyncJudgeCommand<T> extends AbstractKeeperCommand<T> {

- private Endpoint active;
+ private Endpoint activeInstance;

- private Endpoint backUp;

- private long intervalTime;
+ private Endpoint backUpInstance;

private long activeMasterReplOffset;

- private long backupMasterReplOffset;

- public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) {
+ public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint activeInstance, Endpoint backUpInstance, long activeMasterReplOffset) {
super(keyedObjectPool, scheduled);
- this.active = active;
- this.backUp = backUp;
- this.intervalTime = intervalTime;
+ this.activeInstance = activeInstance;
+ this.backUpInstance = backUpInstance;
+ this.activeMasterReplOffset = activeMasterReplOffset;
}

@Override
@@ -37,51 +28,15 @@ public String getName() {

@Override
protected void doExecute() throws Throwable {
- try {
- RetryCommandFactory<String> commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000);
- Command<String> activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active));
- Command<String> backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp));
- addHookAndExecute(activeRetryInfoCommand, new Callbackable<String>() {
- @Override
- public void success(String message) {
- activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
- }
-
- @Override
- public void fail(Throwable throwable) {
- logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable);
- }
- });
-
- try {
- Thread.sleep(intervalTime);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- addHookAndExecute(backUpRetryInfoCommand, new Callbackable<String>() {
- @Override
- public void success(String message) {
- backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
- }
-
- @Override
- public void fail(Throwable throwable) {
- logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable);
- }
- });
+ long backupMasterReplOffset;
+ backupMasterReplOffset = new InfoResultExtractor(generateInfoReplicationCommand(backUpInstance).execute().get()).getMasterReplOffset();

- if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) {
- this.future().setSuccess();
- }
- } finally {
- try {
- keyedObjectPool.clear(active);
- keyedObjectPool.clear(backUp);
- } catch (ObjectPoolException e) {
- logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e);
- }
- }
+ logger.debug("[FullSyncJudgeCommand] activeMasterReplOffset: {}:{}, backupMasterReplOffset: {}:{}",
+ activeInstance, activeMasterReplOffset, backUpInstance, backupMasterReplOffset);
+ if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) {
+ this.future().setSuccess();
+ return; // complete the future exactly once
+ }
+ this.future().setFailure(new Exception(String.format("activeInstance: %s and backUpInstance %s is not full sync", activeInstance, backUpInstance)));
}

@Override
@@ -0,0 +1,32 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.concurrent.ScheduledExecutorService;

public class KeeperContainerReplOffsetGetCommand<V> extends AbstractKeeperCommand<Long> {

private Endpoint keeper;

public KeeperContainerReplOffsetGetCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
}

@Override
public String getName() {
return "KeeperContainerReplOffsetGetCommand";
}

@Override
protected void doExecute() throws Throwable {
this.future().setSuccess(new InfoResultExtractor(generateInfoReplicationCommand(keeper).execute().get()).getMasterReplOffset());
}

@Override
protected void doReset() {

}
}
@@ -0,0 +1,35 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.redis.console.service.KeeperContainerService;

public class KeeperResetCommand<T> extends AbstractCommand<T> {

private String activeKeeperIp;

private long shardId;

private KeeperContainerService keeperContainerService;

public KeeperResetCommand(String activeKeeperIp, long shardId, KeeperContainerService keeperContainerService) {
this.activeKeeperIp = activeKeeperIp;
this.shardId = shardId;
this.keeperContainerService = keeperContainerService;
}

@Override
public String getName() {
return "KeeperResetCommand";
}

@Override
protected void doExecute() throws Throwable {
keeperContainerService.resetKeepers(activeKeeperIp, shardId);
this.future().setSuccess();
}

@Override
protected void doReset() {

}
}
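Read together, the new command classes suggest the keeper-migration flow this commit is optimizing: verify roles, wait for replication, snapshot and compare offsets, then reset. The sequence below is a hypothetical sketch only — the actual orchestration is not part of this diff, and the pool, endpoint, IP, and shardId variables are assumed:

// Hypothetical end-to-end sequence (assumed call site; each get() blocks and throws on failure):
new CheckKeeperActiveCommand<>(keyedObjectPool, scheduled, newActiveKeeper, true).execute().get();
new CheckKeeperConnectedCommand<>(keyedObjectPool, scheduled, newActiveKeeper).execute().get();
Long activeOffset = new KeeperContainerReplOffsetGetCommand<Long>(keyedObjectPool, scheduled, oldActiveKeeper)
        .execute().get();
new FullSyncJudgeCommand<>(keyedObjectPool, scheduled, oldActiveKeeper, newActiveKeeper, activeOffset)
        .execute().get();
new KeeperResetCommand<>(oldActiveKeeperIp, shardId, keeperContainerService).execute().get();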