diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerMigrationAnalyzer.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerMigrationAnalyzer.java index ee299f67d..f26088beb 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerMigrationAnalyzer.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperContainerMigrationAnalyzer.java @@ -52,7 +52,7 @@ public List getMigrationPlans(Map modelsWithoutResource = new ArrayList<>(); models.forEach(a -> modelsWithoutResource.add(KeeperContainerUsedInfoModel.cloneKeeperContainerUsedInfoModel(a))); analyzerContext = new DefaultKeeperContainerUsedInfoAnalyzerContext(filterChain); - if(!analyzerContext.initKeeperPairData(modelsWithoutResource)) return null; + analyzerContext.initKeeperPairData(modelsWithoutResource); analyzerContext.initAvailablePool(modelsWithoutResource); for (KeeperContainerUsedInfoModel model : modelsWithoutResource) { generateDataOverLoadMigrationPlans(model, modelsMap); @@ -68,7 +68,6 @@ private void generateDataOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod if (cause == null) return; List> descShards = getDescShards(model.getDetailInfo(), (Boolean) cause[1]); Iterator> iterator = descShards.iterator(); - List temp = new ArrayList<>(); while (filterChain.isDataOverLoad(model) && iterator.hasNext()) { Map.Entry dcClusterShard = iterator.next(); if (!dcClusterShard.getKey().isActive()) continue; @@ -77,20 +76,15 @@ private void generateDataOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod analyzerContext.addMigrationPlan(model, backUpKeeper, true, false, (String) cause[0], dcClusterShard, null); continue; } - KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(model, backUpKeeper, (Boolean) cause[1]); + KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(model, dcClusterShard, (Boolean) cause[1]); if (bestKeeperContainer == null) { break; } - if (!filterChain.isKeeperPairOverload(dcClusterShard, backUpKeeper, bestKeeperContainer, analyzerContext)) { - if (filterChain.canMigrate(dcClusterShard, model, bestKeeperContainer, analyzerContext)) { - analyzerContext.addMigrationPlan(model, bestKeeperContainer, false, false, (String) cause[0], dcClusterShard, backUpKeeper); - } - analyzerContext.recycleKeeperContainer(bestKeeperContainer, (Boolean) cause[1]); - } else { - temp.add(bestKeeperContainer); + if (filterChain.canMigrate(dcClusterShard, model, bestKeeperContainer, analyzerContext)) { + analyzerContext.addMigrationPlan(model, bestKeeperContainer, false, false, (String) cause[0], dcClusterShard, backUpKeeper); } + analyzerContext.recycleKeeperContainer(bestKeeperContainer, (Boolean) cause[1]); } - temp.forEach(keeperContainer -> analyzerContext.recycleKeeperContainer(keeperContainer, (Boolean) cause[1])); if (filterChain.isDataOverLoad(model)) { logger.warn("[analyzeKeeperContainerUsedInfo] no available space for overload keeperContainer to migrate {}", model); CatEventMonitor.DEFAULT.logEvent(KEEPER_RESOURCE_LACK, "Dc:" + currentDc + " Org:" + model.getOrg() + " Az:" + model.getAz()); @@ -103,24 +97,20 @@ private void generatePairOverLoadMigrationPlans(KeeperContainerUsedInfoModel mod if (cause == null) return; List> descShards = getDescShards(getAllDetailInfo(modelA, modelB), (Boolean) cause[1]); Iterator> iterator = descShards.iterator(); - List temp = new ArrayList<>(); while (filterChain.isKeeperContainerPairOverload(modelA, modelB, analyzerContext.getIPPairData(modelA.getKeeperIp(), modelB.getKeeperIp())) && iterator.hasNext()) { Map.Entry dcClusterShard = iterator.next(); if (dcClusterShard.getKey().isActive()) continue; KeeperContainerUsedInfoModel activeKeeperContainer = dcClusterShard.getValue().getKeeperIP().equals(modelA.getKeeperIp()) ? modelB : modelA; KeeperContainerUsedInfoModel backUpKeeperContainer = dcClusterShard.getValue().getKeeperIP().equals(modelA.getKeeperIp()) ? modelA : modelB; - KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(backUpKeeperContainer, activeKeeperContainer, (Boolean) cause[1]); + KeeperContainerUsedInfoModel bestKeeperContainer = analyzerContext.getBestKeeperContainer(backUpKeeperContainer, dcClusterShard, (Boolean) cause[1]); if (bestKeeperContainer == null) { break; } if (!filterChain.isKeeperPairOverload(dcClusterShard, backUpKeeperContainer, bestKeeperContainer, analyzerContext)) { analyzerContext.addMigrationPlan(backUpKeeperContainer, bestKeeperContainer, false, true, (String) cause[0], dcClusterShard, activeKeeperContainer); analyzerContext.recycleKeeperContainer(bestKeeperContainer, (Boolean) cause[1]); - } else { - temp.add(backUpKeeperContainer); } } - temp.forEach(keeperContainer -> analyzerContext.recycleKeeperContainer(keeperContainer, (Boolean) cause[1])); if (filterChain.isKeeperContainerPairOverload(modelA, modelB, analyzerContext.getIPPairData(modelA.getKeeperIp(), modelB.getKeeperIp()))) { logger.warn("[analyzeKeeperContainerUsedInfo] no available space for overload keeperContainer pair to migrate {} {}", modelA, modelB); CatEventMonitor.DEFAULT.logEvent(KEEPER_PAIR_RESOURCE_LACK, "Dc:" + currentDc + " OrgA:" + modelA.getOrg() + " AzA:" + modelA.getAz() + " OrgB:" + modelB.getOrg() + " AzB:" + modelB.getAz()); @@ -137,8 +127,12 @@ private List> getDescShards(Map< } private boolean canSwitchMaster(KeeperContainerUsedInfoModel src, KeeperContainerUsedInfoModel backUp, Map.Entry dcClusterShard) { - return (src.getOrg() == null || src.getOrg().equals(backUp.getOrg())) && (src.getAz() == null || src.getAz().equals(backUp.getAz())) && - filterChain.isKeeperContainerUseful(backUp) && filterChain.canMigrate(dcClusterShard, src, backUp, analyzerContext); + return backUp != null + && !analyzerContext.isProblemKeeperContainer(backUp.getKeeperIp()) + && (src.getOrg() == null || src.getOrg().equals(backUp.getOrg())) + && (src.getAz() == null || src.getAz().equals(backUp.getAz())) + && filterChain.isKeeperContainerUseful(backUp) + && filterChain.canMigrate(dcClusterShard, src, backUp, analyzerContext); } private Map getAllDetailInfo(KeeperContainerUsedInfoModel modelA, KeeperContainerUsedInfoModel modelB) { diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java index 2ea9230f4..02edda6b7 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/DefaultKeeperContainerUsedInfoAnalyzerContext.java @@ -27,6 +27,8 @@ public class DefaultKeeperContainerUsedInfoAnalyzerContext implements KeeperCont private Map> migrationPlansMap = new HashMap<>(); + private Set problemKeeperContainer = new HashSet<>(); + private KeeperContainerFilterChain filterChain; private static final String KEEPER_NO_BACKUP = "keeper_no_backup"; @@ -68,16 +70,22 @@ public void addResourceLackPlan(KeeperContainerUsedInfoModel src, String cause) migrationPlansMap.get(src.getKeeperIp()).add(new MigrationKeeperContainerDetailModel(src, null, false, false, cause, new ArrayList<>())); } + @Override + public boolean isProblemKeeperContainer(String keeperContainerIp) { + return problemKeeperContainer.contains(keeperContainerIp); + } + @Override public void initAvailablePool(List usedInfoMap) { minInputFlowKeeperContainers = new PriorityQueue<>(usedInfoMap.size(), (keeper1, keeper2) -> (int)((keeper2.getInputFlowStandard() - keeper2.getActiveInputFlow()) - (keeper1.getInputFlowStandard() - keeper1.getActiveInputFlow()))); minPeerDataKeeperContainers = new PriorityQueue<>(usedInfoMap.size(), (keeper1, keeper2) -> (int)((keeper2.getRedisUsedMemoryStandard() - keeper2.getTotalRedisUsedMemory()) - (keeper1.getRedisUsedMemoryStandard() - keeper1.getTotalRedisUsedMemory()))); - usedInfoMap.forEach(keeperContainerInfoModel -> { - if (filterChain.isKeeperContainerUseful(keeperContainerInfoModel)) { - minPeerDataKeeperContainers.add(keeperContainerInfoModel); - minInputFlowKeeperContainers.add(keeperContainerInfoModel); + usedInfoMap.forEach(model -> { + if (!problemKeeperContainer.contains(model.getKeeperIp()) + && filterChain.isKeeperContainerUseful(model)) { + minPeerDataKeeperContainers.add(model); + minInputFlowKeeperContainers.add(model); } }); } @@ -94,14 +102,16 @@ public void recycleKeeperContainer(KeeperContainerUsedInfoModel keeperContainer, } @Override - public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, KeeperContainerUsedInfoModel backUpKeeper, boolean isPeerDataOverload) { + public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, Map.Entry dcClusterShard, boolean isPeerDataOverload) { String org = usedInfoModel.getOrg(); String az = usedInfoModel.getAz(); PriorityQueue queue = isPeerDataOverload ? minPeerDataKeeperContainers : minInputFlowKeeperContainers; Queue temp = new LinkedList<>(); while (!queue.isEmpty()) { KeeperContainerUsedInfoModel keeperContainer = queue.poll(); - if ((org == null || org.equals(keeperContainer.getOrg())) && (az == null || az.equals(keeperContainer.getAz())) && !keeperContainer.getKeeperIp().equals(backUpKeeper.getKeeperIp())) { + if ((org == null || org.equals(keeperContainer.getOrg())) + && (az == null || az.equals(keeperContainer.getAz())) + && filterChain.canMigrate(dcClusterShard, usedInfoModel, keeperContainer, this) ) { return keeperContainer; } temp.add(keeperContainer); @@ -113,7 +123,7 @@ public KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedIn } @Override - public boolean initKeeperPairData(List usedInfoMap) { + public void initKeeperPairData(List usedInfoMap) { ipPairMap.clear(); allDetailInfo.clear(); for (KeeperContainerUsedInfoModel infoModel : usedInfoMap) { @@ -125,10 +135,12 @@ public boolean initKeeperPairData(List usedInfoMap if (!entry.getKey().isActive()) continue; KeeperUsedInfo activeKeeperUsedInfo = entry.getValue(); String backUpKeeperIp = getBackUpKeeperIp(entry.getKey()); - if (backUpKeeperIp == null) return false; + if (backUpKeeperIp == null) { + problemKeeperContainer.add(entry.getValue().getKeeperIP()); + continue; + } addIpPair(activeKeeperUsedInfo.getKeeperIP(), backUpKeeperIp, entry); } - return true; } @Override @@ -136,7 +148,7 @@ public String getBackUpKeeperIp(DcClusterShard activeKeeper) { KeeperUsedInfo backUpKeeperUsedInfo = allDetailInfo.get(new DcClusterShardKeeper(activeKeeper, false)); if (backUpKeeperUsedInfo == null) { CatEventMonitor.DEFAULT.logEvent(KEEPER_NO_BACKUP, activeKeeper.toString()); - throw new RuntimeException("active keeper " + activeKeeper + " has no backup keeper"); + return null; } return backUpKeeperUsedInfo.getKeeperIP(); } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java index 50f89c3bc..355f228f5 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/keeper/util/KeeperContainerUsedInfoAnalyzerContext.java @@ -12,13 +12,13 @@ public interface KeeperContainerUsedInfoAnalyzerContext { - boolean initKeeperPairData(List usedInfoMap); + void initKeeperPairData(List usedInfoMap); void initAvailablePool(List usedInfoMap); void recycleKeeperContainer(KeeperContainerUsedInfoModel keeperContainer, boolean isPeerDataOverload); - KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, KeeperContainerUsedInfoModel backUpKeeper, boolean isPeerDataOverload); + KeeperContainerUsedInfoModel getBestKeeperContainer(KeeperContainerUsedInfoModel usedInfoModel, Map.Entry dcClusterShard, boolean isPeerDataOverload); String getBackUpKeeperIp(DcClusterShard activeKeeper); @@ -34,4 +34,6 @@ public interface KeeperContainerUsedInfoAnalyzerContext { void addResourceLackPlan(KeeperContainerUsedInfoModel src, String cause); + boolean isProblemKeeperContainer(String keeperContainerIp); + } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperUsedInfoAnalyzerTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperUsedInfoAnalyzerTest.java index a0c235914..60d233611 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperUsedInfoAnalyzerTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/keeper/impl/DefaultKeeperUsedInfoAnalyzerTest.java @@ -535,4 +535,37 @@ public void testKeeperContainerAzAndOrg() { } + @Test + public void testKeeperContainersNoBackupKeeper() { + filterChain.setConfig(config); + Mockito.when(config.getKeeperPairOverLoadFactor()).thenReturn(5.0); + Map models = new HashMap<>(); + createKeeperContainer(models, IP1, 42, 42) + .createKeeper(Cluster1, Shard1, true, 14, 14) + .createKeeper(Cluster1, Shard2, true, 14, 14) + .createKeeper(Cluster2, Shard1, true, 14, 14); + + createKeeperContainer(models, IP2, 1, 1) + .createKeeper(Cluster1, Shard1, false, 14, 14) + .createKeeper(Cluster1, Shard2, false, 14, 14) + .createKeeper(Cluster2, Shard1, false, 14, 14) + .createKeeper(Cluster2, Shard2, true, 1, 1); + + createKeeperContainer(models, IP3, 1, 1) + .createKeeper(Cluster5, Shard1, true, 1, 1); + + createKeeperContainer(models, IP4, 1, 1) + .createKeeper(Cluster5, Shard2, true, 1, 1) + .createKeeper(Cluster5, Shard3, false, 1, 1); + + createKeeperContainer(models, IP5, 1, 1) + .createKeeper(Cluster5, Shard2, false, 1, 1) + .createKeeper(Cluster5, Shard3, true, 1, 1); + + List allDcReadyToMigrationKeeperContainers = migrationAnalyzer.getMigrationPlans(models); + Assert.assertEquals(2, allDcReadyToMigrationKeeperContainers.size()); + Assert.assertNotEquals(IP3, allDcReadyToMigrationKeeperContainers.get(0).getTargetKeeperContainer().getKeeperIp()); + Assert.assertNotEquals(IP3, allDcReadyToMigrationKeeperContainers.get(1).getTargetKeeperContainer().getKeeperIp()); + } + } \ No newline at end of file