Skip to content

Commit

Permalink
Merge pull request #884 from ctripcorp/feature/instance_check
Browse files Browse the repository at this point in the history
check health instances missing
  • Loading branch information
LanternLee authored Sep 19, 2024
2 parents 8f2c87b + e3b772e commit b42baac
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.entity.RedisMeta;
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;

import java.util.List;

Expand Down Expand Up @@ -52,4 +53,6 @@ public interface HealthCheckInstanceManager {

List<ClusterHealthCheckInstance> getAllClusterInstance();

/**
 * Checks whether the health check instances currently held by this manager match the
 * ones expected from the given meta.
 *
 * <p>NOTE(review): despite the name, the implementation in
 * {@code DefaultHealthCheckInstanceManager} returns {@code true} when nothing is missing
 * (and also when {@code xpipeMeta} is {@code null}), and {@code false} when a mismatch is
 * found.
 *
 * @param xpipeMeta meta describing the expected dcs, clusters, shards and redises
 * @return {@code true} when no mismatch was detected, {@code false} otherwise
 */
boolean checkInstancesMiss(XpipeMeta xpipeMeta);

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.ctrip.xpipe.redis.checker.healthcheck.impl;

import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.api.monitor.EventMonitor;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.healthcheck.*;
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.entity.RedisMeta;
import com.ctrip.xpipe.redis.core.entity.*;
import com.ctrip.xpipe.utils.MapUtils;
import com.ctrip.xpipe.utils.StringUtil;
import com.google.common.collect.Lists;
Expand All @@ -14,7 +15,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentMap;

/**
Expand All @@ -37,6 +38,8 @@ public class DefaultHealthCheckInstanceManager implements HealthCheckInstanceMan

private ConcurrentMap<HostPort, RedisHealthCheckInstance> redisInstanceForPingAction = Maps.newConcurrentMap();

private static final String ALERT_TYPE = "HealthCheckInstance";

@Autowired
private HealthCheckInstanceFactory instanceFactory;

Expand Down Expand Up @@ -179,4 +182,88 @@ public List<ClusterHealthCheckInstance> getAllClusterInstance() {
return Lists.newLinkedList(clusterHealthCheckerInstances.values());
}

/**
 * Compares the health check instances currently registered in this manager with the ones
 * expected from {@code xpipeMeta}, as seen from the current data center
 * ({@code FoundationService.DEFAULT.getDataCenter()}).
 *
 * <p>Expectation rules, applied to every cluster entry of every dc in the meta:
 * <ul>
 *   <li>{@code ONE_WAY}: when the cluster's active dc is the current dc, the cluster and the
 *       redises of that entry are expected as regular instances; otherwise, when the entry
 *       belongs to the current dc and the active dc lies in a different zone, the redises
 *       are expected as ping-only (cross region) instances; otherwise the entry is skipped.</li>
 *   <li>types supporting a single active dc, and {@code CROSS_DC}: expected only when the
 *       active dc is the current dc.</li>
 *   <li>any other type: expected when the current dc is listed in the cluster's
 *       {@code dcs} attribute.</li>
 * </ul>
 *
 * <p>A dc that cannot be found in the meta (the current dc or a cluster's active dc) is
 * treated as "not cross region" instead of raising a {@link NullPointerException}, so a
 * partially loaded meta is reported as a mismatch rather than aborting the check.
 *
 * <p>Every mismatch category is reported through {@code EventMonitor} under
 * {@code ALERT_TYPE}; a {@code noMissing} event is reported when everything matches.
 *
 * @param xpipeMeta meta to derive the expected instances from; {@code null} is treated as
 *                  "nothing to compare" and yields {@code true}
 * @return {@code true} when clusters, instances and ping instances all equal the expected
 *         sets (note: {@code true} means nothing is missing), {@code false} otherwise
 */
@Override
public boolean checkInstancesMiss(XpipeMeta xpipeMeta) {
    if (null == xpipeMeta) return true;

    logger.debug("[checkInstancesMiss][begin]");
    Set<String> currentClusters = clusterHealthCheckerInstances.keySet();
    Set<HostPort> currentInstances = instances.keySet();
    Set<HostPort> currentPingInstances = redisInstanceForPingAction.keySet();

    Set<String> expectClusters = new HashSet<>();
    Set<HostPort> expectInstances = new HashSet<>();
    Set<HostPort> expectPingInstances = new HashSet<>();

    String currentDc = FoundationService.DEFAULT.getDataCenter();
    // The current dc may be absent from the meta; keep the zone null in that case
    // instead of dereferencing a missing DcMeta.
    DcMeta currentDcMeta = null == currentDc ? null : xpipeMeta.getDcs().get(currentDc);
    String currentZone = null == currentDcMeta ? null : currentDcMeta.getZone();
    if (null == currentDcMeta) {
        logger.warn("[checkInstancesMiss] current dc {} not found in meta", currentDc);
    }

    for (Map.Entry<String, DcMeta> entry : xpipeMeta.getDcs().entrySet()) {
        String dcId = entry.getKey();
        DcMeta dcMeta = entry.getValue();
        for (ClusterMeta clusterMeta : dcMeta.getClusters().values()) {
            ClusterType clusterType = ClusterType.lookup(clusterMeta.getType());
            String clusterId = clusterMeta.getId().toLowerCase();
            boolean addInstanceForPing = false;
            if (clusterType.equals(ClusterType.ONE_WAY)) {
                String activeDc = clusterMeta.getActiveDc();
                if (isSameDc(activeDc, currentDc)) {
                    expectClusters.add(clusterId);
                } else if (dcId.equalsIgnoreCase(currentDc) && isCrossRegion(xpipeMeta, currentZone, activeDc)) {
                    addInstanceForPing = true;
                } else {
                    continue;
                }
            } else if (clusterType.supportSingleActiveDC() || clusterType.equals(ClusterType.CROSS_DC)) {
                if (isSameDc(clusterMeta.getActiveDc(), currentDc)) {
                    expectClusters.add(clusterId);
                } else {
                    continue;
                }
            } else if (!expectClusters.contains(clusterId)) {
                // Multi-dc types: expected when the current dc is one of the listed dcs.
                // When the cluster was already accepted from another dc entry this branch
                // is skipped and the redises of this entry are added as well.
                String clusterDcs = clusterMeta.getDcs();
                if (null != clusterDcs) {
                    for (String dc : clusterDcs.split("\\s*,\\s*")) {
                        if (dc.equalsIgnoreCase(currentDc)) expectClusters.add(clusterId);
                    }
                }

                if (!expectClusters.contains(clusterId)) continue;
            }

            for (ShardMeta shardMeta : clusterMeta.getShards().values()) {
                for (RedisMeta redisMeta : shardMeta.getRedises()) {
                    HostPort redis = new HostPort(redisMeta.getIp(), redisMeta.getPort());
                    if (addInstanceForPing) {
                        expectPingInstances.add(redis);
                    } else {
                        expectInstances.add(redis);
                    }
                }
            }
        }
    }

    boolean noMissing = true;
    if (!currentClusters.equals(expectClusters)) {
        noMissing = false;
        logger.debug("[checkInstancesMiss][cluster][current] {}", currentClusters);
        logger.debug("[checkInstancesMiss][cluster][expect] {}", expectClusters);
        EventMonitor.DEFAULT.logEvent(ALERT_TYPE, "clusterMissing");
    }
    if (!currentInstances.equals(expectInstances)) {
        noMissing = false;
        logger.debug("[checkInstancesMiss][instance][current] {}", currentInstances);
        logger.debug("[checkInstancesMiss][instance][expect] {}", expectInstances);
        EventMonitor.DEFAULT.logEvent(ALERT_TYPE, "instanceMissing");
    }
    if (!currentPingInstances.equals(expectPingInstances)) {
        noMissing = false;
        logger.debug("[checkInstancesMiss][CrossRegionInstance][current] {}", currentPingInstances);
        logger.debug("[checkInstancesMiss][CrossRegionInstance][expect] {}", expectPingInstances);
        EventMonitor.DEFAULT.logEvent(ALERT_TYPE, "CrossRegionInstanceMissing");
    }
    if (noMissing) {
        EventMonitor.DEFAULT.logEvent(ALERT_TYPE, "noMissing");
    }

    return noMissing;
}

/** Null-safe, case-insensitive dc id comparison; a {@code null} dc never matches. */
private static boolean isSameDc(String dc, String otherDc) {
    return null != dc && dc.equalsIgnoreCase(otherDc);
}

/**
 * Tells whether {@code otherDc} lies in a zone different from {@code currentZone}.
 * Returns {@code false} when the current zone is unknown or {@code otherDc} is not in the meta.
 */
private static boolean isCrossRegion(XpipeMeta xpipeMeta, String currentZone, String otherDc) {
    if (null == currentZone || null == otherDc) return false;
    DcMeta otherDcMeta = xpipeMeta.getDcs().get(otherDc);
    return null != otherDcMeta && !currentZone.equalsIgnoreCase(otherDcMeta.getZone());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.ctrip.xpipe.redis.core.meta.KeeperContainerDetailInfo;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.StringUtil;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -116,8 +117,13 @@ protected void doStop() throws Exception {
super.doStop();
}

/**
 * Replaces the {@link HealthCheckInstanceManager} held by this object.
 * Marked {@code @VisibleForTesting}: intended to let tests supply their own manager.
 *
 * @param instanceManager the manager to store in the {@code instanceManager} field
 */
@VisibleForTesting
protected void setInstanceManager(HealthCheckInstanceManager instanceManager) {
this.instanceManager = instanceManager;
}

void generateHealthCheckInstances() {
@VisibleForTesting
protected void generateHealthCheckInstances() {
XpipeMeta meta = metaCache.getXpipeMeta();

for(DcMeta dcMeta : meta.getDcs().values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void start() {
@Override
protected void doRun() throws Exception {
checkDcMetaChange();
instanceManager.checkInstancesMiss(metaCache.getXpipeMeta());
}
}, interval * 2, interval, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import com.ctrip.xpipe.redis.checker.healthcheck.factory.DefaultHealthCheckEndpointFactoryTest;
import com.ctrip.xpipe.redis.checker.healthcheck.factory.DefaultHealthCheckInstanceFactoryTest;
import com.ctrip.xpipe.redis.checker.healthcheck.factory.HealthCheckEndpointFactoryTest;
import com.ctrip.xpipe.redis.checker.healthcheck.impl.DefaultHealthCheckInstanceManagerTest;
import com.ctrip.xpipe.redis.checker.healthcheck.impl.DefaultHealthCheckerMockTest;
import com.ctrip.xpipe.redis.checker.healthcheck.impl.DefaultRouteChooserTest;
import com.ctrip.xpipe.redis.checker.healthcheck.meta.DefaultDcMetaChangeManagerTest;
Expand Down Expand Up @@ -217,6 +218,8 @@

DefaultHealthCheckConfigTest.class,
DefaultRouteChooserTest.class,

DefaultHealthCheckInstanceManagerTest.class
})
public class AllTests {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.ctrip.xpipe.redis.checker.healthcheck.impl;

import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.redis.checker.AbstractCheckerTest;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.KeeperHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
import com.ctrip.xpipe.redis.core.entity.*;
import com.ctrip.xpipe.redis.core.meta.CurrentDcAllMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Collections;

/**
 * Tests for {@link DefaultHealthCheckInstanceManager#checkInstancesMiss(XpipeMeta)}:
 * instances are first generated by a {@link DefaultHealthChecker} backed by mocks and the
 * meta file returned from {@link #getXpipeMetaConfigFile()}, then compared with a meta.
 *
 * @author lishanglin
 * date 2024/9/14
 */
@RunWith(MockitoJUnitRunner.Silent.class)
public class DefaultHealthCheckInstanceManagerTest extends AbstractCheckerTest {

    @InjectMocks
    private DefaultHealthChecker healthChecker;

    @InjectMocks
    private DefaultHealthCheckInstanceManager healthCheckInstanceManager;

    @Mock
    private MetaCache metaCache;

    @Mock
    private CheckerConfig checkerConfig;

    @Mock
    private CurrentDcAllMeta currentDcAllMeta;

    @Mock
    private HealthCheckInstanceFactory instanceFactory;

    @Mock
    private RedisHealthCheckInstance mockCheckInstance;

    @Mock
    private KeeperHealthCheckInstance mockKeeperInstance;

    @Mock
    private ClusterHealthCheckInstance mockClusterInstance;

    /** Wires the mocks together and hands the manager under test to the health checker. */
    @Before
    public void setupDefaultHealthCheckInstanceManagerTest() {
        stubInstanceFactory();
        stubConfigAndMeta();
        healthChecker.setInstanceManager(healthCheckInstanceManager);
    }

    /** Every factory method answers with the matching mocked instance. */
    private void stubInstanceFactory() {
        Mockito.when(instanceFactory.create(Mockito.any(RedisMeta.class)))
                .thenReturn(mockCheckInstance);
        Mockito.when(instanceFactory.create(Mockito.any(KeeperMeta.class)))
                .thenReturn(mockKeeperInstance);
        Mockito.when(instanceFactory.create(Mockito.any(ClusterMeta.class)))
                .thenReturn(mockClusterInstance);
        Mockito.when(instanceFactory.createRedisInstanceForAssignedAction(Mockito.any()))
                .thenReturn(mockCheckInstance);
        Mockito.when(instanceFactory.getOrCreateRedisInstanceForPsubPingAction(Mockito.any()))
                .thenReturn(mockCheckInstance);
    }

    /** No dc is ignored; meta lookups are served from the test meta file. */
    private void stubConfigAndMeta() {
        Mockito.when(checkerConfig.getIgnoredHealthCheckDc())
                .thenReturn(Collections.emptySet());

        Mockito.when(currentDcAllMeta.getCurrentDcAllMeta())
                .thenReturn(getXpipeMeta().findDc(FoundationService.DEFAULT.getDataCenter()));

        Mockito.when(metaCache.getXpipeMeta()).thenReturn(getXpipeMeta());

        // Two dcs are cross region when both are known and their zones differ.
        Mockito.doAnswer(invocation -> {
            String localDcId = invocation.getArgument(0, String.class);
            String remoteDcId = invocation.getArgument(1, String.class);
            DcMeta localDc = getXpipeMeta().findDc(localDcId);
            DcMeta remoteDc = getXpipeMeta().findDc(remoteDcId);
            boolean bothKnown = null != localDc && null != remoteDc;
            return bothKnown && !localDc.getZone().equalsIgnoreCase(remoteDc.getZone());
        }).when(metaCache).isCrossRegion(Mockito.anyString(), Mockito.anyString());
    }

    /** Instances generated from the meta must match that same meta. */
    @Test
    public void testInstanceMatch() {
        healthChecker.generateHealthCheckInstances();

        boolean nothingMissing = healthCheckInstanceManager.checkInstancesMiss(getXpipeMeta());

        Assert.assertTrue(nothingMissing);
    }

    /** Dropping a cluster from the compared meta must be reported as a mismatch. */
    @Test
    public void testInstanceDisMatch() {
        healthChecker.generateHealthCheckInstances();

        XpipeMeta alteredMeta = getXpipeMeta();
        DcMeta jqDc = alteredMeta.getDcs().get("jq");
        jqDc.getClusters().remove("bbz_qmq_idempotent_fra_default");

        boolean nothingMissing = healthCheckInstanceManager.checkInstancesMiss(alteredMeta);

        Assert.assertFalse(nothingMissing);
    }

    @Override
    protected String getXpipeMetaConfigFile() {
        return "multi-type-health-instances.xml";
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?xml version='1.0' encoding='utf-8' ?>
<xpipe>
<dc id="fra-aws" zone="fra">
<cluster id="BBZ_artemis_registry_sgp_ali" db-id="16056" active-dc="fra-aws" backup-dcs="" downstream-dcs="" dc-group-name="fra-aws" org-id="18" type="SINGLE_DC" admin-emails="[email protected]" clusterDesignatedRouteIds="">
<shard id="BBZ_artemis_registry_sgp_ali_1" db-id="117472">
<redis id="unknown" ip="10.147.78.75" port="6379" master=""/>
<redis id="unknown" ip="10.147.138.206" port="6379" master="0.0.0.0:0"/>
</shard>
</cluster>
<cluster id="IBU_quic_cache" db-id="5809" active-dc="jq" backup-dcs="fra-aws,oy" downstream-dcs="" dc-group-type="DR_MASTER" dc-group-name="fra-aws" org-id="29" type="ONE_WAY" admin-emails="[email protected]" clusterDesignatedRouteIds="">
<shard id="IBU_quic_cache_v202110151745_1" db-id="42285">
<keeper id="218d0a87364fe550c0c4cbc0c488c0c5e9eee68d" ip="10.147.241.56" port="6523" active="false" keeperContainerId="1697"/>
<keeper id="218d0a87364fe550c0c4cbc0c488c0c5e9eee68d" ip="10.147.242.3" port="6512" active="false" keeperContainerId="1698"/>
<redis id="unknown" ip="10.147.131.172" port="6379" master="0.0.0.0:0"/>
<redis id="unknown" ip="10.147.84.69" port="6379" master="0.0.0.0:0"/>
</shard>
</cluster>
<cluster id="bbz_qmq_idempotent_fra_default" db-id="11496" active-dc="fra-aws" backup-dcs="jq,oy" downstream-dcs="" dc-group-type="DR_MASTER" dc-group-name="fra-aws" org-id="18" type="ONE_WAY" admin-emails="[email protected]" clusterDesignatedRouteIds="">
<shard id="bbz_qmq_idempotent_fra_default_1" db-id="92519">
<keeper id="8c57a9ca5608d7791714f765a97327e0393e6bae" ip="10.147.242.71" port="6401" active="false" keeperContainerId="1247"/>
<keeper id="8c57a9ca5608d7791714f765a97327e0393e6bae" ip="10.147.241.157" port="6401" active="false" keeperContainerId="1249"/>
<redis id="unknown" ip="10.147.85.104" port="6379" master="0.0.0.0:0"/>
<redis id="unknown" ip="10.147.208.149" port="6379" master="0.0.0.0:0"/>
</shard>
</cluster>
<cluster id="BBZ_ares_meta_datum" db-id="8186" downstream-dcs="" dc-group-name="fra-aws" org-id="18" type="LOCAL_DC" admin-emails="[email protected]" dcs="jq,oy,fra-aws" clusterDesignatedRouteIds="">
<shard id="BBZ_ares_meta_datum_1" db-id="62096">
<redis id="unknown" ip="10.147.85.79" port="6379" master="0.0.0.0:0"/>
<redis id="unknown" ip="10.147.139.139" port="6379" master=""/>
</shard>
</cluster>
<cluster id="RSK_cross_site_cache" db-id="13723" downstream-dcs="" dc-group-name="fra-aws" org-id="28" type="BI_DIRECTION" admin-emails="[email protected]" dcs="fra-aws,jq,oy" activeRedisCheckRules="1" clusterDesignatedRouteIds="">
<shard id="RSK_cross_site_cache_2" db-id="119342">
<redis id="unknown" ip="10.147.89.249" port="6379" master=""/>
<redis id="unknown" ip="10.147.143.156" port="6379" master="0.0.0.0:0"/>
</shard>
<shard id="RSK_cross_site_cache_1" db-id="119343" sentinelId="420">
<redis id="unknown" ip="10.147.143.171" port="6379" master=""/>
<redis id="unknown" ip="10.147.89.243" port="6379" master="0.0.0.0:0"/>
</shard>
</cluster>
</dc>
<dc id="jq" zone="jq">
<cluster id="bbz_qmq_idempotent_fra_default" db-id="11496" active-dc="fra-aws" backup-dcs="jq,oy" downstream-dcs="" dc-group-type="DR_MASTER" dc-group-name="jq" org-id="18" type="ONE_WAY" admin-emails="[email protected]" clusterDesignatedRouteIds="">
<shard id="bbz_qmq_idempotent_fra_default_1" db-id="92519">
<keeper id="e86f24749b8bc7a916e23f2896095dc3f216445d" ip="10.61.38.73" port="6471" active="false" keeperContainerId="1408"/>
<keeper id="e86f24749b8bc7a916e23f2896095dc3f216445d" ip="10.61.38.72" port="6488" active="false" keeperContainerId="1409"/>
<redis id="unknown" ip="10.43.49.173" port="6379" master="0.0.0.0:0"/>
<redis id="unknown" ip="10.56.204.175" port="6379" master="0.0.0.0:0"/>
</shard>
</cluster>
<cluster id="IBU_quic_cache" db-id="5809" active-dc="jq" backup-dcs="fra-aws,oy" downstream-dcs="" dc-group-type="DR_MASTER" dc-group-name="jq" org-id="29" type="ONE_WAY" admin-emails="[email protected]" clusterDesignatedRouteIds="">
<shard id="IBU_quic_cache_v202110151745_1" db-id="42285">
<keeper id="905866aaa8ecf991b49c756c7020f969fc6f2d0b" ip="10.60.6.123" port="6788" active="false" keeperContainerId="1403"/>
<keeper id="905866aaa8ecf991b49c756c7020f969fc6f2d0b" ip="10.61.4.87" port="6480" active="false" keeperContainerId="1658"/>
<redis id="unknown" ip="10.97.136.93" port="6379" master=""/>
<redis id="unknown" ip="10.141.145.48" port="6379" master="0.0.0.0:0"/>
</shard>
</cluster>
<cluster id="RSK_cross_site_cache" db-id="13723" downstream-dcs="" dc-group-name="jq" org-id="28" type="BI_DIRECTION" admin-emails="[email protected]" dcs="fra-aws,jq,oy" activeRedisCheckRules="1" clusterDesignatedRouteIds="">
<shard id="RSK_cross_site_cache_2" db-id="119342" sentinelId="390">
<redis id="unknown" ip="10.61.173.42" port="6379" master="0.0.0.0:0"/>
<redis id="unknown" ip="10.62.68.145" port="6379" master=""/>
</shard>
<shard id="RSK_cross_site_cache_1" db-id="119343" sentinelId="390">
<redis id="unknown" ip="10.58.56.107" port="6379" master="0.0.0.0:0"/>
<redis id="unknown" ip="10.62.114.205" port="6379" master=""/>
</shard>
</cluster>
<cluster id="BBZ_ares_meta_datum" db-id="8186" downstream-dcs="" dc-group-name="jq" org-id="18" type="LOCAL_DC" admin-emails="[email protected]" dcs="jq,oy,fra-aws" clusterDesignatedRouteIds="">
<shard id="BBZ_ares_meta_datum_1" db-id="62096">
<redis id="unknown" ip="10.43.181.159" port="6379" master="0.0.0.0:0"/>
<redis id="unknown" ip="10.43.182.103" port="6379" master=""/>
</shard>
</cluster>
<cluster id="FLT_Trine_Draft_Queues" db-id="11045" active-dc="jq" backup-dcs="" downstream-dcs="" dc-group-name="jq" org-id="18" type="SINGLE_DC" admin-emails="[email protected]" clusterDesignatedRouteIds="">
<shard id="FLT_Trine_Draft_Queues_v20230713_1" db-id="88649">
<redis id="unknown" ip="10.58.70.115" port="6379" master="0.0.0.0:0"/>
<redis id="unknown" ip="10.96.94.245" port="6379" master=""/>
</shard>
</cluster>
<cluster id="j_kang_redis3" db-id="14807" active-dc="jq" downstream-dcs="" dc-group-name="jq" org-id="2" type="CROSS_DC" admin-emails="[email protected]" dcs="jq" clusterDesignatedRouteIds="">
<shard id="j_kang_redis3_1" db-id="111300" sentinelId="138">
<redis id="unknown" ip="10.58.107.181" port="6379" master=""/>
<redis id="unknown" ip="10.58.91.145" port="6379" master="0.0.0.0:0"/>
</shard>
</cluster>
</dc>
</xpipe>
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,14 @@ protected List<ALERT_TYPE> alertTypes() {
return Collections.emptyList();
}

/**
 * Returns the parent's interval, raised to 60000 when the parent's value is smaller.
 *
 * @return the larger of 60000 and {@code super.getIntervalMilli()}
 */
@Override
protected long getIntervalMilli() {
    final long inherited = super.getIntervalMilli();
    return inherited < 60000 ? 60000 : inherited;
}

/**
 * Decides whether the action should run.
 *
 * @return {@code true} only when {@code config.autoSetKeeperSyncLimit()} is {@code true}
 *         and the parent's {@code shouldDoAction()} also allows it; the parent is not
 *         consulted when the config flag is off
 */
@Override
protected boolean shouldDoAction() {
    // Only the post-change statement is kept: the earlier
    // "return config.autoSetKeeperSyncLimit();" made this one unreachable.
    return config.autoSetKeeperSyncLimit() && super.shouldDoAction();
}

}

0 comments on commit b42baac

Please sign in to comment.