Skip to content

Commit

Permalink
执行迁移计划command结构优化
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Apr 2, 2024
1 parent 97cb610 commit 97953dd
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 283 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.framework.xpipe.redis.ProxyRegistry;
import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.api.command.CommandFuture;
import com.ctrip.xpipe.api.command.CommandFutureListener;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.pool.SimpleObjectPool;
import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.netty.commands.NettyClient;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
import com.ctrip.xpipe.redis.core.protocal.LoggableRedisCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledExecutorService;

public abstract class AbstractKeeperCommand<V> extends AbstractCommand<V> {

protected XpipeNettyClientKeyedObjectPool keyedObjectPool;

protected ScheduledExecutorService scheduled;

protected static final Logger logger = LoggerFactory.getLogger(AbstractKeeperCommand.class);

private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI)));

protected AbstractKeeperCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled) {
this.keyedObjectPool = keyedObjectPool;
this.scheduled = scheduled;
}

protected InfoCommand generteInfoCommand(Endpoint key) {
if(ProxyRegistry.getProxy(key.getHost(), key.getPort()) != null) {
commandTimeOut = AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI;
}
SimpleObjectPool<NettyClient> keyPool = keyedObjectPool.getKeyPool(key);
return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut);
}

protected <V> void addHookAndExecute(Command<V> command, Callbackable<V> callback) {
logger.info("[zyfTest][addHookAndExecute] start execute");
CommandFuture<V> future = command.execute();
logger.info("[zyfTest][addHookAndExecute] start addListener");
future.addListener(new CommandFutureListener<V>() {
@Override
public void operationComplete(CommandFuture<V> commandFuture) throws Exception {
if(!commandFuture.isSuccess()) {
logger.info("[zyfTest][addHookAndExecute] listener fail");
callback.fail(commandFuture.cause());
} else {
logger.info("[zyfTest][addHookAndExecute] listener success");
callback.success(commandFuture.get());
}
}
});
try {
logger.info("[zyfTest][addHookAndExecute] before get");
future.get();
logger.info("[zyfTest][addHookAndExecute] get over");
} catch (Exception e){
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.pool.ObjectPoolException;
import com.ctrip.xpipe.command.DefaultRetryCommandFactory;
import com.ctrip.xpipe.command.RetryCommandFactory;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.concurrent.ScheduledExecutorService;

public class FullSyncJudgeCommand<T> extends AbstractKeeperCommand<T> {

private Endpoint active;

private Endpoint backUp;

private long intervalTime;

private long activeMasterReplOffset;

private long backupMasterReplOffset;

public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint active, Endpoint backUp, long intervalTime) {
super(keyedObjectPool, scheduled);
this.active = active;
this.backUp = backUp;
this.intervalTime = intervalTime;
}

@Override
public String getName() {
return "FullSyncJudgeCommand";
}

@Override
protected void doExecute() throws Throwable {
try {
RetryCommandFactory<String> commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 600, 1000);
Command<String> activeRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(active));
Command<String> backUpRetryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(backUp));
addHookAndExecute(activeRetryInfoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
activeMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
}

@Override
public void fail(Throwable throwable) {
logger.error("[doExecute] info instance {}:{} failed", active.getHost(), active.getPort(), throwable);
}
});

try {
Thread.sleep(intervalTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

addHookAndExecute(backUpRetryInfoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
backupMasterReplOffset = new InfoResultExtractor(message).getMasterReplOffset();
}

@Override
public void fail(Throwable throwable) {
logger.error("[doExecute] info instance {}:{} failed", backUp.getHost(), backUp.getPort(), throwable);
}
});

if (backupMasterReplOffset != 0 && activeMasterReplOffset != 0 && backupMasterReplOffset > activeMasterReplOffset) {
this.future().setSuccess();
}
} finally {
try {
keyedObjectPool.clear(active);
keyedObjectPool.clear(backUp);
} catch (ObjectPoolException e) {
logger.error("[clear] clear keyed object pool error, activeInstance:{}, backUpInstance:{}", active, backUp, e);
}
}
}

@Override
protected void doReset() {

}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.command.Command;
import com.ctrip.xpipe.command.DefaultRetryCommandFactory;
import com.ctrip.xpipe.command.RetryCommandFactory;
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;
import com.ctrip.xpipe.redis.console.model.RedisTbl;
import com.ctrip.xpipe.redis.console.service.KeeperContainerService;
import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

public class SwitchMasterCommand<T> extends AbstractKeeperCommand<T>{

private String activeIp;

private String backupIp;

private List<RedisTbl> keepers;

private KeeperContainerService keeperContainerService;

public SwitchMasterCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, String activeIp, String backupIp, List<RedisTbl> keepers, KeeperContainerService keeperContainerService) {
super(keyedObjectPool, scheduled);
this.activeIp = activeIp;
this.backupIp = backupIp;
this.keepers = keepers;
this.keeperContainerService = keeperContainerService;
}

@Override
public String getName() {
return "SwitchMasterCommand";
}

@Override
protected void doExecute() throws Throwable {
try {
logger.info("[zyfTest][SwitchMasterCommand] start");
if (keepers.size() != 2) {
logger.warn("[switchMaster] keeper size is not 2, can not switch master, activeIp: {}, backupIp: {}, shardModelKeepers: {}", activeIp, backupIp, keepers);
return;
}
int activeKeeperPort = -1;
String backUpKeeperIp = null;
for (RedisTbl keeper : keepers) {
if (keeper.getRedisIp().equals(activeIp)) {
activeKeeperPort = keeper.getRedisPort();
} else {
backUpKeeperIp = keeper.getRedisIp();
}
}

if (activeKeeperPort == -1 || backUpKeeperIp == null || !backUpKeeperIp.equals(backupIp)) {
logger.warn("[switchMaster] can not find truly active keeper or backup keeper, activeIp: {}, backupIp: {}, shardModelKeepers: {}, activeKeeperPort: {}, backUpKeeperIp: {}"
, activeIp, backupIp, keepers, activeKeeperPort, backUpKeeperIp);
return;
}

KeeperTransMeta keeperInstanceMeta = null;
logger.info("[zyfTest][SwitchMasterCommand] start getAllKeepers");
List<KeeperInstanceMeta> allKeepers = keeperContainerService.getAllKeepers(activeIp);
logger.info("[zyfTest][SwitchMasterCommand] over getAllKeepers");
for (KeeperInstanceMeta keeper : allKeepers) {
if (keeper.getKeeperMeta().getPort() == activeKeeperPort) {
keeperInstanceMeta = keeper;
break;
}
}

if (keeperInstanceMeta == null) {
logger.warn("[switchMaster] can not find keeper: {}:{} replId message", activeIp, activeKeeperPort);
return;
}
logger.info("[zyfTest][SwitchMasterCommand] start resetKeepers");
keeperContainerService.resetKeepers(keeperInstanceMeta);
logger.info("[zyfTest][SwitchMasterCommand] over resetKeepers");
RetryCommandFactory<String> commandFactory = DefaultRetryCommandFactory.retryNTimes(scheduled, 5, 1000);
Command<String> retryInfoCommand = commandFactory.createRetryCommand(generteInfoCommand(new DefaultEndPoint(activeIp, activeKeeperPort)));
logger.info("[zyfTest][SwitchMasterCommand] get retryInfoCommand");
int finalActiveKeeperPort = activeKeeperPort;
addHookAndExecute(retryInfoCommand, new Callbackable<String>() {
@Override
public void success(String message) {
logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand success");
if (!new InfoResultExtractor(message).getKeeperActive()) {
future().setSuccess();
}
}

@Override
public void fail(Throwable throwable) {
logger.info("[zyfTest][SwitchMasterCommand] retryInfoCommand fail");
logger.error("[SwitchMasterCommand] info keeper: {}:{}", activeIp, finalActiveKeeperPort, throwable);
}
});
if (retryInfoCommand.future().isSuccess()) {
future().setSuccess();
logger.info("[zyfTest][SwitchMasterCommand] over success");
}
} catch (Exception e) {
logger.error("[SwitchMasterCommand] switch master failed, activeIp: {}, backupIp: {}", activeIp, backupIp, e);
}
}

@Override
protected void doReset() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ List<KeeperBasicInfo> findBestKeepers(String dcName, int beginPort, BiPredicate<

List<RedisTbl> getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp);

List<RedisTbl> getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp, boolean isAutoRebalance);

List<RedisTbl> getSwitchMaterNewKeepers(ShardModel shardModel);

List<KeeperBasicInfo> findBestKeepersByKeeperContainer(String targetKeeperContainerIp, int beginPort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,10 @@ public List<KeeperBasicInfo> findBestKeepers(String dcName, int beginPort, BiPre

@Override
public List<RedisTbl> getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp) {
return getNewKeepers(dcName, clusterName, shardModel, srcKeeperContainerIp, targetKeeperContainerIp, false);
}

@Override
public List<RedisTbl> getNewKeepers(String dcName, String clusterName, ShardModel shardModel, String srcKeeperContainerIp, String targetKeeperContainerIp, boolean isAutoRebalance) {
List<RedisTbl> newKeepers = new ArrayList<>();
logger.debug("[migrateKeepers] origin keepers {} from cluster:{}, dc:{}, shard:{}",shardModel.getKeepers(), clusterName, dcName, shardModel.getShardTbl().getShardName());
for (RedisTbl keeper : shardModel.getKeepers()) {
if (!ObjectUtils.equals(keeper.getRedisIp(), srcKeeperContainerIp)) {
if (isAutoRebalance) {
keeper.setMaster(true);
}
newKeepers.add(keeper);
}
}
Expand All @@ -110,8 +102,7 @@ && isDifferentAz(keeperSelected, alreadyUsedAzId, dcName)) {
newKeepers.add(new RedisTbl().setKeepercontainerId(keeperSelected.getKeeperContainerId())
.setRedisIp(keeperSelected.getHost())
.setRedisPort(keeperSelected.getPort())
.setRedisRole(XPipeConsoleConstant.ROLE_KEEPER)
.setMaster(!newKeepers.get(0).isMaster()));
.setRedisRole(XPipeConsoleConstant.ROLE_KEEPER));
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@ public void beginMigrateKeeperContainers(List<MigrationKeeperContainerDetailMode
migrateShard, srcKeeperContainerIp, targetKeeperContainerIp);
String event;
if (keeperContainer.isSwitchActive()) {
logger.info("[zyfTest] start switchMaster");
if (shardModelService.switchMaster(srcKeeperContainerIp, targetKeeperContainerIp, shardModel)) {
logger.info("[zyfTest] switchMaster success");
keeperContainer.migrateKeeperCompleteCountIncrease();
event = KEEPER_SWITCH_MASTER_SUCCESS;
} else {
logger.info("[zyfTest] switchMaster fail");
event = KEEPER_SWITCH_MASTER_FAIL;
}
}else if (keeperContainer.isKeeperPairOverload()) {
Expand Down
Loading

0 comments on commit 97953dd

Please sign in to comment.