Merge pull request #784 from ctripcorp/feature/rebalance-keeper
Feature/rebalance keeper
LanternLee authored May 23, 2024
2 parents fde05d5 + e7bd0c2 commit 08b055e
Showing 43 changed files with 844 additions and 389 deletions.
@@ -39,7 +39,7 @@ private long getUsedMemory(InfoResultExtractor extractor) {
Long usedMemory = extractor.getUsedMemory();
Long dbSize = extractor.getSwapUsedDbSize();

if (dbSize == null) return usedMemory;
if (dbSize == null || usedMemory < maxMemory) return usedMemory;

String keysSpaceDb0 = extractor.extract("db0");
if (StringUtil.isEmpty(keysSpaceDb0)) return 0;
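
For readers skimming the hunk: the added condition means the db0 keyspace estimate computed below this guard is now consulted only for swap-enabled instances (dbSize present) whose used_memory has already reached maxmemory; otherwise used_memory is reported as-is. A minimal paraphrase with illustrative names, assuming maxMemory is the maxmemory value read elsewhere in this class:

// Illustrative paraphrase of the guard above, not the production method.
static boolean useKeyspaceEstimate(Long dbSize, long usedMemory, long maxMemory) {
    // swap disabled, or memory still under the limit: trust used_memory directly
    return dbSize != null && usedMemory >= maxMemory;
}
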
@@ -29,7 +29,7 @@ public void onAction(KeeperInfoStatsActionContext context) {
long keeperFlow = extractor.getKeeperInstantaneousInputKbps().longValue();
deleteKeeper(info);
Map<DcClusterShardKeeper, Long> keeperContainerResult = MapUtils.getOrCreate(hostPort2InputFlow, info.getHostPort().getHost(), ConcurrentHashMap::new);
keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.getKeeperActive(), info.getHostPort().getPort()), keeperFlow);
keeperContainerResult.put(new DcClusterShardKeeper(info.getDcId(), info.getClusterId(), info.getShardId(), extractor.isKeeperActive(), info.getHostPort().getPort()), keeperFlow);
} catch (Throwable throwable) {
logger.error("get instantaneous input kbps of keeper:{} error: ", context.instance().getCheckInfo().getHostPort(), throwable);
}
@@ -12,6 +12,8 @@ public class Resource {

public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool";

public static final String MIGRATE_KEEPER_CLIENT_POOL = "migrateKeeperClientPool";

public static final String PING_DELAY_INFO_EXECUTORS = "pingDelayInfoExecutors";

public static final String PING_DELAY_INFO_SCHEDULED = "pingDelayInfoScheduled";
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.console.controller.consoleportal;

import com.ctrip.xpipe.redis.checker.controller.result.RetMessage;
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController;
import com.ctrip.xpipe.redis.console.keeper.KeeperContainerUsedInfoAnalyzer;
@@ -92,16 +93,25 @@ public List<MigrationKeeperContainerDetailModel> getOverloadKeeperContainerMigra

@RequestMapping(value = "/keepercontainer/overload/migration/begin", method = RequestMethod.POST)
public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List<MigrationKeeperContainerDetailModel> keeperContainerDetailModels) {
logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels);
try {
// keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels);
if (!keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels)) {
return RetMessage.createFailMessage("The previous migration tasks are still in progress!");
}
} catch (Throwable th) {
logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage());
logger.warn("[beginToMigrateOverloadKeeperContainers][fail] {}", keeperContainerDetailModels, th);
return RetMessage.createFailMessage(th.getMessage());
}
return RetMessage.createSuccessMessage();
}

@RequestMapping(value = "/keepercontainer/overload/migration/terminate", method = RequestMethod.POST)
public RetMessage migrateKeeperTaskTerminate() {
if(keeperContainerMigrationService.stopMigrate()){
return RetMessage.createSuccessMessage("All migration tasks have been completed");
}
return RetMessage.createSuccessMessage("No migration tasks in progress");
}

@RequestMapping(value = "/keepercontainer/overload/info/lasted", method = RequestMethod.GET)
public List<KeeperContainerUsedInfoModel> getLastedAllReadyMigrateKeeperContainers() {
return analyzer.getAllDcKeeperContainerUsedInfoModelsList();
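
For orientation, a hedged client-side sketch of the new /keepercontainer/overload/migration/terminate endpoint above. The console host, port, and any controller-level path prefix are placeholders, since they are not visible in this hunk; the call simply POSTs an empty body and reads the RetMessage payload:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TerminateMigrationSketch {
    public static void main(String[] args) throws Exception {
        // host/port are illustrative; prepend this controller's class-level mapping if it has one
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://console-host:8080/keepercontainer/overload/migration/terminate"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // expected message: "All migration tasks have been completed" or "No migration tasks in progress"
        System.out.println(response.body());
    }
}
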
@@ -59,7 +59,7 @@ public void migrateKeepers(@RequestBody MigrationKeeperModel model) {
.getAllShardModel(model.getSrcKeeperContainer().getDcName(), clusterTbl.getClusterName());

for (ShardModel shardModel : allShardModel) {
if (!shardModelService.migrateShardKeepers(model.getSrcKeeperContainer().getDcName(),
if (!shardModelService.migrateBackupKeeper(model.getSrcKeeperContainer().getDcName(),
clusterTbl.getClusterName(), shardModel, model.getSrcKeeperContainer().getAddr().getHost(),
(model.getTargetKeeperContainer() == null || model.getTargetKeeperContainer().getAddr() == null)
? null : model.getTargetKeeperContainer().getAddr().getHost())) {
@@ -39,6 +39,8 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt

private final List<ALERT_TYPE> alertType = Lists.newArrayList(ALERT_TYPE.KEEPER_MIGRATION_FAIL, ALERT_TYPE.KEEPER_MIGRATION_SUCCESS);

public final static String KEEPER_MIGRATION = "keeper_migration";

public final static String KEEPER_MIGRATION_SUCCESS = "keeper_migration_success";

public final static String KEEPER_MIGRATION_FAIL = "keeper_migration_fail";
@@ -47,14 +49,12 @@ public class AutoMigrateOverloadKeeperContainerAction extends AbstractCrossDcInt

public final static String KEEPER_SWITCH_MASTER_FAIL = "keeper_switch_master_fail";

public final static String KEEPER_MIGRATION_ACTIVE_START_SUCCESS = "keeper_migration_active_start_success";

public final static String KEEPER_MIGRATION_ACTIVE_START_FAIL = "keeper_migration_active_start_fail";

public final static String KEEPER_MIGRATION_ACTIVE_SUCCESS = "keeper_migration_active_success";

public final static String KEEPER_MIGRATION_ACTIVE_FAIL = "keeper_migration_active_fail";

public final static String KEEPER_MIGRATION_ACTIVE_ROLLBACK_ERROR = "keeper_migration_active_rollback_error";

public final static String KEEPER_MIGRATION_BACKUP_SUCCESS = "keeper_migration_backup_success";

public final static String KEEPER_MIGRATION_BACKUP_FAIL = "keeper_migration_backup_fail";
@@ -87,7 +87,7 @@ void migrateAllKeepers(List<MigrationKeeperContainerDetailModel> readyToMigratio
ShardModel shardModel = shardModelService.getShardModel(migrateShard.getDcId(),
migrateShard.getClusterId(), migrateShard.getShardId(), false, null);

if (!shardModelService.migrateShardKeepers(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
if (!shardModelService.migrateBackupKeeper(migrateShard.getDcId(), migrateShard.getClusterId(), shardModel,
migrationKeeperContainerDetailModel.getTargetKeeperContainer().getKeeperIp(), srcKeeperContainerIp)) {
logger.warn("[migrateAllKeepers] migrate shard keepers failed, shard: {}", migrateShard);
alertForKeeperMigrationFail(migrateShard, srcKeeperContainerIp,
@@ -0,0 +1,37 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.framework.xpipe.redis.ProxyRegistry;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.api.pool.SimpleObjectPool;
import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.netty.commands.NettyClient;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledExecutorService;

public abstract class AbstractKeeperCommand<V> extends AbstractCommand<V> {

protected XpipeNettyClientKeyedObjectPool keyedObjectPool;

protected ScheduledExecutorService scheduled;

protected static final Logger logger = LoggerFactory.getLogger(AbstractKeeperCommand.class);

private int commandTimeOut = Integer.parseInt(System.getProperty("KEY_REDISSESSION_COMMAND_TIMEOUT", String.valueOf(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI)));

protected AbstractKeeperCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled) {
this.keyedObjectPool = keyedObjectPool;
this.scheduled = scheduled;
}

protected InfoCommand generateInfoReplicationCommand(Endpoint key) {
SimpleObjectPool<NettyClient> keyPool = keyedObjectPool.getKeyPool(key);
return new InfoCommand(keyPool, InfoCommand.INFO_TYPE.REPLICATION.cmd(), scheduled, commandTimeOut);
}

}
@@ -0,0 +1,42 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoCommand;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
import com.ctrip.xpipe.utils.VisibleForTesting;

import java.util.concurrent.ScheduledExecutorService;

public class CheckKeeperActiveCommand<T> extends AbstractKeeperCommand<T>{

private Endpoint keeper;

private boolean expectedActive;

public CheckKeeperActiveCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper, boolean expectedActive) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
this.expectedActive = expectedActive;
}

@Override
public String getName() {
return "CheckKeeperActiveCommand";
}

@Override
protected void doExecute() throws Throwable {
InfoCommand infoCommand = generateInfoReplicationCommand(keeper);
if (new InfoResultExtractor(infoCommand.execute().get()).isKeeperActive() == expectedActive) {
this.future().setSuccess();
return;
}
this.future().setFailure(new Exception(String.format("keeper: %s is not %s", keeper, expectedActive)));
}

@Override
protected void doReset() {

}
}
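
A hedged usage sketch of this command, assuming the keyed object pool and scheduler are the console's already-initialized resources and that an existing Endpoint implementation such as DefaultEndPoint carries the keeper address:

// Not part of this diff; names other than CheckKeeperActiveCommand are assumptions.
void checkKeeperIsActive(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled) {
    Endpoint keeper = new DefaultEndPoint("127.0.0.1", 6380); // illustrative keeper address
    try {
        // get() blocks until the INFO replication round trip completes;
        // success means the keeper reported the expected (here: active) state.
        new CheckKeeperActiveCommand<Object>(keyedObjectPool, scheduled, keeper, true).execute().get();
    } catch (Exception e) {
        // the keeper was in the other state, or the INFO command itself failed
    }
}
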
@@ -0,0 +1,40 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand;
import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole;

import java.util.concurrent.ScheduledExecutorService;

import static com.ctrip.xpipe.redis.core.protocal.MASTER_STATE.REDIS_REPL_CONNECTED;

public class CheckKeeperConnectedCommand<T> extends AbstractKeeperCommand<T> {

private Endpoint keeper;

public CheckKeeperConnectedCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
}

@Override
public String getName() {
return "CheckKeeperConnectedCommand";
}

@Override
protected void doExecute() throws Throwable {
SlaveRole role = (SlaveRole)new RoleCommand(keyedObjectPool.getKeyPool(keeper), scheduled).execute().get();
if (REDIS_REPL_CONNECTED == role.getMasterState()) {
this.future().setSuccess();
return;
}
this.future().setFailure(new Exception(String.format("ping %s has no pong response", keeper)));
}

@Override
protected void doReset() {

}
}
@@ -0,0 +1,47 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.concurrent.ScheduledExecutorService;

public class FullSyncJudgeCommand<T> extends AbstractKeeperCommand<T> {

private Endpoint activeInstance;

private Endpoint backUpInstance;

private long activeMasterReplOffset;

public FullSyncJudgeCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint activeInstance, Endpoint backUpInstance, long activeMasterReplOffset) {
super(keyedObjectPool, scheduled);
this.activeInstance = activeInstance;
this.backUpInstance = backUpInstance;
this.activeMasterReplOffset = activeMasterReplOffset;
}

@Override
public String getName() {
return "FullSyncJudgeCommand";
}

@Override
protected void doExecute() throws Throwable {
long backupMasterReplOffset;
backupMasterReplOffset = new InfoResultExtractor(generateInfoReplicationCommand(backUpInstance).execute().get()).getMasterReplOffset();
if (backupMasterReplOffset > 0 && activeMasterReplOffset > 0 && backupMasterReplOffset > activeMasterReplOffset) {
this.future().setSuccess();
return;
}
this.future().setFailure(new Exception(String.format("activeInstance: %s and backUpInstance %s is not full sync", activeInstance, backUpInstance)));
}

@Override
protected void doReset() {

}



}
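
In other words, the judge succeeds only once the backup keeper's master_repl_offset has moved strictly past the offset recorded from the active keeper before the check started; a worked example with illustrative numbers:

long activeMasterReplOffset = 1_000_000L; // captured from the active keeper earlier in the migration
long backupMasterReplOffset = 1_000_512L; // read now from the backup keeper's INFO replication
boolean caughtUp = backupMasterReplOffset > 0
        && activeMasterReplOffset > 0
        && backupMasterReplOffset > activeMasterReplOffset; // true -> future().setSuccess()
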
@@ -0,0 +1,32 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

import java.util.concurrent.ScheduledExecutorService;

public class KeeperContainerReplOffsetGetCommand<V> extends AbstractKeeperCommand<Object>{

private Endpoint keeper;

public KeeperContainerReplOffsetGetCommand(XpipeNettyClientKeyedObjectPool keyedObjectPool, ScheduledExecutorService scheduled, Endpoint keeper) {
super(keyedObjectPool, scheduled);
this.keeper = keeper;
}

@Override
public String getName() {
return "KeeperContainerReplOffsetGetCommand";
}

@Override
protected void doExecute() throws Throwable {
this.future().setSuccess(new InfoResultExtractor(generateInfoReplicationCommand(keeper).execute().get()).getMasterReplOffset());
}

@Override
protected void doReset() {

}
}
@@ -0,0 +1,37 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.redis.console.service.KeeperContainerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeeperResetCommand<T> extends AbstractCommand<T> {

private String activeKeeperIp;

private long shardId;

private KeeperContainerService keeperContainerService;

public KeeperResetCommand(String activeKeeperIp, long shardId, KeeperContainerService keeperContainerService) {
this.activeKeeperIp = activeKeeperIp;
this.shardId = shardId;
this.keeperContainerService = keeperContainerService;
}

@Override
public String getName() {
return "KeeperResetCommand";
}

@Override
protected void doExecute() throws Throwable {
keeperContainerService.resetKeeper(activeKeeperIp, shardId);
this.future().setSuccess();
}

@Override
protected void doReset() {

}
}
@@ -10,17 +10,21 @@ public class IPPairData {
private long inputFlow;
private long peerData;

private Map<DcClusterShardKeeper, KeeperUsedInfo> entryMap = new HashMap<>();

public IPPairData() {
}

public void removeDcClusterShard(Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> migrateDcClusterShard) {
this.inputFlow -= migrateDcClusterShard.getValue().getInputFlow();
this.peerData -= migrateDcClusterShard.getValue().getPeerData();
this.entryMap.remove(migrateDcClusterShard.getKey());
}

public void addDcClusterShard(Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> migrateDcClusterShard) {
this.inputFlow += migrateDcClusterShard.getValue().getInputFlow();
this.peerData += migrateDcClusterShard.getValue().getPeerData();
this.entryMap.put(migrateDcClusterShard.getKey(), migrateDcClusterShard.getValue());
}

public long getInputFlow() {
@@ -31,4 +35,7 @@ public long getPeerData() {
return peerData;
}

public Map<DcClusterShardKeeper, KeeperUsedInfo> getEntryMap() {
return entryMap;
}
}
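
A brief sketch of the bookkeeping the new entryMap is meant to preserve; entryA and entryB stand for Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> values taken from an existing usage map (an assumption for illustration, not objects constructed in this diff):

IPPairData pair = new IPPairData();
pair.addDcClusterShard(entryA);    // aggregates grow, entryMap records entryA
pair.addDcClusterShard(entryB);
pair.removeDcClusterShard(entryA); // aggregates shrink by entryA's share, entryMap drops it
// invariant: getInputFlow() and getPeerData() now equal the sums over getEntryMap().values(),
// i.e. just entryB's inputFlow and peerData
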
@@ -33,10 +33,10 @@ public boolean canMigrate(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInf
return handler.handle(keeperUsedInfoEntry);
}

public boolean isKeeperPairOverload(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel keeperContainer1,
KeeperContainerUsedInfoModel keeperContainer2,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil) {
public boolean isMigrateKeeperPairOverload(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel keeperContainer1,
KeeperContainerUsedInfoModel keeperContainer2,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil) {
if (keeperContainer1.getKeeperIp().equals(keeperContainer2.getKeeperIp())) {
return true;
}