Skip to content

Commit

Permalink
Feature/rebalance keeper (#744)
Browse files Browse the repository at this point in the history
* add keeper container comparator

* add keeper instance in check action

* add keeper info stats action

* get all dc meta from console

* add info action for redis

* add redis instance for used memory

* add redis info action when leader changes

* temp

* auto migrate keepers

* add overload migration pages

* fix keeper container meta comparator test fail

* fix DefaultDcMetaChangeManagerTest fail

* add test for keeper info stats action

* add test for redisInfoAction

* add test for keeperContainerMetaComparator

* temp

* temp

* add test for DefaultKeeperContainerUsedInfoAnalyzer

* add test for AutoMigrateOverloadKeeperContainerAction

* add keeper session manager

* fix circular bean dependency

* add keeper related action when start

* fix checker not report keeper result

* add log and update choose redis strategy

* add keeper session manager

* adjust interval of keeper related actions

* add alert when keeper migrate

* clean expired keeper container used info

* update session manager

* update all dc meta of current dc with metacache
  • Loading branch information
songyuyuyu authored Nov 20, 2023
1 parent 42109dd commit c346720
Show file tree
Hide file tree
Showing 139 changed files with 5,691 additions and 515 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ hs_err_pid*

# npm
node_modules
*.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.model.CheckerStatus;
import com.ctrip.xpipe.redis.checker.model.HealthCheckResult;
import com.ctrip.xpipe.redis.checker.model.KeeperContainerUsedInfoModel;
import com.ctrip.xpipe.redis.checker.model.ProxyTunnelInfo;
import com.ctrip.xpipe.redis.core.entity.SentinelMeta;
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;
Expand All @@ -25,12 +26,16 @@ public interface CheckerConsoleService {
XpipeMeta getXpipeMeta(String console, int clusterPartIndex) throws SAXException, IOException;

XpipeMeta getXpipeAllMeta(String console) throws SAXException, IOException;

XpipeMeta getXpipeDcAllMeta(String console, String dcName) throws SAXException, IOException;

List<ProxyTunnelInfo> getProxyTunnelInfos(String console);

void ack(String console, CheckerStatus checkerStatus);

void report(String console, HealthCheckResult result);

void reportKeeperContainerInfo(String console, List<KeeperContainerUsedInfoModel> keeperContainerUsedInfoModels, int index);

boolean isClusterOnMigration(String console, String clusterId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,41 @@ public boolean reportRecovery() {
public DetailDesc detailDesc() {
return new DetailDesc("keepers should in different available zones", "keepers in the same available zone found");
}
},
/**
 * Alert type constructed with the text "keeper migration success" and the
 * {@code EMAIL_XPIPE_ADMIN} alert method. It is neither urgent nor
 * recovery-reporting, and both {@code DetailDesc} arguments carry the same text.
 */
KEEPER_MIGRATION_SUCCESS("keeper migration success", EMAIL_XPIPE_ADMIN) {
    @Override
    public DetailDesc detailDesc() {
        // Both DetailDesc arguments are the same literal for this alert type.
        final String description = "keeper migration success";
        return new DetailDesc(description, description);
    }

    @Override
    public boolean reportRecovery() {
        return false;
    }

    @Override
    public boolean urgent() {
        return false;
    }
},
/**
 * Alert type constructed with the text "keeper migration fail" and the
 * {@code EMAIL_XPIPE_ADMIN} alert method. It is neither urgent nor
 * recovery-reporting, and both {@code DetailDesc} arguments carry the same text.
 */
KEEPER_MIGRATION_FAIL("keeper migration fail", EMAIL_XPIPE_ADMIN) {
    @Override
    public DetailDesc detailDesc() {
        // Both DetailDesc arguments are the same literal for this alert type.
        final String description = "keeper migration fail";
        return new DetailDesc(description, description);
    }

    @Override
    public boolean reportRecovery() {
        return false;
    }

    @Override
    public boolean urgent() {
        return false;
    }
};


private String simpleDesc;

private int alertMethod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public interface CheckerConfig {

String KEY_CHECKER_META_REFRESH_INTERVAL = "checker.meta.refresh.interval.milli";

String KEY_CHECKER_CURRENT_DC_ALL_META_REFRESH_INTERVAL = "checker.current_dc_all_meta.refresh.interval.milli";

String KEY_CONSOLE_ADDRESS = "console.address";

String KEY_CHECKER_ACK_INTERVAL = "checker.ack.interval.milli";
Expand All @@ -101,8 +103,12 @@ public interface CheckerConfig {

String KEY_SUBSCRIBE_TIMEOUT_MILLI = "checker.subscribe.timeout.milli";

String KEY_KEEPER_CHECKER_INTERVAL = "keeper.checker.interval";

int getRedisReplicationHealthCheckInterval();

int getCheckerCurrentDcAllMetaRefreshIntervalMilli();

int getClusterHealthCheckInterval();

int getDownAfterCheckNums();
Expand Down Expand Up @@ -201,4 +207,6 @@ public interface CheckerConfig {

Set<String> getMigrationUnsupportedClusters();

int getKeeperCheckerIntervalMilli();

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.DefaultDelayPingActionCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HEALTH_STATE;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.HealthStatusDesc;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info.RedisUsedMemoryCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.infoStats.KeeperFlowCollector;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisconf.AbstractRedisConfigRuleAction;
import com.ctrip.xpipe.redis.checker.healthcheck.stability.StabilityHolder;
import com.ctrip.xpipe.redis.checker.model.DcClusterShard;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
Expand All @@ -18,6 +21,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* @author lishanglin
Expand All @@ -31,6 +35,12 @@ public class CheckerHealthController {
@Autowired
private DefaultDelayPingActionCollector defaultDelayPingActionCollector;

@Autowired
private RedisUsedMemoryCollector redisUsedMemoryCollector;

@Autowired
private KeeperFlowCollector keeperFlowCollector;

@Autowired
private HealthCheckInstanceManager instanceManager;

Expand Down Expand Up @@ -66,6 +76,26 @@ public String getClusterHealthCheckInstance(@PathVariable String clusterId) {
return Codec.DEFAULT.encode(model);
}

/**
 * Looks up the keeper health-check instance registered for {@code ip:port}
 * and returns its encoded {@code HealthCheckInstanceModel}.
 *
 * @param ip   host part of the address to look up
 * @param port port part of the address to look up
 * @return the model encoded with {@code Codec.DEFAULT}, or the literal
 *         {@code "Not found"} when the instance manager returns {@code null}
 */
@RequestMapping(value = "/health/check/keeper/{ip}/{port}", method = RequestMethod.GET)
public String getHealthCheckKeeper(@PathVariable String ip, @PathVariable int port) {
    HostPort keeperAddress = new HostPort(ip, port);
    KeeperHealthCheckInstance keeperInstance = instanceManager.findKeeperHealthCheckInstance(keeperAddress);
    if (keeperInstance != null) {
        return Codec.DEFAULT.encode(buildHealthCheckInfo(keeperInstance));
    }
    return "Not found";
}

/**
 * Looks up the "redis instance for assigned action" registered for
 * {@code ip:port} and returns its encoded {@code HealthCheckInstanceModel}.
 *
 * @param ip   host part of the address to look up
 * @param port port part of the address to look up
 * @return the model encoded with {@code Codec.DEFAULT}, or the literal
 *         {@code "Not found"} when the instance manager returns {@code null}
 */
@RequestMapping(value = "/health/check/redis-for-assigned-action/{ip}/{port}", method = RequestMethod.GET)
public String getHealthCheckRedisInstanceForAssignedAction(@PathVariable String ip, @PathVariable int port) {
    HostPort redisAddress = new HostPort(ip, port);
    RedisHealthCheckInstance redisInstance = instanceManager.findRedisInstanceForAssignedAction(redisAddress);
    if (redisInstance != null) {
        return Codec.DEFAULT.encode(buildHealthCheckInfo(redisInstance));
    }
    return "Not found";
}

@RequestMapping(value = "/health/redis/info/{ip}/{port}", method = RequestMethod.GET)
public ActionContextRetMessage<Map<String, String>> getRedisInfo(@PathVariable String ip, @PathVariable int port) {
return ActionContextRetMessage.from(redisInfoManager.getInfoByHostPort(new HostPort(ip, port)));
Expand All @@ -82,6 +112,16 @@ public Map<HostPort, HealthStatusDesc> getAllHealthStatusDesc() {
else return Collections.emptyMap();
}

/**
 * Returns the map held by the keeper flow collector, exactly as
 * {@code getHostPort2InputFlow()} hands it out (no copy is made here).
 *
 * @return the collector's map of {@code DcClusterShard -> Long} maps, keyed by
 *         a string (presumably a keeper host:port, going by the getter name;
 *         confirm against KeeperFlowCollector)
 */
@GetMapping("/health/keeper/status/all")
public ConcurrentMap<String, Map<DcClusterShard, Long>> getAllKeeperFlows() {
    ConcurrentMap<String, Map<DcClusterShard, Long>> inputFlows = keeperFlowCollector.getHostPort2InputFlow();
    return inputFlows;
}

/**
 * Returns the map held by the redis used-memory collector, exactly as
 * {@code getDcClusterShardUsedMemory()} hands it out (no copy is made here).
 *
 * @return the collector's {@code DcClusterShard -> Long} map
 */
@GetMapping("/health/redis/used-memory/all")
public ConcurrentMap<DcClusterShard, Long> getAllDclusterShardUsedMemory() {
    ConcurrentMap<DcClusterShard, Long> usedMemoryByShard = redisUsedMemoryCollector.getDcClusterShardUsedMemory();
    return usedMemoryByShard;
}

private HealthCheckInstanceModel buildHealthCheckInfo(HealthCheckInstance<?> instance) {
HealthCheckInstanceModel model = new HealthCheckInstanceModel(instance.toString());
for(HealthCheckAction action : instance.getHealthCheckActions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void doStart() {

@Override
public void doStop() {
logger.debug("[stopped][{}][{}], listener:{}, future:{}", getClass().getSimpleName(), instance.getCheckInfo(), listeners, future);
if(future != null) {
future.cancel(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.entity.RedisMeta;

import java.util.List;
Expand All @@ -15,18 +16,34 @@ public interface HealthCheckInstanceManager {

RedisHealthCheckInstance getOrCreate(RedisMeta redis);

RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisMeta redis);

KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper);

ClusterHealthCheckInstance getOrCreate(ClusterMeta cluster);

RedisHealthCheckInstance findRedisHealthCheckInstance(HostPort hostPort);

RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort);

KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort);

ClusterHealthCheckInstance findClusterHealthCheckInstance(String clusterId);

RedisHealthCheckInstance remove(HostPort hostPort);

KeeperHealthCheckInstance removeKeeper(HostPort hostPort);

RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort);

ClusterHealthCheckInstance remove(String cluster);

List<RedisHealthCheckInstance> getAllRedisInstance();

List<KeeperHealthCheckInstance> getAllKeeperInstance();

List<RedisHealthCheckInstance> getAllRedisInstanceForAssignedAction();

List<ClusterHealthCheckInstance> getAllClusterInstance();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.ctrip.xpipe.redis.checker.healthcheck;

/**
 * Specialization of {@link HealthCheckActionFactory} whose instance type
 * argument is fixed to {@link KeeperHealthCheckInstance}. Declares no members
 * of its own.
 *
 * @param <T> the kind of {@link HealthCheckAction} the factory produces
 */
public interface KeeperHealthCheckActionFactory <T extends HealthCheckAction> extends HealthCheckActionFactory<T, KeeperHealthCheckInstance>{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.ctrip.xpipe.redis.checker.healthcheck;

/**
 * Specialization of {@link HealthCheckActionListener} whose action type
 * argument is fixed to {@code HealthCheckAction<KeeperHealthCheckInstance>}.
 * Declares no members of its own.
 *
 * @param <T> the {@link ActionContext} type the listener is parameterized with
 */
public interface KeeperHealthCheckActionListener <T extends ActionContext> extends HealthCheckActionListener<T, HealthCheckAction<KeeperHealthCheckInstance>> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.ctrip.xpipe.redis.checker.healthcheck;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.redis.checker.healthcheck.session.RedisSession;

/**
 * A {@link HealthCheckInstance} whose check info type is
 * {@link KeeperInstanceInfo}. Adds accessors for an {@link Endpoint} and a
 * {@link RedisSession}.
 */
public interface KeeperHealthCheckInstance extends HealthCheckInstance<KeeperInstanceInfo>{

// NOTE(review): presumably the network endpoint of the keeper being checked - confirm against the implementation.
Endpoint getEndpoint();

// NOTE(review): presumably the session used to send commands to that keeper - confirm against the implementation.
RedisSession getRedisSession();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.ctrip.xpipe.redis.checker.healthcheck;

import com.ctrip.xpipe.endpoint.ClusterShardHostPort;
import com.ctrip.xpipe.endpoint.HostPort;


/**
 * {@link CheckInfo} extension that adds keeper-related accessors: cluster/shard
 * host-port, shard id, dc id, an active flag and the host-port.
 * The exact semantics of each value are defined by implementations that are
 * not visible here; the notes below are assumptions to be confirmed.
 */
public interface KeeperInstanceInfo extends CheckInfo {

// Returns the combined cluster/shard/host-port descriptor for this instance.
ClusterShardHostPort getClusterShardHostport();

// NOTE(review): presumably the id of the shard this keeper serves - confirm.
String getShardId();

// NOTE(review): presumably the id of the data center this keeper runs in - confirm.
String getDcId();

// NOTE(review): presumably true when this keeper is the active keeper of its shard - confirm.
boolean isActive();

// Returns the host and port of this instance.
HostPort getHostPort();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.ctrip.xpipe.redis.checker.healthcheck;

/**
 * Interface that declares no members.
 * NOTE(review): looks like a marker interface; its consumers are not visible
 * in this file - confirm how it is used.
 */
public interface KeeperSupport {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper;

import com.ctrip.xpipe.redis.checker.healthcheck.AbstractActionContext;
import com.ctrip.xpipe.redis.checker.healthcheck.KeeperHealthCheckInstance;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;
import org.slf4j.Logger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
 * Base class for keeper stats actions whose type arguments to
 * {@link KeeperStatsCheckAction} are fixed to {@code String} and
 * {@link InfoResultExtractor}. Subclasses supply the action context through
 * {@link #createActionContext(String)}.
 *
 * @param <T> the action context type produced for each result
 */
public abstract class AbstractKeeperInfoCommand<T extends AbstractActionContext> extends KeeperStatsCheckAction<String, InfoResultExtractor> {

    public AbstractKeeperInfoCommand(ScheduledExecutorService scheduled, KeeperHealthCheckInstance instance, ExecutorService executors) {
        super(scheduled, instance, executors);
    }

    /**
     * Builds the action context for one result.
     *
     * @param extractor despite its name, this is the {@code String} result that
     *                  {@link #generateActionContext(String)} receives and forwards unchanged
     * @return the context for that result
     */
    protected abstract T createActionContext(String extractor);

    /**
     * Delegates context creation to {@link #createActionContext(String)}.
     */
    @Override
    protected T generateActionContext(String result) {
        final T context = createActionContext(result);
        return context;
    }

    /**
     * Returns the inherited {@code logger} as the health-check logger.
     */
    @Override
    protected Logger getHealthCheckLogger() {
        return logger;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper;

import com.ctrip.xpipe.redis.checker.healthcheck.KeeperHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisstats.AbstractInfoListener;
import com.ctrip.xpipe.redis.checker.healthcheck.leader.AbstractKeeperLeaderAwareHealthCheckActionFactory;
import com.ctrip.xpipe.redis.checker.healthcheck.leader.SiteLeaderAwareHealthCheckAction;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/**
 * Factory base class that creates an {@link AbstractKeeperInfoCommand} for a
 * keeper instance and registers the injected listeners on it before handing
 * it out.
 *
 * @param <T> listener type injected as a list
 * @param <C> concrete command type created by {@link #createAction(KeeperHealthCheckInstance)}
 */
public abstract class AbstractKeeperInfoCommandActionFactory<T extends AbstractInfoListener, C extends AbstractKeeperInfoCommand>
        extends AbstractKeeperLeaderAwareHealthCheckActionFactory {

    // Injected by Spring; passed as-is to every created action.
    @Autowired
    private List<T> listeners;

    /**
     * Creates the concrete command for the given keeper instance.
     */
    protected abstract C createAction(KeeperHealthCheckInstance instance);

    /**
     * Creates the command via {@link #createAction(KeeperHealthCheckInstance)},
     * adds the injected listeners to it, and returns it.
     */
    @Override
    public SiteLeaderAwareHealthCheckAction create(KeeperHealthCheckInstance instance) {
        C infoCommand = createAction(instance);
        infoCommand.addListeners(listeners);
        return infoCommand;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper;

import com.ctrip.xpipe.redis.checker.healthcheck.AbstractActionContext;
import com.ctrip.xpipe.redis.checker.healthcheck.KeeperHealthCheckInstance;
import com.ctrip.xpipe.redis.core.protocal.cmd.InfoResultExtractor;

/**
 * {@link AbstractActionContext} parameterized with {@link InfoResultExtractor}
 * and {@link KeeperHealthCheckInstance}. Adds no state or behavior of its own.
 */
public abstract class AbstractKeeperInfoContext extends AbstractActionContext<InfoResultExtractor, KeeperHealthCheckInstance> {
// Forwards the instance and the extractor to the superclass constructor, in that order.
public AbstractKeeperInfoContext(KeeperHealthCheckInstance instance, InfoResultExtractor infoResultExtractor) {
super(instance, infoResultExtractor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper;

import com.ctrip.xpipe.redis.checker.healthcheck.KeeperHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisstats.AbstractInstanceStatsCheckAction;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
 * {@link AbstractInstanceStatsCheckAction} whose third type argument is fixed
 * to {@link KeeperHealthCheckInstance}; {@code T} and {@code K} are passed
 * through unchanged as the first two type arguments.
 */
public abstract class KeeperStatsCheckAction<T, K> extends AbstractInstanceStatsCheckAction<T, K , KeeperHealthCheckInstance> {

// Forwards all three arguments to the superclass constructor unchanged.
public KeeperStatsCheckAction(ScheduledExecutorService scheduled, KeeperHealthCheckInstance instance, ExecutorService executors) {
super(scheduled, instance, executors);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.keeper.info;

import com.ctrip.xpipe.api.command.CommandFuture;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.redisstats.AbstractInfoCommandAction;
import com.ctrip.xpipe.redis.checker.healthcheck.session.Callbackable;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
 * Info command action for a redis health-check instance that produces
 * {@code RedisInfoActionContext} objects. Its base check interval is read from
 * the keeper checker interval setting of the instance's health-check config.
 */
public class RedisInfoAction extends AbstractInfoCommandAction<RedisInfoActionContext> {

    public RedisInfoAction(ScheduledExecutorService scheduled, RedisHealthCheckInstance instance, ExecutorService executors) {
        super(scheduled, instance, executors);
    }

    /**
     * Uses the keeper checker interval (milliseconds, per the getter name) as
     * this action's base check interval.
     */
    @Override
    protected int getBaseCheckInterval() {
        return getActionInstance()
                .getHealthCheckConfig()
                .getKeeperCheckerIntervalMilli();
    }

    /**
     * Issues the info command on the instance's redis session with an empty
     * section argument and the supplied callback.
     */
    @Override
    protected CommandFuture<String> executeRedisCommandForStats(Callbackable<String> callback) {
        // NOTE(review): an empty section presumably requests the default INFO output - confirm against RedisSession#info.
        final String infoSection = "";
        return getActionInstance().getRedisSession().info(infoSection, callback);
    }

    /**
     * Wraps the given value together with this action's instance in a new
     * {@code RedisInfoActionContext}.
     */
    @Override
    protected RedisInfoActionContext createActionContext(String extractor) {
        return new RedisInfoActionContext(instance, extractor);
    }
}
Loading

0 comments on commit c346720

Please sign in to comment.