Skip to content

Commit

Permalink
Feature/rebalance keeper (#765)
Browse files Browse the repository at this point in the history
* 修复空keeprContainer不会被检测,已经下线的keeperContainer仍存在于checker的问题

* 整理分析keeper问题代码

* 优化keeperContainer analyzer机构,新增修改keeperContainer DiskType内容

* keeperContainer的标准存到config_tbl中,并且提供修改api

---------

Co-authored-by: yifuzhou <[email protected]>
  • Loading branch information
passerbyzed and yifuzhou authored Jan 5, 2024
1 parent 112a10d commit c419d8a
Show file tree
Hide file tree
Showing 35 changed files with 931 additions and 692 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,26 @@
import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel.*;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperContainerMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperDiskInfo;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.client.RestClientException;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

public class KeeperContainerInfoReporter implements GroupCheckerLeaderAware {

Expand All @@ -44,26 +48,29 @@ public class KeeperContainerInfoReporter implements GroupCheckerLeaderAware {

private CheckerConfig config;

private MetaCache metaCache;

private static final String CURRENT_IDC = FoundationService.DEFAULT.getDataCenter();

private static final Logger logger = LoggerFactory.getLogger(HealthCheckReporter.class);


public KeeperContainerInfoReporter(RedisUsedMemoryCollector redisUsedMemoryCollector, CheckerConsoleService
checkerConsoleService, KeeperFlowCollector keeperFlowCollector, CheckerConfig config, KeeperContainerService keeperContainerService) {
checkerConsoleService, KeeperFlowCollector keeperFlowCollector, CheckerConfig config, KeeperContainerService keeperContainerService, MetaCache metaCache) {
this.redisUsedMemoryCollector = redisUsedMemoryCollector;
this.keeperFlowCollector = keeperFlowCollector;
this.checkerConsoleService = checkerConsoleService;
this.config = config;
this.keeperContainerService = keeperContainerService;
this.metaCache = metaCache;
}

@PostConstruct
public void init() {
logger.debug("[postConstruct] start");
this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("KeeperContainerInfoReporter"));
this.keeperContainerInfoReportTask = new DynamicDelayPeriodTask("KeeperContainerInfoReporter",
this::reportKeeperContainerInfo, () -> 2 * config.getKeeperCheckerIntervalMilli(), scheduled);
this::reportKeeperContainerInfo, () -> config.getKeeperCheckerIntervalMilli(), scheduled);
}

@PreDestroy
Expand Down Expand Up @@ -101,9 +108,27 @@ public void reportKeeperContainerInfo() {
try {
logger.debug("[reportKeeperContainerInfo] start");
Map<String, Map<DcClusterShardActive, Long>> hostPort2InputFlow = keeperFlowCollector.getHostPort2InputFlow();
for (DcMeta dcMeta : metaCache.getXpipeMeta().getDcs().values()) {
if (CURRENT_IDC.equalsIgnoreCase(dcMeta.getId())) {
List<String> ipList = dcMeta.getKeeperContainers().stream()
.map(KeeperContainerMeta::getIp)
.collect(Collectors.toList());
logger.info("[reportKeeperContainerInfo] current monitor keeperContainer:{}", ipList);
dcMeta.getKeeperContainers().forEach(keeperContainerMeta -> {
if(!hostPort2InputFlow.containsKey(keeperContainerMeta.getIp())) {
hostPort2InputFlow.put(keeperContainerMeta.getIp(), new ConcurrentHashMap<>());
}
});
for (String ip : hostPort2InputFlow.keySet()) {
if (!ipList.contains(ip)) {
logger.warn("[reportKeeperContainerInfo] keeperContainer:{} is not exit in console meta", ip);
hostPort2InputFlow.remove(ip);
}
}
}
}
Map<DcClusterShard, Long> dcClusterShardUsedMemory = redisUsedMemoryCollector.getDcClusterShardUsedMemory();
List<KeeperContainerUsedInfoModel> result = new ArrayList<>(hostPort2InputFlow.keySet().size());

hostPort2InputFlow.forEach((keeperIp, inputFlowMap) -> {
KeeperContainerUsedInfoModel model = new KeeperContainerUsedInfoModel();
model.setKeeperIp(keeperIp).setDcName(CURRENT_IDC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class KeeperContainerUsedInfoModel {

private String dcName;

private String org;

private long activeInputFlow;

private long totalInputFlow;
Expand Down Expand Up @@ -51,6 +53,7 @@ public KeeperContainerUsedInfoModel(String keeperIp, String dcName, long activeI
public KeeperContainerUsedInfoModel(KeeperContainerUsedInfoModel model, Map.Entry<DcClusterShardActive, KeeperUsedInfo> dcClusterShard) {
this.keeperIp = model.getKeeperIp();
this.dcName = model.getDcName();
this.org = model.getOrg();
this.activeInputFlow = model.getActiveInputFlow() + dcClusterShard.getValue().getInputFlow();
this.totalInputFlow = model.getTotalInputFlow() + dcClusterShard.getValue().getInputFlow();
this.inputFlowStandard = model.getInputFlowStandard();
Expand Down Expand Up @@ -86,6 +89,14 @@ public KeeperContainerUsedInfoModel setKeeperIp(String keeperIp) {
return this;
}

public String getOrg() {
return org;
}

public void setOrg(String org) {
this.org = org;
}

public long getActiveInputFlow() {
return activeInputFlow;
}
Expand Down Expand Up @@ -215,6 +226,7 @@ public String toString() {
return "KeeperContainerUsedInfoModel{" +
"keeperIp='" + keeperIp + '\'' +
", dcName='" + dcName + '\'' +
", org='" + org + '\'' +
", activeInputFlow=" + activeInputFlow +
", totalInputFlow=" + totalInputFlow +
", inputFlowStandard=" + inputFlowStandard +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.ctrip.xpipe.redis.checker.impl;

import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.redis.checker.CheckerConsoleService;
import com.ctrip.xpipe.redis.checker.KeeperContainerService;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
Expand All @@ -8,15 +9,16 @@
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.ctrip.xpipe.redis.checker.model.DcClusterShardActive;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.core.entity.DiskSpaceUsageInfo;
import com.ctrip.xpipe.redis.core.entity.KeeperDiskInfo;
import com.ctrip.xpipe.redis.core.entity.*;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.*;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -50,11 +52,24 @@ public class KeeperUsedInfoReporterTest {
@Mock
KeeperContainerService keeperContainerService;

@Mock
private MetaCache metaCache;

@Captor
ArgumentCaptor<List<KeeperContainerUsedInfoModel>> resultCaptor;

@Before
public void befor() {
XpipeMeta xpipeMeta = new XpipeMeta();
DcMeta dcMeta = new DcMeta();
xpipeMeta.getDcs().put("dc",dcMeta);
dcMeta.setId(FoundationService.DEFAULT.getDataCenter());
List<KeeperContainerMeta> keeperContainerMetas = new ArrayList<>();
keeperContainerMetas.add(new KeeperContainerMeta().setIp("127.0.0.1"));
keeperContainerMetas.add(new KeeperContainerMeta().setIp("127.0.0.2"));
keeperContainerMetas.add(new KeeperContainerMeta().setIp("127.0.0.3"));
dcMeta.getKeeperContainers().addAll(keeperContainerMetas);
Mockito.when(metaCache.getXpipeMeta()).thenReturn(xpipeMeta);
DcClusterShardActive dcClusterShard1 = new DcClusterShardActive("jq", "cluster1", "shard1", false);
DcClusterShardActive dcClusterShard2 = new DcClusterShardActive("jq", "cluster1", "shard2", true);
DcClusterShardActive dcClusterShard3 = new DcClusterShardActive("jq", "cluster2", "shard1", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,6 @@ public interface ConsoleConfig extends CoreConfig, CheckerConfig, AlertConfig {

long getAutoMigrateOverloadKeeperContainerIntervalMilli();

Map<String,KeeperContainerOverloadStandardModel> getKeeperContainerOverloadStandards();

double getKeeperContainerOverloadFactor();

double getKeeperPairOverLoadFactor();

double getKeeperContainerDiskOverLoadFactor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ public class DefaultConsoleConfig extends AbstractCoreConfig implements ConsoleC
private static final String KEY_MIGRATION_RESULT_REPORT_TOKEN = "migration.result.report.token";
private static final String KEY_MIGRATION_RESULT_REPORT_OPEN = "migration.result.report.open";
private static final String KEY_MIGRATION_RESULT_REPORT_INTERVAL_MILLI = "migration.result.report.interval.milli";
private static final String KEY_CONSOLE_KEEPER_CONTAINER_OVERLOAD_STANDARD = "console.keeper.container.overload.standard";
private static final String KEY_CONSOLE_KEEPER_CONTAINER_OVERLOAD_STANDARD_FACTOR = "console.keeper.container.overload.standard.factor";
private static final String KEY_CONSOLE_KEEPER_PAIR_OVERLOAD_FACTOR = "console.keeper.container.pair.overload.standard.factor";
private static final String KEY_CONSOLE_KEEPER_CONTAINER_DISK_OVERLOAD_FACTOR = "console.keeper.container.disk.overload.factor";
private static final String KEY_CONSOLE_KEEPER_CONTAINER_IO_RATE = "console.keeper.container.io.rate";
Expand Down Expand Up @@ -591,7 +589,7 @@ public int getCheckerReportIntervalMilli() {

@Override
public int getCheckerCurrentDcAllMetaRefreshIntervalMilli() {
return getIntProperty(KEY_CHECKER_CURRENT_DC_ALL_META_REFRESH_INTERVAL, 600000);
return getIntProperty(KEY_CHECKER_CURRENT_DC_ALL_META_REFRESH_INTERVAL, 60 * 1000);
}

@Override
Expand Down Expand Up @@ -721,7 +719,7 @@ public Set<String> getMigrationUnsupportedClusters() {

@Override
public int getKeeperCheckerIntervalMilli() {
return getIntProperty(KEY_KEEPER_CHECKER_INTERVAL, 1800 * 1000);
return getIntProperty(KEY_KEEPER_CHECKER_INTERVAL, 60 * 1000);
}

@Override
Expand Down Expand Up @@ -782,17 +780,6 @@ public long getAutoMigrateOverloadKeeperContainerIntervalMilli() {
return getLongProperty(KEY_CONSOLE_AUTO_MIGRATE_OVERLOAD_KEEPER_CONTAINER_INTERVAL_MILLI, 60 * 60 * 1000L);
}

@Override
public Map<String,KeeperContainerOverloadStandardModel> getKeeperContainerOverloadStandards() {
String property = getProperty(KEY_CONSOLE_KEEPER_CONTAINER_OVERLOAD_STANDARD, "{}");
return JsonCodec.INSTANCE.decode(property, new GenericTypeReference<Map<String,KeeperContainerOverloadStandardModel>>() {});
}

@Override
public double getKeeperContainerOverloadFactor() {
return getFloatProperty(KEY_CONSOLE_KEEPER_CONTAINER_OVERLOAD_STANDARD_FACTOR, 0.8F);
}

@Override
public double getKeeperPairOverLoadFactor() {
return getFloatProperty(KEY_CONSOLE_KEEPER_PAIR_OVERLOAD_FACTOR, 0.25F);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,30 @@
package com.ctrip.xpipe.redis.console.controller.api;


import com.ctrip.xpipe.redis.checker.controller.result.RetMessage;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController;
import com.ctrip.xpipe.redis.console.keeper.KeeperContainerUsedInfoAnalyzer;
import com.ctrip.xpipe.redis.console.keeper.entity.KeeperContainerDiskType;
import com.ctrip.xpipe.redis.console.model.ConfigModel;
import com.ctrip.xpipe.redis.console.model.KeepercontainerTbl;
import com.ctrip.xpipe.redis.console.model.MigrationKeeperContainerDetailModel;
import com.ctrip.xpipe.redis.console.service.ConfigService;
import org.apache.http.HttpResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Map;

import static com.ctrip.xpipe.redis.console.service.ConfigService.KEY_KEEPER_CONTAINER_STANDARD;

@RestController
@RequestMapping(AbstractConsoleController.API_PREFIX)
Expand All @@ -21,26 +33,48 @@ public class KeeperContainerController extends AbstractConsoleController{
@Autowired
KeeperContainerUsedInfoAnalyzer analyzer;

@Autowired
ConfigService configService;

@RequestMapping(value = "/keepercontainer/overload/info/all", method = RequestMethod.GET)
public List<MigrationKeeperContainerDetailModel> getAllReadyToMigrateKeeperContainers() {
return analyzer.getCurrentDcReadyToMigrationKeeperContainers();
}

@RequestMapping(value = "/keepercontainer/info/all", method = RequestMethod.GET)
public List<KeeperContainerUsedInfoModel> getAllKeeperContainerUsedInfoModelsList() {
return new ArrayList<>(analyzer.getCurrentDcKeeperContainerUsedInfoModelsList().values());
return analyzer.getCurrentDcKeeperContainerUsedInfoModelsList();
}

@RequestMapping(value = "/keepercontainer/full/synchronization/time", method = RequestMethod.GET)
public Integer getMaxKeeperContainerFullSynchronizationTime() {
public List<Integer> getMaxKeeperContainerFullSynchronizationTime() {
return analyzer.getCurrentDcMaxKeeperContainerFullSynchronizationTime();
}

@RequestMapping(value = "/keepercontainer/overload/info/current", method = RequestMethod.GET)
public List<KeeperContainerUsedInfoModel> getCurrentReadyToMigrateKeeperContainers() {
List<KeeperContainerUsedInfoModel> result = new ArrayList<>();
analyzer.getKeeperContainerUsedInfoModelIndexMap().values().forEach(result::addAll);
return result;
@RequestMapping(value = "/keepercontainer/diskType", method = RequestMethod.POST)
public RetMessage setDiskType(@RequestBody ConfigModel configModel) {
try {
configModel.setKey(KEY_KEEPER_CONTAINER_STANDARD);
configService.setKeyKeeperContainerStandard(configModel);
return RetMessage.createSuccessMessage();
} catch (Exception e) {
return RetMessage.createFailMessage(e.getMessage());
}
}

@RequestMapping(value = "/keepercontainer/diskType", method = RequestMethod.GET)
public RetMessage getDiskType() {
try {
Map<String, String> map = new HashMap<>();
for (KeeperContainerDiskType value : KeeperContainerDiskType.values()) {
map.put(value.getPeerData(), configService.getConfig(KEY_KEEPER_CONTAINER_STANDARD, value.getPeerData()).getVal());
map.put(value.getInputFlow(), configService.getConfig(KEY_KEEPER_CONTAINER_STANDARD, value.getInputFlow()).getVal());
}
return RetMessage.createSuccessMessage(map.toString());
} catch (Exception e) {
return RetMessage.createFailMessage(e.getMessage());
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class KeeperContainerCreateInfo extends AbstractCreateInfo {

private String azName;

private String diskType;


public boolean isActive() {
return active;
}
Expand Down Expand Up @@ -99,6 +102,7 @@ public String toString() {
", orgName='" + orgName + '\'' +
", active=" + active +
", azName='" + azName + '\'' +
", diskType='" + diskType + '\'' +
'}';
}

Expand All @@ -110,4 +114,12 @@ public KeeperContainerCreateInfo setOrgName(String orgName) {
this.orgName = orgName;
return this;
}

public String getDiskType() {
return diskType;
}

public void setDiskType(String diskType) {
this.diskType = diskType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ public List<KeeperContainerUsedInfoModel> getLastedAllReadyMigrateKeeperContain

@RequestMapping(value = "/keepercontainer/max/fullSynchronizationTime", method = RequestMethod.GET)
public RetMessage getMaxKeeperContainerFullSynchronizationTime() {
return RetMessage.createSuccessMessage(String.valueOf(analyzer.getAllDcMaxKeeperContainerFullSynchronizationTime() + 1));
int max = analyzer.getAllDcMaxKeeperContainerFullSynchronizationTime().stream()
.mapToInt(Integer::intValue)
.max()
.orElse(0);
return RetMessage.createSuccessMessage(String.valueOf(max));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ctrip.xpipe.redis.console.keeper.Command;

import com.ctrip.xpipe.command.AbstractCommand;
import org.springframework.web.client.RestOperations;

public abstract class AbstractGetAllDcCommand<T> extends AbstractCommand<T> {
protected String domain;
protected RestOperations restTemplate;

protected AbstractGetAllDcCommand(String domain, RestOperations restTemplate) {
this.domain = domain;
this.restTemplate = restTemplate;
}

public void setDomain(String domain) {
this.domain = domain;
}

public abstract AbstractGetAllDcCommand<T> clone ();

}
Loading

0 comments on commit c419d8a

Please sign in to comment.