Skip to content

Commit

Permalink
Merge pull request #781 from ctripcorp/feature/rebalance-keeper
Browse files Browse the repository at this point in the history
Feature/rebalance keeper
  • Loading branch information
LanternLee authored Mar 26, 2024
2 parents 01d77da + 8886bb0 commit 93d7c85
Show file tree
Hide file tree
Showing 27 changed files with 759 additions and 457 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), port);
return Objects.hash(super.hashCode(), active);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ public class KeeperContainerUsedInfoModel {

private String org;

private String az;

private Date updateTime;

private long activeInputFlow;

private long totalInputFlow;
Expand Down Expand Up @@ -54,6 +58,7 @@ public KeeperContainerUsedInfoModel(KeeperContainerUsedInfoModel model, Map.Entr
this.keeperIp = model.getKeeperIp();
this.dcName = model.getDcName();
this.org = model.getOrg();
this.az = model.getAz();
this.activeInputFlow = model.getActiveInputFlow() + dcClusterShard.getValue().getInputFlow();
this.totalInputFlow = model.getTotalInputFlow() + dcClusterShard.getValue().getInputFlow();
this.inputFlowStandard = model.getInputFlowStandard();
Expand All @@ -76,6 +81,8 @@ public static KeeperContainerUsedInfoModel cloneKeeperContainerUsedInfoModel(Kee
newModel.setKeeperIp(model.getKeeperIp());
newModel.setDcName(model.getDcName());
newModel.setOrg(model.getOrg());
newModel.setAz(model.getAz());
newModel.setUpdateTime(model.getUpdateTime());
newModel.setActiveInputFlow(model.getActiveInputFlow());
newModel.setTotalInputFlow(model.getTotalInputFlow());
newModel.setInputFlowStandard(model.getInputFlowStandard());
Expand Down Expand Up @@ -128,8 +135,27 @@ public String getOrg() {
return org;
}

public void setOrg(String org) {
public KeeperContainerUsedInfoModel setOrg(String org) {
this.org = org;
return this;
}

public String getAz() {
return az;
}

public KeeperContainerUsedInfoModel setAz(String az) {
this.az = az;
return this;
}

public Date getUpdateTime() {
return updateTime;
}

public KeeperContainerUsedInfoModel setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
return this;
}

public long getActiveInputFlow() {
Expand Down Expand Up @@ -262,6 +288,8 @@ public String toString() {
"keeperIp='" + keeperIp + '\'' +
", dcName='" + dcName + '\'' +
", org='" + org + '\'' +
", az='" + az + '\'' +
", updateTime=" + updateTime +
", activeInputFlow=" + activeInputFlow +
", totalInputFlow=" + totalInputFlow +
", inputFlowStandard=" + inputFlowStandard +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

@RestController
Expand Down Expand Up @@ -86,7 +88,7 @@ public List<MigrationKeeperContainerDetailModel> getOverloadKeeperContainerMigra
public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List<MigrationKeeperContainerDetailModel> keeperContainerDetailModels) {
logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels);
try {
keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels);
// keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels);
} catch (Throwable th) {
logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage());
return RetMessage.createFailMessage(th.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ public enum KeeperContainerOverloadCause {
BOTH,
KEEPER_PAIR_PEER_DATA_OVERLOAD,
KEEPER_PAIR_INPUT_FLOW_OVERLOAD,
KEEPER_PAIR_BOTH;
KEEPER_PAIR_BOTH,
RESOURCE_LACK,
PAIR_RESOURCE_LACK,;

public static KeeperContainerOverloadCause findByValue(String value) {
if(StringUtil.isEmpty(value)) return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,18 @@
public class IPPairData {
private long inputFlow;
private long peerData;
private final Map<DcClusterShardKeeper, KeeperUsedInfo> keeperUsedInfoMap = new HashMap<>();

public IPPairData() {
}

public void removeDcClusterShard(Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> migrateDcClusterShard) {
this.inputFlow -= migrateDcClusterShard.getValue().getInputFlow();
this.peerData -= migrateDcClusterShard.getValue().getPeerData();
keeperUsedInfoMap.remove(migrateDcClusterShard.getKey());
}

public void addDcClusterShard(Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> migrateDcClusterShard) {
this.inputFlow += migrateDcClusterShard.getValue().getInputFlow();
this.peerData += migrateDcClusterShard.getValue().getPeerData();
keeperUsedInfoMap.put(migrateDcClusterShard.getKey(), migrateDcClusterShard.getValue());
}

public long getInputFlow() {
Expand All @@ -34,8 +31,4 @@ public long getPeerData() {
return peerData;
}

public Map<DcClusterShardKeeper, KeeperUsedInfo> getKeeperUsedInfoMap() {
return keeperUsedInfoMap;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData;
import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerContext;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -16,30 +17,46 @@ public class KeeperContainerFilterChain {
@Autowired
private ConsoleConfig config;

public boolean doKeeperContainerFilter(KeeperContainerUsedInfoModel targetContainer){
public boolean isKeeperContainerUseful(KeeperContainerUsedInfoModel targetContainer){
Handler<KeeperContainerUsedInfoModel> handler = new HostActiveHandler();
handler.setNextHandler(new HostDiskOverloadHandler(config))
.setNextHandler(new KeeperContainerOverloadHandler());
return handler.handle(targetContainer);
}

public boolean doKeeperFilter(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel srcKeeperContainer,
KeeperContainerUsedInfoModel targetKeeperContainer,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil){
public boolean canMigrate(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel srcKeeperContainerPair,
KeeperContainerUsedInfoModel targetKeeperContainer,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil){
Handler<Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo>> handler = new KeeperDataOverloadHandler(targetKeeperContainer);
handler.setNextHandler(new KeeperPairOverloadHandler(analyzerUtil, srcKeeperContainer, targetKeeperContainer, config));
handler.setNextHandler(new KeeperPairOverloadHandler(analyzerUtil, srcKeeperContainerPair, targetKeeperContainer, config));
return handler.handle(keeperUsedInfoEntry);
}

public boolean doKeeperPairFilter(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel keeperContainer1,
KeeperContainerUsedInfoModel keeperContainer2,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil) {
return new KeeperPairOverloadHandler(analyzerUtil, keeperContainer1, keeperContainer2, config)
public boolean isKeeperPairOverload(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel keeperContainer1,
KeeperContainerUsedInfoModel keeperContainer2,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil) {
if (keeperContainer1.getKeeperIp().equals(keeperContainer2.getKeeperIp())) {
return true;
}
return !new KeeperPairOverloadHandler(analyzerUtil, keeperContainer1, keeperContainer2, config)
.handle(keeperUsedInfoEntry);
}

public boolean isKeeperContainerPairOverload(KeeperContainerUsedInfoModel keeperContainer1,
KeeperContainerUsedInfoModel keeperContainer2,
IPPairData ipPairData) {
if (keeperContainer1.getKeeperIp().equals(keeperContainer2.getKeeperIp())) {
return true;
}
return new KeeperContainerPairOverloadHandler(keeperContainer1, keeperContainer2, config, ipPairData).handle(null);
}

public boolean isDataOverLoad(KeeperContainerUsedInfoModel model) {
return !new KeeperContainerOverloadHandler().handle(model);
}

@VisibleForTesting
public void setConfig(ConsoleConfig config){
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.ctrip.xpipe.redis.console.keeper.handler;

import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData;
import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel;

public class KeeperContainerPairOverloadHandler extends AbstractHandler<Object>{

private KeeperContainerUsedInfoModel pairA;

private KeeperContainerUsedInfoModel pairB;

private ConsoleConfig config;

private IPPairData ipPairData;

public KeeperContainerPairOverloadHandler(KeeperContainerUsedInfoModel pairA, KeeperContainerUsedInfoModel pairB, ConsoleConfig config, IPPairData ipPairData) {
this.pairA = pairA;
this.pairB = pairB;
this.config = config;
this.ipPairData = ipPairData;
}

@Override
protected boolean doNextHandler(Object o) {
double keeperPairOverLoadFactor = config.getKeeperPairOverLoadFactor();
KeeperContainerOverloadStandardModel minStandardModel = new KeeperContainerOverloadStandardModel()
.setFlowOverload((long) (Math.min(pairB.getInputFlowStandard(), pairA.getInputFlowStandard()) * keeperPairOverLoadFactor))
.setPeerDataOverload((long) (Math.min(pairB.getRedisUsedMemoryStandard(), pairA.getRedisUsedMemoryStandard()) * keeperPairOverLoadFactor));
long overloadInputFlow = ipPairData.getInputFlow() - minStandardModel.getFlowOverload();
long overloadPeerData = ipPairData.getPeerData() - minStandardModel.getPeerDataOverload();
return overloadInputFlow > 0 || overloadPeerData > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData;
import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerContext;
import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel;

import java.util.Map;

public class KeeperPairOverloadHandler extends AbstractHandler<Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo>>{

private KeeperContainerUsedInfoAnalyzerContext analyzerUtil;
private KeeperContainerUsedInfoAnalyzerContext analyzerContext;

private KeeperContainerUsedInfoModel keeperContainer1;

Expand All @@ -20,20 +21,23 @@ public class KeeperPairOverloadHandler extends AbstractHandler<Map.Entry<DcClust
private ConsoleConfig config;

public KeeperPairOverloadHandler(KeeperContainerUsedInfoAnalyzerContext analyzerUtil, KeeperContainerUsedInfoModel keeperContainer1, KeeperContainerUsedInfoModel keeperContainer2, ConsoleConfig config) {
this.analyzerUtil = analyzerUtil;
this.analyzerContext = analyzerUtil;
this.keeperContainer1 = keeperContainer1;
this.keeperContainer2 = keeperContainer2;
this.config = config;
}

@Override
protected boolean doNextHandler(Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> keeperUsedInfoEntry) {
IPPairData longLongPair = analyzerUtil.getIPPairData(keeperContainer1.getKeeperIp(), keeperContainer2.getKeeperIp());
if (longLongPair == null) return true;
double keeperPairOverLoadFactor = config.getKeeperPairOverLoadFactor();
double flowStandard = Math.min(keeperContainer1.getInputFlowStandard(), keeperContainer2.getInputFlowStandard()) * keeperPairOverLoadFactor;
double peerDataStandard = Math.min(keeperContainer1.getRedisUsedMemoryStandard(), keeperContainer2.getRedisUsedMemoryStandard()) * keeperPairOverLoadFactor;
return longLongPair.getInputFlow() + keeperUsedInfoEntry.getValue().getInputFlow() < flowStandard &&
longLongPair.getPeerData() + keeperUsedInfoEntry.getValue().getPeerData() < peerDataStandard;
IPPairData longLongPair = analyzerContext.getIPPairData(keeperContainer1.getKeeperIp(), keeperContainer2.getKeeperIp());
if (longLongPair == null) return true;
KeeperContainerOverloadStandardModel minStandardModel = new KeeperContainerOverloadStandardModel()
.setFlowOverload((long) (Math.min(keeperContainer1.getInputFlowStandard(), keeperContainer2.getInputFlowStandard()) * keeperPairOverLoadFactor))
.setPeerDataOverload((long) (Math.min(keeperContainer1.getRedisUsedMemoryStandard(), keeperContainer2.getRedisUsedMemoryStandard()) * keeperPairOverLoadFactor));

long overloadInputFlow = longLongPair.getInputFlow() + keeperUsedInfoEntry.getValue().getInputFlow() - minStandardModel.getFlowOverload();
long overloadPeerData = longLongPair.getPeerData() + keeperUsedInfoEntry.getValue().getPeerData() - minStandardModel.getPeerDataOverload();
return overloadPeerData < 0 && overloadInputFlow < 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.ctrip.xpipe.redis.console.keeper.impl;

public class DefaultKeeperContainerAvailablePool {
}
Loading

0 comments on commit 93d7c85

Please sign in to comment.