Skip to content

Commit

Permalink
check role for all unexpected sentinel slaves (#796)
Browse files Browse the repository at this point in the history
Co-authored-by: llj李龙姣 <[email protected]>
  • Loading branch information
lilongjiao and llj李龙姣 authored May 14, 2024
1 parent 5beb801 commit e0eea9e
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void go() throws Exception {
chain.add(new AnalyseHellos(context, checkerConfig));
chain.add(new AcquireLeakyBucket(context, leakyBucket));
chain.add(new DeleteSentinels(context, sentinelManager));
chain.add(new ResetSentinels(context, metaCache, alertManager, keyedObjectPool, scheduled, resetExecutor, sentinelManager));
chain.add(new ResetSentinels(context, metaCache, keyedObjectPool, scheduled, resetExecutor, sentinelManager));
chain.add(new AddSentinels(context, sentinelManager, checkerConfig));
chain.add(new SetSentinels(context, sentinelManager));
chain.execute().addListener(commandFuture -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ protected void doExecute() throws Throwable {

ParallelCommandChain addChain = new ParallelCommandChain(MoreExecutors.directExecutor(), false);
context.getToAdd().forEach((hello) -> {
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Add", hello.toString());
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Add", hello.getMonitorName());
HostPort sentinelAddr = hello.getSentinelAddr();
Sentinel sentinel = new Sentinel(sentinelAddr.toString(), sentinelAddr.getHost(), sentinelAddr.getPort());
addChain.add(sentinelManager.monitorMaster(sentinel, hello.getMonitorName(), hello.getMasterAddr(), checkerConfig.getDefaultSentinelQuorumConfig().getQuorum()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected void doExecute() throws Throwable {
ParallelCommandChain deleteChain = new ParallelCommandChain(MoreExecutors.directExecutor(), false);

context.getToDelete().forEach((hello -> {
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Remove", hello.toString());
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Remove", hello.getMonitorName());
HostPort sentinelAddr = hello.getSentinelAddr();
deleteChain.add(sentinelManager.removeSentinelMonitor(new Sentinel(sentinelAddr.toString(), sentinelAddr.getHost(), sentinelAddr.getPort()), hello.getMonitorName()));
}));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.collector.command;

import com.ctrip.xpipe.api.command.CommandFuture;
import com.ctrip.xpipe.api.server.Server;
import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.command.CommandExecutionException;
import com.ctrip.xpipe.command.CommandTimeoutException;
import com.ctrip.xpipe.command.ParallelCommandChain;
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.monitor.CatEventMonitor;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.SentinelManager;
import com.ctrip.xpipe.redis.checker.alert.ALERT_TYPE;
import com.ctrip.xpipe.redis.checker.alert.AlertManager;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHello;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand;
Expand All @@ -22,29 +17,24 @@
import com.ctrip.xpipe.tuple.Pair;
import com.ctrip.xpipe.utils.ObjectUtils;

import java.net.SocketException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE;

public class ResetSentinels extends AbstractSentinelHelloCollectCommand {

private SentinelManager sentinelManager;
private MetaCache metaCache;
private AlertManager alertManager;
private XpipeNettyClientKeyedObjectPool keyedObjectPool;
private ScheduledExecutorService scheduled;
private ExecutorService resetExecutor;

public ResetSentinels(SentinelHelloCollectContext context, MetaCache metaCache,
AlertManager alertManager, XpipeNettyClientKeyedObjectPool keyedObjectPool,
XpipeNettyClientKeyedObjectPool keyedObjectPool,
ScheduledExecutorService scheduled, ExecutorService resetExecutor,SentinelManager sentinelManager) {
super(context);
this.metaCache = metaCache;
this.alertManager = alertManager;
this.keyedObjectPool = keyedObjectPool;
this.scheduled = scheduled;
this.resetExecutor = resetExecutor;
Expand Down Expand Up @@ -77,134 +67,87 @@ public void run() {
});
}

/**
 * Decides whether the sentinel monitor should be reset, given the slaves it currently reports.
 *
 * Runs up to three checks in a fixed order and returns the result of the first one whose
 * key is {@code true}: too many keepers, slave belongs to another cluster/shard, redundant instance.
 *
 * @param slaves    slave addresses to examine
 * @param clusterId cluster id passed through to each check
 * @param shardId   shard id passed through to each check
 * @return the pair produced by the first check that fired, or {@code (false, null)} when none did
 */
Pair<Boolean, String> shouldReset(List<HostPort> slaves, String clusterId, String shardId) {
// The keeper check is only run for cluster types that report supportKeeper().
if (context.getInfo().getClusterType().supportKeeper()) {
Pair<Boolean, String> tooManyKeepers = tooManyKeepers(slaves, clusterId, shardId);
if (tooManyKeepers.getKey()) return tooManyKeepers;
}

Pair<Boolean, String> inOtherClusterShard = inOtherClusterShard(slaves, clusterId, shardId);
if (inOtherClusterShard.getKey()) return inOtherClusterShard;

Pair<Boolean, String> unknownInstanceAndIsKeeperOrDead = redundantInstances(slaves, clusterId, shardId);
if (unknownInstanceAndIsKeeperOrDead.getKey()) return unknownInstanceAndIsKeeperOrDead;

// No check fired: no reset, and no reason string.
return new Pair<>(false, null);
}
private Set<HostPort> tooManyKeepers(List<HostPort> slaves) {
if (!context.getInfo().getClusterType().supportKeeper())
return new HashSet<>();

Pair<Boolean, String> tooManyKeepers(List<HostPort> slaves, String clusterId, String shardId) {
Set<HostPort> allKeepers = metaCache.getAllKeepers();
Set<HostPort> keepers = new HashSet<>();
for (HostPort currentSlave : slaves) {

if (allKeepers.contains(currentSlave)) {
keepers.add(currentSlave);
}
}
slaves.removeAll(keepers);
if (keepers.size() > 1 && needReset(keepers))
return new Pair<>(true, String.format("%s,%s, has %d keepers:%s", clusterId, shardId, keepers.size(), keepers));
else
return new Pair<>(false, null);
if (keepers.size() > EXPECTED_KEEPER_COUNT) return keepers;
return new HashSet<>();
}

private boolean needReset(Set<HostPort> keepers) {

Map<HostPort, SlaveRole> keeperRoles = new ConcurrentHashMap<>();
/**
 * Collects the slaves that the meta cache does not map to the given cluster and shard.
 *
 * @param slaves    slave addresses to examine
 * @param clusterId cluster id each slave is expected to map to
 * @param shardId   shard id each slave is expected to map to
 * @return a new set of the slaves that did not match; empty when all match
 */
private Set<HostPort> unknownInstances(List<HostPort> slaves, String clusterId, String shardId){
Set<HostPort> unknownInstances = new HashSet<>();
for (HostPort currentSlave : slaves) {
Pair<String, String> clusterShard = metaCache.findClusterShard(currentSlave);
// A slave is collected when the lookup returns null, or when the returned
// key (compared with clusterId) or value (compared with shardId) differs.
if (clusterShard == null || !ObjectUtils.equals(clusterId, clusterShard.getKey()) || !ObjectUtils.equals(shardId, clusterShard.getValue()))
unknownInstances.add(currentSlave);
}
return unknownInstances;
}

ParallelCommandChain keeperRoleChain = new ParallelCommandChain();
for (HostPort hostPort : keepers) {
private Set<HostPort> connectedToTrueMaster(Set<HostPort> invalidSlaves) {
Map<HostPort, SlaveRole> connectedSlaves = new ConcurrentHashMap<>();

ParallelCommandChain slaveRoleChain = new ParallelCommandChain();
for (HostPort hostPort : invalidSlaves) {
RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(hostPort.getHost(), hostPort.getPort())), scheduled);
roleCommand.future().addListener(future -> {
if (future.isSuccess()) {
Role role = future.get();
if (role instanceof SlaveRole) {
keeperRoles.put(hostPort, (SlaveRole) role);
SlaveRole slaveRole = (SlaveRole) role;
HostPort trueMaster = context.getTrueMasterInfo().getKey();
if (slaveRole.getMasterHost().equals(trueMaster.getHost()) && slaveRole.getMasterPort() == trueMaster.getPort()) {
connectedSlaves.put(hostPort, slaveRole);
}
}
}
});
keeperRoleChain.add(roleCommand);
slaveRoleChain.add(roleCommand);
}

try {
keeperRoleChain.execute().get(1500, TimeUnit.MILLISECONDS);
slaveRoleChain.execute().get(1500, TimeUnit.MILLISECONDS);
} catch (Throwable th) {
logger.warn("[{}-{}+{}]parallel role command to keepers error", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th);
}

if (keeperRoles.isEmpty()) {
logger.warn("[{}-{}+{}]get role of all keepers failed, keeper status unknown", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId());
return false;
}

if (keeperRoles.size() < keepers.size()) {
logger.warn("[{}-{}+{}]get role of keepers:{}, all keepers:{}, some keepers unreachable", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), keeperRoles, keepers);
return false;
logger.warn("[{}-{}+{}]parallel role command to slaves error", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th);
}

Set<HostPort> keeperMasters = keeperRoles.values().stream().map(slaveRole -> new HostPort(slaveRole.getMasterHost(), slaveRole.getMasterPort())).collect(Collectors.toSet());
if (keeperMasters.size() > 1) {
logger.warn("[{}-{}+{}]keeper master not unique:{}, need reset", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), keeperRoles);
return true;
}

return false;
}

Pair<Boolean, String> inOtherClusterShard(List<HostPort> slaves, String clusterId, String shardId) {
for (HostPort currentSlave : slaves) {
Pair<String, String> clusterShard = metaCache.findClusterShard(currentSlave);
if (clusterShard != null) {
if (!ObjectUtils.equals(clusterId, clusterShard.getKey()) || !ObjectUtils.equals(shardId, clusterShard.getValue()))
return new Pair<>(true, String.format("[%s], current:%s,%s, but meta:%s:%s", currentSlave, clusterId, shardId, clusterShard.getKey(), clusterShard.getValue()));
}
}
return new Pair<>(false, null);
return connectedSlaves.keySet();
}


Pair<Boolean, String> redundantInstances(List<HostPort> slaves, String clusterId, String shardId) {
for (HostPort currentSlave : slaves) {
Pair<String, String> clusterShard = metaCache.findClusterShard(currentSlave);
if (clusterShard == null) {
if (redundantInstance(currentSlave))
return new Pair<>(true, String.format("[%s]keeper or master, current:%s,%s, with no cluster shard", currentSlave, clusterId, shardId));
else {
String message = String.format("sentinel monitors redis %s not in xpipe", currentSlave.toString());
alertManager.alert(clusterId, shardId, currentSlave, ALERT_TYPE.SENTINEL_MONITOR_REDUNDANT_REDIS, message);
}
}
private static final int EXPECTED_KEEPER_COUNT = 1;
boolean shouldReset(List<HostPort> slaves, String clusterId, String shardId, String sentinelMonitorName, HostPort sentinelAddr) {
Set<HostPort> toManyKeepers = tooManyKeepers(slaves);
if (shouldResetTooManyKeepers(toManyKeepers)) {
logger.info("[{}-{}+{}][reset]{}, {}, too many keepers: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, toManyKeepers);
return true;
}
return new Pair<>(false, null);
}

boolean redundantInstance(HostPort hostPort) {
AtomicBoolean redundant = new AtomicBoolean(false);
RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(hostPort.getHost(), hostPort.getPort())), scheduled);
try {
Role role = roleCommand.execute().get(1, TimeUnit.SECONDS);
redundant.set(isKeeper(role) || isMaster(role));
logger.debug("[redundantInstance] role: {}", role.getServerRole().name());
} catch (Throwable th) {
redundant.set(inactive(th.getCause()));
logger.warn("[redundantInstance][failed]{}", hostPort, th);
Set<HostPort> unknownSlaves = unknownInstances(slaves, clusterId, shardId);
if (shouldResetUnknownInstances(unknownSlaves)) {
logger.info("[{}-{}+{}][reset]{}, {}, unknown slaves: {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, unknownSlaves);
return true;
}

return redundant.get();
}

boolean isKeeper(Role role) {
return Server.SERVER_ROLE.KEEPER.equals(role.getServerRole());
return false;
}

boolean isMaster(Role role) {
return Server.SERVER_ROLE.MASTER.equals(role.getServerRole());
/**
 * Returns {@code true} when the given keeper set is non-empty and the set returned by
 * {@code connectedToTrueMaster} for it has at most {@code EXPECTED_KEEPER_COUNT} entries.
 * An empty input short-circuits to {@code false} without calling {@code connectedToTrueMaster}.
 */
private boolean shouldResetTooManyKeepers(Set<HostPort> toManyKeepers) {
return !toManyKeepers.isEmpty() && connectedToTrueMaster(toManyKeepers).size() <= EXPECTED_KEEPER_COUNT;
}

boolean inactive(Throwable th) {
return th instanceof CommandExecutionException ||
th instanceof CommandTimeoutException ||
th instanceof SocketException;
/**
 * Returns {@code true} when the given set of unknown slaves is non-empty and
 * {@code connectedToTrueMaster} returns an empty set for it.
 * An empty input short-circuits to {@code false} without calling {@code connectedToTrueMaster}.
 */
private boolean shouldResetUnknownInstances(Set<HostPort> unknownSlaves) {
return !unknownSlaves.isEmpty() && connectedToTrueMaster(unknownSlaves).isEmpty();
}

class ResetSentinel extends AbstractCommand<Void> {
Expand Down Expand Up @@ -239,11 +182,8 @@ protected void doExecute() throws Throwable {
if (slaves.isEmpty())
return;

Pair<Boolean, String> shouldResetAndReason = shouldReset(slaves, clusterId, shardId);

if (shouldResetAndReason.getKey()) {
logger.info("[{}-{}+{}][reset]{}, {}, {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, shouldResetAndReason.getValue());
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Reset", String.format("%s,%s", sentinelAddr, shouldResetAndReason.getValue()));
if (shouldReset(slaves, clusterId, shardId, sentinelMonitorName, sentinelAddr)) {
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.Reset", sentinelMonitorName);
sentinelManager.reset(sentinel, sentinelMonitorName).execute().getOrHandle(1000, TimeUnit.MILLISECONDS, throwable -> {
logger.error("[{}-{}+{}][reset]{}, {}", LOG_TITLE, clusterId, shardId, sentinelMonitorName, sentinelAddr, throwable);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ public Pair<HostPort, List<HostPort>> getTrueMasterInfo() {
return trueMasterInfo;
}

public void setTrueMasterInfo(Pair<HostPort, List<HostPort>> trueMasterInfo) {
/**
 * Stores the given true-master info on this context.
 *
 * @param trueMasterInfo the pair to store; it is assigned as-is, without copying
 * @return this context, so calls can be chained
 */
public SentinelHelloCollectContext setTrueMasterInfo(Pair<HostPort, List<HostPort>> trueMasterInfo) {
this.trueMasterInfo = trueMasterInfo;
return this;
}

public Set<SentinelHello> getToDelete() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ protected void doExecute() throws Throwable {
} else {
ParallelCommandChain setChain = new ParallelCommandChain(MoreExecutors.directExecutor(), false);
context.getToAdd().forEach((hello) -> {
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.SentinelSet", hello.toString());
CatEventMonitor.DEFAULT.logEvent("Sentinel.Hello.Collector.SentinelSet", hello.getMonitorName());
HostPort sentinelAddr = hello.getSentinelAddr();
Sentinel sentinel = new Sentinel(sentinelAddr.toString(), sentinelAddr.getHost(), sentinelAddr.getPort());
setChain.add(sentinelManager.sentinelSet(sentinel, context.getSentinelMonitorName(), sentinelConfigs));
Expand Down
Loading

0 comments on commit e0eea9e

Please sign in to comment.