Skip to content

Commit

Permalink
add keeper related action when start
Browse files Browse the repository at this point in the history
  • Loading branch information
songyuyuyu committed Oct 17, 2023
1 parent 952b930 commit da09bf3
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.AbstractRedisConfigRuleAction;
import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder;
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -18,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* @author lishanglin
Expand All @@ -31,6 +35,12 @@ public class CheckerHealthController {
@Autowired
private DefaultDelayPingActionCollector defaultDelayPingActionCollector;

@Autowired
private RedisUsedMemoryCollector redisUsedMemoryCollector;

@Autowired
private KeeperFlowCollector keeperFlowCollector;

@Autowired
private HealthCheckInstanceManager instanceManager;

Expand Down Expand Up @@ -66,6 +76,26 @@ public String getClusterHealthCheckInstance(@PathVariable String clusterId) {
return Codec.DEFAULT.encode(model);
}

@RequestMapping(value = "/health/check/keeper/{ip}/{port}", method = RequestMethod.GET)
public String getHealthCheckKeeper(@PathVariable String ip, @PathVariable int port) {
KeeperHealthCheckInstance instance = instanceManager.findKeeperHealthCheckInstance(new HostPort(ip, port));
if(instance == null) {
return "Not found";
}
HealthCheckInstanceModel model = buildHealthCheckInfo(instance);
return Codec.DEFAULT.encode(model);
}

@RequestMapping(value = "/health/check/redis-for-assigned-action/{ip}/{port}", method = RequestMethod.GET)
public String getHealthCheckRedisInstanceForAssignedAction(@PathVariable String ip, @PathVariable int port) {
RedisHealthCheckInstance instance = instanceManager.findRedisInstanceForAssignedAction(new HostPort(ip, port));
if(instance == null) {
return "Not found";
}
HealthCheckInstanceModel model = buildHealthCheckInfo(instance);
return Codec.DEFAULT.encode(model);
}

@RequestMapping(value = "/health/redis/info/{ip}/{port}", method = RequestMethod.GET)
public ActionContextRetMessage<Map<String, String>> getRedisInfo(@PathVariable String ip, @PathVariable int port) {
return ActionContextRetMessage.from(redisInfoManager.getInfoByHostPort(new HostPort(ip, port)));
Expand All @@ -82,6 +112,16 @@ public Map<HostPort, HealthStatusDesc> getAllHealthStatusDesc() {
else return Collections.emptyMap();
}

@GetMapping("/health/keeper/status/all")
public ConcurrentMap<String, Map<DcClusterShard, Long>> getAllKeeperFlows() {
return keeperFlowCollector.getHostPort2InputFlow();
}

@GetMapping("/health/redis/used-memory/all")
public ConcurrentMap<DcClusterShard, Long> getAllDclusterShardUsedMemory() {
return redisUsedMemoryCollector.getDcClusterShardUsedMemory();
}

private HealthCheckInstanceModel buildHealthCheckInfo(HealthCheckInstance<?> instance) {
HealthCheckInstanceModel model = new HealthCheckInstanceModel(instance.toString());
for(HealthCheckAction action : instance.getHealthCheckActions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public interface HealthCheckInstanceManager {

RedisHealthCheckInstance findRedisHealthCheckInstance(HostPort hostPort);

RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort);

KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort);

ClusterHealthCheckInstance findClusterHealthCheckInstance(String clusterId);
Expand All @@ -32,7 +34,7 @@ public interface HealthCheckInstanceManager {

KeeperHealthCheckInstance removeKeeper(HostPort hostPort);

RedisHealthCheckInstance removeRedisOnlyForUsedMemory(HostPort hostPort);
RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort);

ClusterHealthCheckInstance remove(String cluster);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public RedisHealthCheckInstance findRedisHealthCheckInstance(HostPort hostPort)
return instances.get(hostPort);
}

@Override
public RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort) {
return redisInstanceForAssignedAction.get(hostPort);
}

@Override
public KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort) {
return keeperInstances.get(hostPort);
Expand All @@ -114,7 +119,7 @@ public KeeperHealthCheckInstance removeKeeper(HostPort hostPort) {
}

@Override
public RedisHealthCheckInstance removeRedisOnlyForUsedMemory(HostPort hostPort) {
public RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort) {
RedisHealthCheckInstance instance = redisInstanceForAssignedAction.remove(hostPort);
if (null != instance) instanceFactory.remove(instance);
return instance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,30 @@
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.lifecycle.AbstractLifecycle;
import com.ctrip.xpipe.lifecycle.LifecycleHelper;
import com.ctrip.xpipe.redis.checker.CheckerConsoleService;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckInstanceManager;
import com.ctrip.xpipe.redis.checker.healthcheck.HealthChecker;
import com.ctrip.xpipe.redis.checker.healthcheck.meta.MetaChangeManager;
import com.ctrip.xpipe.redis.core.entity.*;
import com.ctrip.xpipe.redis.core.meta.KeeperContainerDetailInfo;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.StringUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.xml.sax.SAXException;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;

Expand All @@ -44,6 +52,9 @@ public class DefaultHealthChecker extends AbstractLifecycle implements HealthChe
@Autowired
private CheckerConfig checkerConfig;

@Autowired
private CheckerConsoleService checkerConsoleService;

@Resource(name = SCHEDULED_EXECUTOR)
private ScheduledExecutorService scheduled;

Expand Down Expand Up @@ -113,6 +124,16 @@ void generateHealthCheckInstances() {
if(checkerConfig.getIgnoredHealthCheckDc().contains(dcMeta.getId())) {
continue;
}

if (currentDcId.equalsIgnoreCase(dcMeta.getId())) {
Map<Long, KeeperContainerDetailInfo> keeperContainerDetailInfoMap
= getAllKeeperContainerDetailInfoFromDcMeta(dcMeta, getCurrentDcMeta(dcMeta.getId()));

keeperContainerDetailInfoMap.values().forEach(keeperContainerDetailInfo -> {
generateHealthCheckInstances(keeperContainerDetailInfo);
});
}

for(ClusterMeta cluster : dcMeta.getClusters().values()) {

ClusterType clusterType = ClusterType.lookup(cluster.getType());
Expand All @@ -131,6 +152,15 @@ void generateHealthCheckInstances() {
}
}

private void generateHealthCheckInstances(KeeperContainerDetailInfo keeperContainerDetailInfo) {
for (KeeperMeta keeperMeta : keeperContainerDetailInfo.getKeeperInstances()) {
instanceManager.getOrCreate(keeperMeta);
}
for (RedisMeta redisMeta : keeperContainerDetailInfo.getRedisInstances()) {
instanceManager.getOrCreateRedisInstanceForAssignedAction(redisMeta);
}
}

void generateHealthCheckInstances(ClusterMeta clusterMeta){
for(ShardMeta shard : clusterMeta.getShards().values()) {
for(RedisMeta redis : shard.getRedises()) {
Expand All @@ -156,6 +186,44 @@ private boolean isClusterInCurrentIdc(ClusterMeta cluster) {
return false;
}

private DcMeta getCurrentDcMeta(String dcId) {
try {
return checkerConsoleService.getXpipeAllDCMeta(checkerConfig.getConsoleAddress(), dcId)
.getDcs().get(dcId);
} catch (SAXException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}

private Map<Long, KeeperContainerDetailInfo> getAllKeeperContainerDetailInfoFromDcMeta(DcMeta dcMeta, DcMeta allDcMeta) {
Map<Long, KeeperContainerDetailInfo> map = dcMeta.getKeeperContainers().stream()
.collect(Collectors.toMap(KeeperContainerMeta::getId,
keeperContainerMeta -> new KeeperContainerDetailInfo(keeperContainerMeta, new ArrayList<>(), new ArrayList<>())));
allDcMeta.getClusters().values().forEach(clusterMeta -> {
for (ShardMeta shardMeta : clusterMeta.getAllShards().values()){
if (shardMeta.getKeepers() == null || shardMeta.getKeepers().isEmpty()) continue;
RedisMeta monitorRedis = getMonitorRedisMeta(shardMeta.getRedises());
shardMeta.getKeepers().forEach(keeperMeta -> {
if (map.containsKey(keeperMeta.getKeeperContainerId())) {
map.get(keeperMeta.getKeeperContainerId()).getKeeperInstances().add(keeperMeta);
map.get(keeperMeta.getKeeperContainerId()).getRedisInstances().add(monitorRedis);
}
});
}
});

return map;
}

private RedisMeta getMonitorRedisMeta(List<RedisMeta> redisMetas) {
if (redisMetas == null || redisMetas.isEmpty()) return null;
return redisMetas.stream().sorted((r1, r2) -> (r1.getIp().hashCode() - r2.getIp().hashCode()))
.collect(Collectors.toList()).get(0);
}

private boolean clusterDcIsCurrentDc(ClusterMeta clusterMeta) {
return clusterMeta.parent().getId().equalsIgnoreCase(currentDcId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private void addKeeper(KeeperMeta added) {
}

private void removeRedisOnlyForUsedMemory(RedisMeta removed) {
if (null != instanceManager.removeRedisOnlyForUsedMemory(new HostPort(removed.getIp(), removed.getPort()))) {
if (null != instanceManager.removeRedisInstanceForAssignedAction(new HostPort(removed.getIp(), removed.getPort()))) {
logger.info("[removeRedisOnlyForUsedMemory][{}:{}] {}", removed.getIp(), removed.getPort(), removed);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ public void testKeeperChange() throws Exception {
// only change keeper reload
Mockito.verify(instanceManager, times(2)).removeKeeper(any(HostPort.class));
Mockito.verify(instanceManager, times(1)).getOrCreate(any(KeeperMeta.class));
Mockito.verify(instanceManager, times(0)).removeRedisOnlyForUsedMemory(any(HostPort.class));
Mockito.verify(instanceManager, times(0)).removeRedisInstanceForAssignedAction(any(HostPort.class));
Mockito.verify(instanceManager, times(0)).getOrCreateRedisInstanceForAssignedAction(any(RedisMeta.class));

}
Expand All @@ -529,7 +529,7 @@ public void testRedisChange() throws Exception {
// only change redis changed
Mockito.verify(instanceManager, times(0)).removeKeeper(any(HostPort.class));
Mockito.verify(instanceManager, times(0)).getOrCreate(any(KeeperMeta.class));
Mockito.verify(instanceManager, times(1)).removeRedisOnlyForUsedMemory(any(HostPort.class));
Mockito.verify(instanceManager, times(1)).removeRedisInstanceForAssignedAction(any(HostPort.class));
Mockito.verify(instanceManager, times(1)).getOrCreateRedisInstanceForAssignedAction(any(RedisMeta.class));

}
Expand Down

0 comments on commit da09bf3

Please sign in to comment.