Skip to content

Commit

Permalink
优化获取最佳迁移目标KeeperContainer方法,处理部分keeper挂掉或者info超时情况
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Mar 25, 2024
1 parent 947f1ec commit e61b76c
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public List<MigrationKeeperContainerDetailModel> getMigrationPlans(Map<String, K
List<KeeperContainerUsedInfoModel> modelsWithoutResource = new ArrayList<>();
models.forEach(a -> modelsWithoutResource.add(KeeperContainerUsedInfoModel.cloneKeeperContainerUsedInfoModel(a)));
analyzerContext = new DefaultKeeperContainerUsedInfoAnalyzerContext(filterChain);
if(!analyzerContext.initKeeperPairData(modelsWithoutResource)) return null;
analyzerContext.initKeeperPairData(modelsWithoutResource);
analyzerContext.initAvailablePool(modelsWithoutResource);
for (KeeperContainerUsedInfoModel model : modelsWithoutResource) {
generateDataOverLoadMigrationPlans(model, modelsMap);
Expand All @@ -68,7 +68,6 @@ private void generateDataOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod
if (cause == null) return;
List<Map.Entry<DcClusterShardKeeper, KeeperUsedInfo>> descShards = getDescShards(model.getDetailInfo(), (Boolean) cause[1]);
Iterator<Map.Entry<DcClusterShardKeeper, KeeperUsedInfo>> iterator = descShards.iterator();
List<KeeperContainerUsedInfoModel> temp = new ArrayList<>();
while (filterChain.isDataOverLoad(model) && iterator.hasNext()) {
Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard = iterator.next();
if (!dcClusterShard.getKey().isActive()) continue;
Expand All @@ -77,20 +76,15 @@ private void generateDataOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod
analyzerContext.addMigrationPlan(model, backUpKeeper, true, false, (String) cause[0], dcClusterShard, null);
continue;
}
KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(model, backUpKeeper, (Boolean) cause[1]);
KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(model, dcClusterShard, (Boolean) cause[1]);
if (bestKeeperContainer == null) {
break;
}
if (!filterChain.isKeeperPairOverload(dcClusterShard, backUpKeeper, bestKeeperContainer, analyzerContext)) {
if (filterChain.canMigrate(dcClusterShard, model, bestKeeperContainer, analyzerContext)) {
analyzerContext.addMigrationPlan(model, bestKeeperContainer, false, false, (String) cause[0], dcClusterShard, backUpKeeper);
}
analyzerContext.recycleKeeperContainer(bestKeeperContainer, (Boolean) cause[1]);
} else {
temp.add(bestKeeperContainer);
if (filterChain.canMigrate(dcClusterShard, model, bestKeeperContainer, analyzerContext)) {
analyzerContext.addMigrationPlan(model, bestKeeperContainer, false, false, (String) cause[0], dcClusterShard, backUpKeeper);
}
analyzerContext.recycleKeeperContainer(bestKeeperContainer, (Boolean) cause[1]);
}
temp.forEach(keeperContainer -> analyzerContext.recycleKeeperContainer(keeperContainer, (Boolean) cause[1]));
if (filterChain.isDataOverLoad(model)) {
logger.warn("[analyzeKeeperContainerUsedInfo] no available space for overload keeperContainer to migrate {}", model);
CatEventMonitor.DEFAULT.logEvent(KEEPER_RESOURCE_LACK, "Dc:" + currentDc + " Org:" + model.getOrg() + " Az:" + model.getAz());
Expand All @@ -103,24 +97,20 @@ private void generatePairOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod
if (cause == null) return;
List<Map.Entry<DcClusterShardKeeper, KeeperUsedInfo>> descShards = getDescShards(getAllDetailInfo(modelA, modelB), (Boolean) cause[1]);
Iterator<Map.Entry<DcClusterShardKeeper, KeeperUsedInfo>> iterator = descShards.iterator();
List<KeeperContainerUsedInfoModel> temp = new ArrayList<>();
while (filterChain.isKeeperContainerPairOverload(modelA, modelB, analyzerContext.getIPPairData(modelA.getKeeperIp(), modelB.getKeeperIp())) && iterator.hasNext()) {
Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard = iterator.next();
if (dcClusterShard.getKey().isActive()) continue;
KeeperContainerUsedInfoModel activeKeeperContainer = dcClusterShard.getValue().getKeeperIP().equals(modelA.getKeeperIp()) ? modelB : modelA;
KeeperContainerUsedInfoModel backUpKeeperContainer = dcClusterShard.getValue().getKeeperIP().equals(modelA.getKeeperIp()) ? modelA : modelB;
KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(backUpKeeperContainer, activeKeeperContainer, (Boolean) cause[1]);
KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(backUpKeeperContainer, dcClusterShard, (Boolean) cause[1]);
if (bestKeeperContainer == null) {
break;
}
if (!filterChain.isKeeperPairOverload(dcClusterShard, backUpKeeperContainer, bestKeeperContainer, analyzerContext)) {
analyzerContext.addMigrationPlan(backUpKeeperContainer, bestKeeperContainer, false, true, (String) cause[0], dcClusterShard, activeKeeperContainer);
analyzerContext.recycleKeeperContainer(bestKeeperContainer, (Boolean) cause[1]);
} else {
temp.add(backUpKeeperContainer);
}
}
temp.forEach(keeperContainer -> analyzerContext.recycleKeeperContainer(keeperContainer, (Boolean) cause[1]));
if (filterChain.isKeeperContainerPairOverload(modelA, modelB, analyzerContext.getIPPairData(modelA.getKeeperIp(), modelB.getKeeperIp()))) {
logger.warn("[analyzeKeeperContainerUsedInfo] no available space for overload keeperContainer pair to migrate {} {}", modelA, modelB);
CatEventMonitor.DEFAULT.logEvent(KEEPER_PAIR_RESOURCE_LACK, "Dc:" + currentDc + " OrgA:" + modelA.getOrg() + " AzA:" + modelA.getAz() + " OrgB:" + modelB.getOrg() + " AzB:" + modelB.getAz());
Expand All @@ -137,8 +127,12 @@ private List<Map.Entry<DcClusterShardKeeper, KeeperUsedInfo>> getDescShards(Map<
}

private boolean canSwitchMaster(KeeperContainerUsedInfoModel src, KeeperContainerUsedInfoModel backUp, Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard) {
return (src.getOrg() == null || src.getOrg().equals(backUp.getOrg())) && (src.getAz() == null || src.getAz().equals(backUp.getAz())) &&
filterChain.isKeeperContainerUseful(backUp) && filterChain.canMigrate(dcClusterShard, src, backUp, analyzerContext);
return backUp != null
&& !analyzerContext.isProblemKeeperContainer(backUp.getKeeperIp())
&& (src.getOrg() == null || src.getOrg().equals(backUp.getOrg()))
&& (src.getAz() == null || src.getAz().equals(backUp.getAz()))
&& filterChain.isKeeperContainerUseful(backUp)
&& filterChain.canMigrate(dcClusterShard, src, backUp, analyzerContext);
}

private Map<DcClusterShardKeeper, KeeperUsedInfo> getAllDetailInfo(KeeperContainerUsedInfoModel modelA, KeeperContainerUsedInfoModel modelB) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class DefaultKeeperContainerUsedInfoAnalyzerContext implements KeeperCont

private Map<String, List<MigrationKeeperContainerDetailModel>> migrationPlansMap = new HashMap<>();

private Set<String> problemKeeperContainer = new HashSet<>();

private KeeperContainerFilterChain filterChain;

private static final String KEEPER_NO_BACKUP = "keeper_no_backup";
Expand Down Expand Up @@ -68,16 +70,22 @@ public void addResourceLackPlan(KeeperContainerUsedInfoModel src, String cause)
migrationPlansMap.get(src.getKeeperIp()).add(new MigrationKeeperContainerDetailModel(src, null, false, false, cause, new ArrayList<>()));
}

@Override
public boolean isProblemKeeperContainer(String keeperContainerIp) {
return problemKeeperContainer.contains(keeperContainerIp);
}

@Override
public void initAvailablePool(List<KeeperContainerUsedInfoModel> usedInfoMap) {
minInputFlowKeeperContainers = new PriorityQueue<>(usedInfoMap.size(),
(keeper1, keeper2) -> (int)((keeper2.getInputFlowStandard() - keeper2.getActiveInputFlow()) - (keeper1.getInputFlowStandard() - keeper1.getActiveInputFlow())));
minPeerDataKeeperContainers = new PriorityQueue<>(usedInfoMap.size(),
(keeper1, keeper2) -> (int)((keeper2.getRedisUsedMemoryStandard() - keeper2.getTotalRedisUsedMemory()) - (keeper1.getRedisUsedMemoryStandard() - keeper1.getTotalRedisUsedMemory())));
usedInfoMap.forEach(keeperContainerInfoModel -> {
if (filterChain.isKeeperContainerUseful(keeperContainerInfoModel)) {
minPeerDataKeeperContainers.add(keeperContainerInfoModel);
minInputFlowKeeperContainers.add(keeperContainerInfoModel);
usedInfoMap.forEach(model -> {
if (!problemKeeperContainer.contains(model.getKeeperIp())
&& filterChain.isKeeperContainerUseful(model)) {
minPeerDataKeeperContainers.add(model);
minInputFlowKeeperContainers.add(model);
}
});
}
Expand All @@ -94,14 +102,16 @@ public void recycleKeeperContainer(KeeperContainerUsedInfoModel keeperContainer,
}

@Override
public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, KeeperContainerUsedInfoModel backUpKeeper, boolean isPeerDataOverload) {
public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard, boolean isPeerDataOverload) {
String org = usedInfoModel.getOrg();
String az = usedInfoModel.getAz();
PriorityQueue<KeeperContainerUsedInfoModel> queue = isPeerDataOverload ? minPeerDataKeeperContainers : minInputFlowKeeperContainers;
Queue<KeeperContainerUsedInfoModel> temp = new LinkedList<>();
while (!queue.isEmpty()) {
KeeperContainerUsedInfoModel keeperContainer = queue.poll();
if ((org == null || org.equals(keeperContainer.getOrg())) && (az == null || az.equals(keeperContainer.getAz())) && !keeperContainer.getKeeperIp().equals(backUpKeeper.getKeeperIp())) {
if ((org == null || org.equals(keeperContainer.getOrg()))
&& (az == null || az.equals(keeperContainer.getAz()))
&& filterChain.canMigrate(dcClusterShard, usedInfoModel, keeperContainer, this) ) {
return keeperContainer;
}
temp.add(keeperContainer);
Expand All @@ -113,7 +123,7 @@ public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedIn
}

@Override
public boolean initKeeperPairData(List<KeeperContainerUsedInfoModel> usedInfoMap) {
public void initKeeperPairData(List<KeeperContainerUsedInfoModel> usedInfoMap) {
ipPairMap.clear();
allDetailInfo.clear();
for (KeeperContainerUsedInfoModel infoModel : usedInfoMap) {
Expand All @@ -125,18 +135,20 @@ public boolean initKeeperPairData(List<KeeperContainerUsedInfoModel> usedInfoMap
if (!entry.getKey().isActive()) continue;
KeeperUsedInfo activeKeeperUsedInfo = entry.getValue();
String backUpKeeperIp = getBackUpKeeperIp(entry.getKey());
if (backUpKeeperIp == null) return false;
if (backUpKeeperIp == null) {
problemKeeperContainer.add(entry.getValue().getKeeperIP());
continue;
}
addIpPair(activeKeeperUsedInfo.getKeeperIP(), backUpKeeperIp, entry);
}
return true;
}

@Override
public String getBackUpKeeperIp(DcClusterShard activeKeeper) {
KeeperUsedInfo backUpKeeperUsedInfo = allDetailInfo.get(new DcClusterShardKeeper(activeKeeper, false));
if (backUpKeeperUsedInfo == null) {
CatEventMonitor.DEFAULT.logEvent(KEEPER_NO_BACKUP, activeKeeper.toString());
throw new RuntimeException("active keeper " + activeKeeper + " has no backup keeper");
return null;
}
return backUpKeeperUsedInfo.getKeeperIP();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@

public interface KeeperContainerUsedInfoAnalyzerContext {

boolean initKeeperPairData(List<KeeperContainerUsedInfoModel> usedInfoMap);
void initKeeperPairData(List<KeeperContainerUsedInfoModel> usedInfoMap);

void initAvailablePool(List<KeeperContainerUsedInfoModel> usedInfoMap);

void recycleKeeperContainer(KeeperContainerUsedInfoModel keeperContainer, boolean isPeerDataOverload);

KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, KeeperContainerUsedInfoModel backUpKeeper, boolean isPeerDataOverload);
KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, Map.Entry<DcClusterShardKeeper, KeeperUsedInfo> dcClusterShard, boolean isPeerDataOverload);

String getBackUpKeeperIp(DcClusterShard activeKeeper);

Expand All @@ -34,4 +34,6 @@ public interface KeeperContainerUsedInfoAnalyzerContext {

void addResourceLackPlan(KeeperContainerUsedInfoModel src, String cause);

boolean isProblemKeeperContainer(String keeperContainerIp);

}
Original file line number Diff line number Diff line change
Expand Up @@ -535,4 +535,37 @@ public void testKeeperContainerAzAndOrg() {

}

@Test
public void testKeeperContainersNoBackupKeeper() {
filterChain.setConfig(config);
Mockito.when(config.getKeeperPairOverLoadFactor()).thenReturn(5.0);
Map<String, KeeperContainerUsedInfoModel> models = new HashMap<>();
createKeeperContainer(models, IP1, 42, 42)
.createKeeper(Cluster1, Shard1, true, 14, 14)
.createKeeper(Cluster1, Shard2, true, 14, 14)
.createKeeper(Cluster2, Shard1, true, 14, 14);

createKeeperContainer(models, IP2, 1, 1)
.createKeeper(Cluster1, Shard1, false, 14, 14)
.createKeeper(Cluster1, Shard2, false, 14, 14)
.createKeeper(Cluster2, Shard1, false, 14, 14)
.createKeeper(Cluster2, Shard2, true, 1, 1);

createKeeperContainer(models, IP3, 1, 1)
.createKeeper(Cluster5, Shard1, true, 1, 1);

createKeeperContainer(models, IP4, 1, 1)
.createKeeper(Cluster5, Shard2, true, 1, 1)
.createKeeper(Cluster5, Shard3, false, 1, 1);

createKeeperContainer(models, IP5, 1, 1)
.createKeeper(Cluster5, Shard2, false, 1, 1)
.createKeeper(Cluster5, Shard3, true, 1, 1);

List<MigrationKeeperContainerDetailModel> allDcReadyToMigrationKeeperContainers = migrationAnalyzer.getMigrationPlans(models);
Assert.assertEquals(2, allDcReadyToMigrationKeeperContainers.size());
Assert.assertNotEquals(IP3, allDcReadyToMigrationKeeperContainers.get(0).getTargetKeeperContainer().getKeeperIp());
Assert.assertNotEquals(IP3, allDcReadyToMigrationKeeperContainers.get(1).getTargetKeeperContainer().getKeeperIp());
}

}

0 comments on commit e61b76c

Please sign in to comment.