check invalid keepers before reset (#795)
* check invalid keepers before reset

* rename method

* not reset if keeper unreachable

---------

Co-authored-by: llj李龙姣 <[email protected]>
lilongjiao and llj李龙姣 authored May 11, 2024
1 parent d59110d commit 5beb801
Showing 2 changed files with 95 additions and 12 deletions.
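
In short: before this commit, finding more than one keeper among a shard's slaves was enough to trigger a sentinel reset. The new guard first asks every keeper for its ROLE and resets only when all keepers answered and they disagree about which master they replicate from. A minimal sketch of that decision rule, with plain types standing in for XPipe's HostPort and SlaveRole (the NeedResetSketch class below is illustrative, not part of the commit):

    import java.util.*;

    final class NeedResetSketch {
        // keeperToMaster holds one entry per keeper whose ROLE command succeeded,
        // mapping that keeper to the master it reports.
        static boolean needReset(Map<String, String> keeperToMaster, int totalKeepers) {
            if (keeperToMaster.isEmpty()) return false;             // every ROLE call failed: status unknown, do not reset
            if (keeperToMaster.size() < totalKeepers) return false; // some keeper unreachable: do not reset
            return new HashSet<>(keeperToMaster.values()).size() > 1; // keepers name different masters: reset
        }
    }

The diff of the checker follows; the second hunk contains the real needReset.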
@@ -18,19 +18,15 @@
 import com.ctrip.xpipe.redis.core.protocal.cmd.RoleCommand;
 import com.ctrip.xpipe.redis.core.protocal.pojo.Role;
 import com.ctrip.xpipe.redis.core.protocal.pojo.Sentinel;
+import com.ctrip.xpipe.redis.core.protocal.pojo.SlaveRole;
 import com.ctrip.xpipe.tuple.Pair;
 import com.ctrip.xpipe.utils.ObjectUtils;
 
 import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE;
 
@@ -106,12 +102,55 @@ Pair<Boolean, String> tooManyKeepers(List<HostPort> slaves, String clusterId, String shardId) {
             }
         }
         slaves.removeAll(keepers);
-        if (keepers.size() > 1)
+        if (keepers.size() > 1 && needReset(keepers))
             return new Pair<>(true, String.format("%s,%s, has %d keepers:%s", clusterId, shardId, keepers.size(), keepers));
         else
             return new Pair<>(false, null);
     }
 
+    private boolean needReset(Set<HostPort> keepers) {
+
+        Map<HostPort, SlaveRole> keeperRoles = new ConcurrentHashMap<>();
+
+        ParallelCommandChain keeperRoleChain = new ParallelCommandChain();
+        for (HostPort hostPort : keepers) {
+            RoleCommand roleCommand = new RoleCommand(keyedObjectPool.getKeyPool(new DefaultEndPoint(hostPort.getHost(), hostPort.getPort())), scheduled);
+            roleCommand.future().addListener(future -> {
+                if (future.isSuccess()) {
+                    Role role = future.get();
+                    if (role instanceof SlaveRole) {
+                        keeperRoles.put(hostPort, (SlaveRole) role);
+                    }
+                }
+            });
+            keeperRoleChain.add(roleCommand);
+        }
+
+        try {
+            keeperRoleChain.execute().get(1500, TimeUnit.MILLISECONDS);
+        } catch (Throwable th) {
+            logger.warn("[{}-{}+{}]parallel role command to keepers error", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), th);
+        }
+
+        if (keeperRoles.isEmpty()) {
+            logger.warn("[{}-{}+{}]get role of all keepers failed, keeper status unknown", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId());
+            return false;
+        }
+
+        if (keeperRoles.size() < keepers.size()) {
+            logger.warn("[{}-{}+{}]get role of keepers:{}, all keepers:{}, some keepers unreachable", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), keeperRoles, keepers);
+            return false;
+        }
+
+        Set<HostPort> keeperMasters = keeperRoles.values().stream().map(slaveRole -> new HostPort(slaveRole.getMasterHost(), slaveRole.getMasterPort())).collect(Collectors.toSet());
+        if (keeperMasters.size() > 1) {
+            logger.warn("[{}-{}+{}]keeper master not unique:{}, need reset", LOG_TITLE, context.getInfo().getClusterId(), context.getInfo().getShardId(), keeperRoles);
+            return true;
+        }
+
+        return false;
+    }
+
     Pair<Boolean, String> inOtherClusterShard(List<HostPort> slaves, String clusterId, String shardId) {
         for (HostPort currentSlave : slaves) {
             Pair<String, String> clusterShard = metaCache.findClusterShard(currentSlave);
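needReset above fans the ROLE commands out through a ParallelCommandChain and waits at most 1500 ms; whatever answered in time lands in keeperRoles, and a missing entry is read as an unreachable keeper. The same collect-until-deadline pattern in plain java.util.concurrent terms (an illustrative stand-in with a hypothetical queryRole helper, not XPipe's actual API):

    import java.util.*;
    import java.util.concurrent.*;

    final class BoundedFanOut {
        // Query every keeper concurrently and keep only the answers that arrive
        // before the deadline; callers treat missing keys as unreachable keepers.
        static Map<String, String> collectRoles(Set<String> keepers, long timeoutMs)
                throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, keepers.size()));
            Map<String, String> roles = new ConcurrentHashMap<>();
            for (String keeper : keepers) {
                pool.execute(() -> roles.put(keeper, queryRole(keeper))); // stand-in for RoleCommand
            }
            pool.shutdown();
            pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS); // bounded wait, like execute().get(1500, MILLISECONDS)
            return roles; // possibly partial: slow or dead keepers are simply absent
        }

        private static String queryRole(String keeper) {
            return "keeper"; // hypothetical placeholder for a real ROLE round-trip
        }
    }

The second changed file, the accompanying test, follows.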
@@ -52,19 +52,63 @@ public void init() throws Exception {
     }
 
     @Test
-    public void testOneWayReset() throws Exception {
+    public void testTooManyKeepers() throws Exception {
         RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort());
         resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()));
 
         // sentinelManager.slaves
-        when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)));
+        when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001), new HostPort(LOCAL_HOST, 8002)));
+
+        // 1. command failed
         Pair<Boolean, String> shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard");
+        Assert.assertFalse(shouldResetAndReason.getKey());
+
+        // 2. some keepers unreachable
+        Server activeKeeper0 = startServer(8000, "*5\r\n"
+                + "$6\r\nkeeper\r\n"
+                + "$9\r\nlocalhost\r\n"
+                + ":6379\r\n"
+                + "$9\r\nconnected\r\n"
+                + ":477\r\n");
+        shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard");
+        Assert.assertFalse(shouldResetAndReason.getKey());
+
+        // 3. keeper master not unique
+        Server activeKeeper1 = startServer(8001, "*5\r\n"
+                + "$6\r\nkeeper\r\n"
+                + "$10\r\nlocalhost2\r\n"
+                + ":6379\r\n"
+                + "$9\r\nconnected\r\n"
+                + ":477\r\n");
+        shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)), "cluster", "shard");
         Assert.assertTrue(shouldResetAndReason.getKey());
         Assert.assertTrue(shouldResetAndReason.getValue().contains("has 2 keepers"));
+
+        // 4. keeper master unique
+        activeKeeper1.stop();
+        Server activeKeeper2 = startServer(8002, "*5\r\n"
+                + "$6\r\nkeeper\r\n"
+                + "$9\r\nlocalhost\r\n"
+                + ":6379\r\n"
+                + "$9\r\nconnected\r\n"
+                + ":477\r\n");
+        shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8002)), "cluster", "shard");
+        Assert.assertFalse(shouldResetAndReason.getKey());
+        activeKeeper2.stop();
+        activeKeeper0.stop();
+    }
+
+    @Test
+    public void testOneWayReset() throws Exception {
+        RedisHealthCheckInstance instance = newRandomRedisHealthCheckInstance("currentDc", "activeDc", randomPort());
+        resetSentinels.setContext(new SentinelHelloCollectContext().setInfo(instance.getCheckInfo()));
+
+        // sentinelManager.slaves
+        when(metaCache.getAllKeepers()).thenReturn(Sets.newHashSet(new HostPort(LOCAL_HOST, 8000), new HostPort(LOCAL_HOST, 8001)));
 
         HostPort wrongSlave = new HostPort("otherClusterShardSlave", 6379);
         when(metaCache.findClusterShard(wrongSlave)).thenReturn(new Pair<>("otherCluster", "otherShard"));
-        shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), wrongSlave), "cluster", "shard");
+        Pair<Boolean, String> shouldResetAndReason = resetSentinels.shouldReset(Lists.newArrayList(new HostPort(LOCAL_HOST, 8000), wrongSlave), "cluster", "shard");
         Assert.assertTrue(shouldResetAndReason.getKey());
         Assert.assertTrue(shouldResetAndReason.getValue().contains("but meta:otherCluster:otherShard"));
 
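A note on the wire format in those startServer stubs: each stub answers the ROLE command with a RESP array of five elements, shaped like Redis's reply for a replica except that the role name is keeper. Decoded field by field (values from the first stub; the RoleReplyBreakdown class is annotation only, not part of the commit):

    final class RoleReplyBreakdown {
        static final String ROLE_REPLY =
                "*5\r\n"               // array header: five elements follow
              + "$6\r\nkeeper\r\n"     // bulk string (6 bytes): role name
              + "$9\r\nlocalhost\r\n"  // bulk string (9 bytes): master host the keeper replicates from
              + ":6379\r\n"            // integer: master port
              + "$9\r\nconnected\r\n"  // bulk string (9 bytes): replication link state
              + ":477\r\n";            // integer: replication offset
    }

The three stubs differ only in the master host (localhost vs localhost2), which is exactly what needReset keys on: keepers 8000 and 8001 report different masters, so the shard is reset; keepers 8000 and 8002 agree, so it is not.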
