Skip to content

Commit

Permalink
fix checker not report keeper result
Browse files Browse the repository at this point in the history
  • Loading branch information
songyuyuyu committed Oct 27, 2023
1 parent da09bf3 commit 3b76c4a
Show file tree
Hide file tree
Showing 32 changed files with 396 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void doStart() {

@Override
public void doStop() {
logger.debug("[stopped][{}][{}], listener:{}, future:{}", getClass().getSimpleName(), instance.getCheckInfo(), listeners, future);
if(future != null) {
future.cancel(true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info;

import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction;
import com.ctrip.xpipe.redis.checker.healthcheck.KeeperSupport;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo;
import com.ctrip.xpipe.redis.checker.healthcheck.*;
import com.ctrip.xpipe.redis.checker.healthcheck.impl.DefaultRedisInstanceInfo;
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
import org.slf4j.Logger;
Expand Down Expand Up @@ -34,7 +33,9 @@ public void onAction(RedisInfoActionContext context) {

@Override
public void stopWatch(HealthCheckAction action) {

DefaultRedisInstanceInfo info = (DefaultRedisInstanceInfo) action.getActionInstance().getCheckInfo();
logger.debug("[stopWatch] DcClusterShard: {}", new DcClusterShard(info.getDcId(), info.getClusterId(), info.getShardId()));
dcClusterShardUsedMemory.remove(new DcClusterShard(info.getDcId(), info.getClusterId(), info.getShardId()));
}

public ConcurrentMap<DcClusterShard, Long> getDcClusterShardUsedMemory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public void onAction(KeeperInfoStatsActionContext context) {

@Override
public void stopWatch(HealthCheckAction action) {

KeeperInstanceInfo instanceInfo = (KeeperInstanceInfo) action.getActionInstance().getCheckInfo();
logger.debug("stopWatch: {}", new DcClusterShard(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId()));
hostPort2InputFlow.get(instanceInfo.getHostPort().getHost())
.remove(new DcClusterShard(instanceInfo.getDcId(), instanceInfo.getClusterId(), instanceInfo.getShardId()));
}

public ConcurrentMap<String, Map<DcClusterShard, Long>> getHostPort2InputFlow() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ protected List<KeeperHealthCheckInstance> getAllInstances() {
@Override
protected void registerInstance(KeeperHealthCheckInstance instance) {
ClusterType clusterType = instance.getCheckInfo().getClusterType();
//TODO add hetero type
if (clusterType.equals(ClusterType.ONE_WAY)

if ((clusterType.equals(ClusterType.ONE_WAY) || clusterType.equals(ClusterType.HETERO))
&& AbstractKeeperLeaderAwareHealthCheckActionFactory.this instanceof KeeperSupport){
registerTo(instance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ public void compare(DcMeta future) {
}

if (currentDcId.equalsIgnoreCase(dcId)) {
DcMeta futureDcAllMeta = getCurrentDcMeta(dcId);
KeeperContainerMetaComparator keeperContainerMetaComparator
= new KeeperContainerMetaComparator(current, future, currentDcAllMeta, getCurrentDcMeta(dcId));
= new KeeperContainerMetaComparator(current, future, currentDcAllMeta, futureDcAllMeta);
keeperContainerMetaComparator.compare();
keeperContainerMetaComparator.accept(new KeeperContainerMetaComparatorVisitor());
currentDcAllMeta = futureDcAllMeta;
}

comparator.accept(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.tuple.Pair;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask;
import org.slf4j.Logger;
Expand Down Expand Up @@ -43,10 +44,11 @@ public class KeeperContainerInfoReporter implements GroupCheckerLeaderAware {


public KeeperContainerInfoReporter(RedisUsedMemoryCollector redisUsedMemoryCollector, CheckerConsoleService
checkerConsoleService, KeeperFlowCollector keeperFlowCollector) {
checkerConsoleService, KeeperFlowCollector keeperFlowCollector, CheckerConfig config) {
this.redisUsedMemoryCollector = redisUsedMemoryCollector;
this.keeperFlowCollector = keeperFlowCollector;
this.checkerConsoleService = checkerConsoleService;
this.config = config;
}

@PostConstruct
Expand Down Expand Up @@ -87,7 +89,8 @@ public void notLeader() {
}
}

private void reportKeeperContainerInfo() {
@VisibleForTesting
void reportKeeperContainerInfo() {
try {
logger.debug("[reportKeeperContainerInfo] start");
Map<String, Map<DcClusterShard, Long>> hostPort2InputFlow = keeperFlowCollector.getHostPort2InputFlow();
Expand All @@ -103,14 +106,19 @@ private void reportKeeperContainerInfo() {
for (Map.Entry<DcClusterShard, Long> entry : inputFlowMap.entrySet()) {
totalInputFlow += entry.getValue();
Long redisUsedMemory = dcClusterShardUsedMemory.get(entry.getKey());
if (redisUsedMemory == null) {
logger.warn("[reportKeeperContainerInfo] redisUsedMemory is null, dcClusterShard: {}", entry.getKey());
redisUsedMemory = 0L;
}
totalRedisUsedMemory += redisUsedMemory;

detailInfo.put(entry.getKey(), new Pair<>(entry.getValue(), redisUsedMemory));
}

model.setDetailInfo(detailInfo).setTotalInputFlow(totalInputFlow).setTotalRedisUsedMemory(totalRedisUsedMemory);
result.add(model);
});

logger.debug("[reportKeeperContainerInfo] result: {}", result);
checkerConsoleService.reportKeeperContainerInfo(config.getConsoleAddress(), result, config.getClustersPartIndex());
} catch (Throwable th) {
logger.info("[reportKeeperContainerInfo] fail", th);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.ctrip.xpipe.redis.checker.model;

import java.io.Serializable;
import java.util.Objects;

public class DcClusterShard {
public class DcClusterShard implements Serializable {

protected static final String SPLITTER = ":";

protected String dcId;

Expand All @@ -20,6 +23,15 @@ public DcClusterShard(String dcId, String clusterId, String shardId) {
this.shardId = shardId;
}

public DcClusterShard(String info) {
String[] split = info.split(SPLITTER);
if (split.length >= 3) {
this.dcId = split[0];
this.clusterId = split[1];
this.shardId = split[2];
}
}

public DcClusterShard setDcId(String dcId) {
this.dcId = dcId;
return this;
Expand Down Expand Up @@ -59,16 +71,11 @@ public boolean equals(Object o) {

@Override
public int hashCode() {

return Objects.hash(dcId, clusterId, shardId);
}

@Override
public String toString() {
return "DcClusterShard{" +
"dcId='" + dcId + '\'' +
", clusterId='" + clusterId + '\'' +
", shardId='" + shardId + '\'' +
'}';
return getDcId() + SPLITTER + getClusterId() + SPLITTER + getShardId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.util.Objects;

public final class DcClusterShardPeer extends DcClusterShard implements Serializable {
private static final String SPLITTER = ":";

private String peerDcId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,13 @@ public void report(String console, HealthCheckResult result) {

@Override
public void reportKeeperContainerInfo(String console, List<KeeperContainerUsedInfoModel> keeperContainerUsedInfoModels, int index) {
UriComponents comp = UriComponentsBuilder.fromHttpUrl(console + ConsoleCheckerPath.PATH_PUT_KEEPER_CONTAINER_INFO_RESULT)
.buildAndExpand(index);
restTemplate.put(comp.toString(), keeperContainerUsedInfoModels);
try {
restTemplate.postForEntity(console + ConsoleCheckerPath.PATH_POST_KEEPER_CONTAINER_INFO_RESULT,
keeperContainerUsedInfoModels, RetMessage.class, index);

} catch (Throwable th) {
logger.error("report keeper used info fail : {}", index, th);
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.ctrip.xpipe.redis.checker.impl;

import com.ctrip.xpipe.redis.checker.CheckerConsoleService;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector;
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.*;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* @author yu
* <p>
* 2023/10/18
*/
@RunWith(org.mockito.junit.MockitoJUnitRunner.class)
public class KeeperContainerInfoReporterTest {

@InjectMocks
KeeperContainerInfoReporter keeperContainerInfoReporter;

@Mock
RedisUsedMemoryCollector redisUsedMemoryCollector;

@Mock
KeeperFlowCollector keeperFlowCollector;

@Mock
CheckerConsoleService checkerConsoleService;

@Mock
CheckerConfig config;

@Captor
ArgumentCaptor<List<KeeperContainerUsedInfoModel>> resultCaptor;

@Before
public void befor() {
DcClusterShard dcClusterShard1 = new DcClusterShard("jq", "cluster1", "shard1");
DcClusterShard dcClusterShard2 = new DcClusterShard("jq", "cluster1", "shard2");
DcClusterShard dcClusterShard3 = new DcClusterShard("jq", "cluster2", "shard1");
DcClusterShard dcClusterShard4 = new DcClusterShard("jq", "cluster2", "shard2");
DcClusterShard dcClusterShard5 = new DcClusterShard("jq", "cluster3", "shard1");
DcClusterShard dcClusterShard6 = new DcClusterShard("jq", "cluster3", "shard2");
DcClusterShard dcClusterShard7 = new DcClusterShard("jq", "cluster3", "shard3");

ConcurrentMap<String, Map<DcClusterShard, Long>> keeperFlowMap = Maps.newConcurrentMap();
Map<DcClusterShard, Long> map1 = new HashMap<>();
map1.put(dcClusterShard1, 2L);
map1.put(dcClusterShard4, 2L);
map1.put(dcClusterShard5, 2L);
keeperFlowMap.put("127.0.0.1", map1);

Map<DcClusterShard, Long> map2 = new HashMap<>();
map2.put(dcClusterShard2, 2L);
map2.put(dcClusterShard6, 2L);
keeperFlowMap.put("127.0.0.2", map2);

Map<DcClusterShard, Long> map3 = new HashMap<>();
map3.put(dcClusterShard3, 2L);
map3.put(dcClusterShard7, 2L);
keeperFlowMap.put("127.0.0.3", map3);

Mockito.when(keeperFlowCollector.getHostPort2InputFlow()).thenReturn(keeperFlowMap);

ConcurrentMap<DcClusterShard, Long> redisUsedMemoryMap = Maps.newConcurrentMap();
redisUsedMemoryMap.put(dcClusterShard1, 1L);
redisUsedMemoryMap.put(dcClusterShard2, 2L);
redisUsedMemoryMap.put(dcClusterShard3, 3L);
redisUsedMemoryMap.put(dcClusterShard4, 4L);
redisUsedMemoryMap.put(dcClusterShard6, 6L);
redisUsedMemoryMap.put(dcClusterShard7, 7L);

Mockito.when(redisUsedMemoryCollector.getDcClusterShardUsedMemory()).thenReturn(redisUsedMemoryMap);
Mockito.when(config.getConsoleAddress()).thenReturn("127.0.0.1");
Mockito.when(config.getClustersPartIndex()).thenReturn(0);

}

@Test
public void testReportKeeperContainerInfo() {
keeperContainerInfoReporter.reportKeeperContainerInfo();
Mockito.verify(checkerConsoleService, Mockito.times(1))
.reportKeeperContainerInfo(Mockito.anyString(), resultCaptor.capture(), Mockito.anyInt());

Assert.assertEquals(3, resultCaptor.getValue().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public interface ConsoleConfig extends CoreConfig, CheckerConfig, AlertConfig {

long getMigrationResultReportIntervalMill();

boolean isAutoMigrateOverloadKeeperContainerOpen();
boolean isAutoMigrateOverloadKeeperContainerOpen();

long getAutoMigrateOverloadKeeperContainerIntervalMilli();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.console.controller.api;


import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController;
import com.ctrip.xpipe.redis.console.keeper.KeeperContainerUsedInfoAnalyzer;
import com.ctrip.xpipe.redis.console.model.MigrationKeeperContainerDetailModel;
Expand All @@ -22,4 +23,9 @@ public class KeeperContainerOverloadController extends AbstractConsoleController
public List<MigrationKeeperContainerDetailModel> getAllReadyToMigrateKeeperContainers() {
return analyzer.getAllDcReadyToMigrationKeeperContainers();
}

@RequestMapping(value = "/keepercontainer/overload/info/current", method = RequestMethod.GET)
public List<KeeperContainerUsedInfoModel> getCurrentReadyToMigrateKeeperContainers() {
return analyzer.getAllKeeperContainerUsedInfoModels();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void reportHealthCheckResult(HttpServletRequest request, @RequestBody Hea
if (null != checkResult.getHeteroShardsDelay()) delayService.updateHeteroShardsDelays(checkResult.getHeteroShardsDelay());
}

@PutMapping(ConsoleCheckerPath.PATH_PUT_KEEPER_CONTAINER_INFO_RESULT)
@PostMapping(ConsoleCheckerPath.PATH_POST_KEEPER_CONTAINER_INFO_RESULT)
public void reportHealthCheckResult(HttpServletRequest request, @PathVariable int index, @RequestBody List<KeeperContainerUsedInfoModel> keeperContainerUsedInfoModels) {
logger.debug("[reportHealthCheckResult][{}] {}", request.getRemoteAddr(), keeperContainerUsedInfoModels);
if (keeperContainerUsedInfoModels == null || keeperContainerUsedInfoModels.isEmpty()) return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public RetMessage beginToMigrateOverloadKeeperContainers(@RequestBody List<Migra
@RequestMapping(value = "/keepercontainer/overload/migration/stop", method = RequestMethod.POST)
public RetMessage stopToMigrateOverloadKeeperContainers() {
logger.info("stop to migrate over load keeper containers");
analyzer.getAllDcReadyToMigrationKeeperContainers();
keeperContainerMigrationService.stopMigrateKeeperContainers();
return RetMessage.createSuccessMessage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void migrateKeepers(@RequestBody MigrationKeeperModel model) {
for (ShardModel shardModel : allShardModel) {
if (!shardModelService.migrateShardKeepers(model.getSrcKeeperContainer().getDcName(),
clusterTbl.getClusterName(), shardModel, model.getSrcKeeperContainer().getAddr().getHost(),
model.getTargetKeeperContainer() == null ? null : model.getTargetKeeperContainer().getAddr().getHost())) {
(model.getTargetKeeperContainer() == null || model.getTargetKeeperContainer().getAddr() == null)
? null : model.getTargetKeeperContainer().getAddr().getHost())) {
continue;
}
if (model.getMaxMigrationKeeperNum() != 0 && (++count) >= model.getMaxMigrationKeeperNum()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ public interface KeeperContainerUsedInfoAnalyzer {
List<MigrationKeeperContainerDetailModel> getAllReadyToMigrationKeeperContainers();

List<MigrationKeeperContainerDetailModel> getAllDcReadyToMigrationKeeperContainers();

List<KeeperContainerUsedInfoModel> getAllKeeperContainerUsedInfoModels();
}
Loading

0 comments on commit 3b76c4a

Please sign in to comment.