Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/rebalance keeper #781

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), port);
return Objects.hash(super.hashCode(), active);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

private String org;

private String az;

private Date updateTime;

private long activeInputFlow;

private long totalInputFlow;
Expand Down Expand Up @@ -54,6 +58,7 @@
this.keeperIp = model.getKeeperIp();
this.dcName = model.getDcName();
this.org = model.getOrg();
this.az = model.getAz();

Check warning on line 61 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L61

Added line #L61 was not covered by tests
this.activeInputFlow = model.getActiveInputFlow() + dcClusterShard.getValue().getInputFlow();
this.totalInputFlow = model.getTotalInputFlow() + dcClusterShard.getValue().getInputFlow();
this.inputFlowStandard = model.getInputFlowStandard();
Expand All @@ -76,6 +81,8 @@
newModel.setKeeperIp(model.getKeeperIp());
newModel.setDcName(model.getDcName());
newModel.setOrg(model.getOrg());
newModel.setAz(model.getAz());
newModel.setUpdateTime(model.getUpdateTime());

Check warning on line 85 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L84-L85

Added lines #L84 - L85 were not covered by tests
newModel.setActiveInputFlow(model.getActiveInputFlow());
newModel.setTotalInputFlow(model.getTotalInputFlow());
newModel.setInputFlowStandard(model.getInputFlowStandard());
Expand Down Expand Up @@ -128,8 +135,27 @@
return org;
}

public void setOrg(String org) {
public KeeperContainerUsedInfoModel setOrg(String org) {
this.org = org;
return this;

Check warning on line 140 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L140

Added line #L140 was not covered by tests
}

public String getAz() {
return az;

Check warning on line 144 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L144

Added line #L144 was not covered by tests
}

public KeeperContainerUsedInfoModel setAz(String az) {
this.az = az;
return this;

Check warning on line 149 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L148-L149

Added lines #L148 - L149 were not covered by tests
}

public Date getUpdateTime() {
return updateTime;

Check warning on line 153 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L153

Added line #L153 was not covered by tests
}

public KeeperContainerUsedInfoModel setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
return this;

Check warning on line 158 in redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/model/KeeperContainerUsedInfoModel.java#L157-L158

Added lines #L157 - L158 were not covered by tests
}

public long getActiveInputFlow() {
Expand Down Expand Up @@ -262,6 +288,8 @@
"keeperIp='" + keeperIp + '\'' +
", dcName='" + dcName + '\'' +
", org='" + org + '\'' +
", az='" + az + '\'' +
", updateTime=" + updateTime +
", activeInputFlow=" + activeInputFlow +
", totalInputFlow=" + totalInputFlow +
", inputFlowStandard=" + inputFlowStandard +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

@RestController
Expand Down Expand Up @@ -86,7 +88,7 @@ public List<MigrationKeeperContainerDetailModel> getOverloadKeeperContainerMigra
public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List<MigrationKeeperContainerDetailModel> keeperContainerDetailModels) {
logger.info("begin to migrate over load keeper containers {}", keeperContainerDetailModels);
try {
keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels);
// keeperContainerMigrationService.beginMigrateKeeperContainers(keeperContainerDetailModels);
} catch (Throwable th) {
logger.warn("migrate over load keeper containers {} fail by {}", keeperContainerDetailModels, th.getMessage());
return RetMessage.createFailMessage(th.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ public enum KeeperContainerOverloadCause {
BOTH,
KEEPER_PAIR_PEER_DATA_OVERLOAD,
KEEPER_PAIR_INPUT_FLOW_OVERLOAD,
KEEPER_PAIR_BOTH;
KEEPER_PAIR_BOTH,
RESOURCE_LACK,
PAIR_RESOURCE_LACK,;

public static KeeperContainerOverloadCause findByValue(String value) {
if(StringUtil.isEmpty(value)) return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,18 @@
public class IPPairData {
private long inputFlow;
private long peerData;
private final Map<DcClusterShardKeeper, KeeperUsedInfo> keeperUsedInfoMap = new HashMap<>();

public IPPairData() {
}

public void removeDcClusterShard(Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> migrateDcClusterShard) {
this.inputFlow -= migrateDcClusterShard.getValue().getInputFlow();
this.peerData -= migrateDcClusterShard.getValue().getPeerData();
keeperUsedInfoMap.remove(migrateDcClusterShard.getKey());
}

public void addDcClusterShard(Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> migrateDcClusterShard) {
this.inputFlow += migrateDcClusterShard.getValue().getInputFlow();
this.peerData += migrateDcClusterShard.getValue().getPeerData();
keeperUsedInfoMap.put(migrateDcClusterShard.getKey(), migrateDcClusterShard.getValue());
}

public long getInputFlow() {
Expand All @@ -34,8 +31,4 @@ public long getPeerData() {
return peerData;
}

public Map<DcClusterShardKeeper, KeeperUsedInfo> getKeeperUsedInfoMap() {
return keeperUsedInfoMap;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.ctrip.xpipe.redis.checker.model.DcClusterShardKeeper;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData;
import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerContext;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -16,30 +17,46 @@
@Autowired
private ConsoleConfig config;

public boolean doKeeperContainerFilter(KeeperContainerUsedInfoModel targetContainer){
public boolean isKeeperContainerUseful(KeeperContainerUsedInfoModel targetContainer){
Handler<KeeperContainerUsedInfoModel> handler = new HostActiveHandler();
handler.setNextHandler(new HostDiskOverloadHandler(config))
.setNextHandler(new KeeperContainerOverloadHandler());
return handler.handle(targetContainer);
}

public boolean doKeeperFilter(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel srcKeeperContainer,
KeeperContainerUsedInfoModel targetKeeperContainer,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil){
public boolean canMigrate(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel srcKeeperContainerPair,
KeeperContainerUsedInfoModel targetKeeperContainer,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil){
Handler<Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo>> handler = new KeeperDataOverloadHandler(targetKeeperContainer);
handler.setNextHandler(new KeeperPairOverloadHandler(analyzerUtil, srcKeeperContainer, targetKeeperContainer, config));
handler.setNextHandler(new KeeperPairOverloadHandler(analyzerUtil, srcKeeperContainerPair, targetKeeperContainer, config));
return handler.handle(keeperUsedInfoEntry);
}

public boolean doKeeperPairFilter(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel keeperContainer1,
KeeperContainerUsedInfoModel keeperContainer2,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil) {
return new KeeperPairOverloadHandler(analyzerUtil, keeperContainer1, keeperContainer2, config)
public boolean isKeeperPairOverload(Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo> keeperUsedInfoEntry,
KeeperContainerUsedInfoModel keeperContainer1,
KeeperContainerUsedInfoModel keeperContainer2,
KeeperContainerUsedInfoAnalyzerContext analyzerUtil) {
if (keeperContainer1.getKeeperIp().equals(keeperContainer2.getKeeperIp())) {
return true;

Check warning on line 41 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperContainerFilterChain.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperContainerFilterChain.java#L41

Added line #L41 was not covered by tests
}
return !new KeeperPairOverloadHandler(analyzerUtil, keeperContainer1, keeperContainer2, config)
.handle(keeperUsedInfoEntry);
}

public boolean isKeeperContainerPairOverload(KeeperContainerUsedInfoModel keeperContainer1,
KeeperContainerUsedInfoModel keeperContainer2,
IPPairData ipPairData) {
if (keeperContainer1.getKeeperIp().equals(keeperContainer2.getKeeperIp())) {
return true;

Check warning on line 51 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperContainerFilterChain.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/handler/KeeperContainerFilterChain.java#L51

Added line #L51 was not covered by tests
}
return new KeeperContainerPairOverloadHandler(keeperContainer1, keeperContainer2, config, ipPairData).handle(null);
}

public boolean isDataOverLoad(KeeperContainerUsedInfoModel model) {
return !new KeeperContainerOverloadHandler().handle(model);
}

@VisibleForTesting
public void setConfig(ConsoleConfig config){
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.ctrip.xpipe.redis.console.keeper.handler;

import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData;
import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel;

public class KeeperContainerPairOverloadHandler extends AbstractHandler<Object>{

private KeeperContainerUsedInfoModel pairA;

private KeeperContainerUsedInfoModel pairB;

private ConsoleConfig config;

private IPPairData ipPairData;

public KeeperContainerPairOverloadHandler(KeeperContainerUsedInfoModel pairA, KeeperContainerUsedInfoModel pairB, ConsoleConfig config, IPPairData ipPairData) {
this.pairA = pairA;
this.pairB = pairB;
this.config = config;
this.ipPairData = ipPairData;
}

@Override
protected boolean doNextHandler(Object o) {
double keeperPairOverLoadFactor = config.getKeeperPairOverLoadFactor();
KeeperContainerOverloadStandardModel minStandardModel = new KeeperContainerOverloadStandardModel()
.setFlowOverload((long) (Math.min(pairB.getInputFlowStandard(), pairA.getInputFlowStandard()) * keeperPairOverLoadFactor))
.setPeerDataOverload((long) (Math.min(pairB.getRedisUsedMemoryStandard(), pairA.getRedisUsedMemoryStandard()) * keeperPairOverLoadFactor));
long overloadInputFlow = ipPairData.getInputFlow() - minStandardModel.getFlowOverload();
long overloadPeerData = ipPairData.getPeerData() - minStandardModel.getPeerDataOverload();
return overloadInputFlow > 0 || overloadPeerData > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.keeper.entity.IPPairData;
import com.ctrip.xpipe.redis.console.keeper.util.KeeperContainerUsedInfoAnalyzerContext;
import com.ctrip.xpipe.redis.console.model.KeeperContainerOverloadStandardModel;

import java.util.Map;

public class KeeperPairOverloadHandler extends AbstractHandler<Map.Entry<DcClusterShardKeeper, KeeperContainerUsedInfoModel.KeeperUsedInfo>>{

private KeeperContainerUsedInfoAnalyzerContext analyzerUtil;
private KeeperContainerUsedInfoAnalyzerContext analyzerContext;

private KeeperContainerUsedInfoModel keeperContainer1;

Expand All @@ -20,20 +21,23 @@ public class KeeperPairOverloadHandler extends AbstractHandler<Map.Entry<DcClust
private ConsoleConfig config;

public KeeperPairOverloadHandler(KeeperContainerUsedInfoAnalyzerContext analyzerUtil, KeeperContainerUsedInfoModel keeperContainer1, KeeperContainerUsedInfoModel keeperContainer2, ConsoleConfig config) {
this.analyzerUtil = analyzerUtil;
this.analyzerContext = analyzerUtil;
this.keeperContainer1 = keeperContainer1;
this.keeperContainer2 = keeperContainer2;
this.config = config;
}

@Override
protected boolean doNextHandler(Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> keeperUsedInfoEntry) {
IPPairData longLongPair = analyzerUtil.getIPPairData(keeperContainer1.getKeeperIp(), keeperContainer2.getKeeperIp());
if (longLongPair == null) return true;
double keeperPairOverLoadFactor = config.getKeeperPairOverLoadFactor();
double flowStandard = Math.min(keeperContainer1.getInputFlowStandard(), keeperContainer2.getInputFlowStandard()) * keeperPairOverLoadFactor;
double peerDataStandard = Math.min(keeperContainer1.getRedisUsedMemoryStandard(), keeperContainer2.getRedisUsedMemoryStandard()) * keeperPairOverLoadFactor;
return longLongPair.getInputFlow() + keeperUsedInfoEntry.getValue().getInputFlow() < flowStandard &&
longLongPair.getPeerData() + keeperUsedInfoEntry.getValue().getPeerData() < peerDataStandard;
IPPairData longLongPair = analyzerContext.getIPPairData(keeperContainer1.getKeeperIp(), keeperContainer2.getKeeperIp());
if (longLongPair == null) return true;
KeeperContainerOverloadStandardModel minStandardModel = new KeeperContainerOverloadStandardModel()
.setFlowOverload((long) (Math.min(keeperContainer1.getInputFlowStandard(), keeperContainer2.getInputFlowStandard()) * keeperPairOverLoadFactor))
.setPeerDataOverload((long) (Math.min(keeperContainer1.getRedisUsedMemoryStandard(), keeperContainer2.getRedisUsedMemoryStandard()) * keeperPairOverLoadFactor));

long overloadInputFlow = longLongPair.getInputFlow() + keeperUsedInfoEntry.getValue().getInputFlow() - minStandardModel.getFlowOverload();
long overloadPeerData = longLongPair.getPeerData() + keeperUsedInfoEntry.getValue().getPeerData() - minStandardModel.getPeerDataOverload();
return overloadPeerData < 0 && overloadInputFlow < 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.ctrip.xpipe.redis.console.keeper.impl;

public class DefaultKeeperContainerAvailablePool {

Check warning on line 3 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerAvailablePool.java#L3

Added line #L3 was not covered by tests
}
Loading
Loading