Skip to content

Commit

Permalink
beacon checker
Browse files Browse the repository at this point in the history
  • Loading branch information
qifanwang committed Nov 5, 2024
1 parent 97e9741 commit 548d41a
Show file tree
Hide file tree
Showing 13 changed files with 394 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Map;
import java.util.Set;

/**
Expand All @@ -27,6 +28,12 @@ public interface MonitorService {

void registerCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void unregisterCluster(String system, String clusterName);

int getBeaconClusterHash(String system, String clusterName);

Map<String,Set<String>> getAllClusterWithDc(String system);

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,13 @@ public void setExtra(Map<String, String> extra) {
this.extra = extra;
}

@Override
public int hashCode() {
int hash = 0;
for (MonitorGroupMeta meta : nodeGroups) {
hash ^= meta.hashCode();
}
return hash;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ public boolean equals(Object o) {
MonitorGroupMeta that = (MonitorGroupMeta) o;
return Objects.equals(name, that.name) &&
Objects.equals(idc, that.idc) &&
Objects.equals(nodes, that.nodes);
Objects.equals(nodes, that.nodes) &&
Objects.equals(masterGroup, that.masterGroup);
}

@Override
public int hashCode() {
return Objects.hash(name, idc, nodes);
return Objects.hash(name, idc, nodes, masterGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -57,9 +58,24 @@ public void registerCluster(String system, String clusterName, Set<MonitorGroupM
// do nothing
}

@Override
public void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups) {

}

@Override
public void unregisterCluster(String system, String clusterName) {
// do nothing
}

@Override
public int getBeaconClusterHash(String system, String clusterName) {
return 0;
}

@Override
public Map<String, Set<String>> getAllClusterWithDc(String system) {
return Collections.emptyMap();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.ctrip.xpipe.api.migration;

import com.ctrip.xpipe.api.migration.auto.data.MonitorClusterMeta;
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;
import com.ctrip.xpipe.endpoint.HostPort;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;

import java.util.Set;

public class MonitorClusterMetaTest {

@Test
public void testMonitorClusterMeta(){
Set<MonitorGroupMeta> group1 = getMonitorGroupMeta1();
MonitorClusterMeta monitorClusterMeta1 = new MonitorClusterMeta(group1);

Set<MonitorGroupMeta> group2 = getMonitorGroupMeta2();
MonitorClusterMeta monitorClusterMeta2 = new MonitorClusterMeta(group2);

Set<MonitorGroupMeta> group3 = getMonitorGroupMeta3();
MonitorClusterMeta monitorClusterMeta3 = new MonitorClusterMeta(group3);

Set<MonitorGroupMeta> group4 = getMonitorGroupMeta4();
MonitorClusterMeta monitorClusterMeta4 = new MonitorClusterMeta(group4);

Assert.assertEquals(monitorClusterMeta1.hashCode(), monitorClusterMeta2.hashCode());
Assert.assertNotEquals(monitorClusterMeta1.hashCode(), monitorClusterMeta3.hashCode());
Assert.assertNotEquals(monitorClusterMeta1.hashCode(), monitorClusterMeta4.hashCode());
System.out.println(monitorClusterMeta1.hashCode());
System.out.println(monitorClusterMeta2.hashCode());
System.out.println(monitorClusterMeta3.hashCode());
System.out.println(monitorClusterMeta4.hashCode());
}


private Set<MonitorGroupMeta> getMonitorGroupMeta1() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta2() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta3() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), false),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), false),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), true)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta4() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ public interface BeaconManager {

void registerCluster(String clusterId, ClusterType clusterType, int orgId);

void updateCluster(String clusterId, ClusterType clusterType, int orgId);

boolean checkClusterHash(String clusterId, ClusterType clusterType, int orgId);

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.metric.MetricData;
import com.ctrip.xpipe.metric.MetricProxy;
import com.ctrip.xpipe.metric.MetricProxyException;
import com.ctrip.xpipe.redis.checker.BeaconManager;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterHealthCheckInstance;

import com.ctrip.xpipe.redis.checker.healthcheck.ClusterInstanceInfo;
import com.ctrip.xpipe.redis.checker.healthcheck.leader.AbstractLeaderAwareHealthCheckAction;
import com.ctrip.xpipe.utils.ServicesUtil;
import org.slf4j.Logger;

import java.util.concurrent.ExecutorService;
Expand All @@ -18,6 +23,10 @@ public class BeaconMetaCheckAction extends AbstractLeaderAwareHealthCheckAction<

private BeaconManager beaconManager;

private MetricProxy metricProxy = ServicesUtil.getMetricProxy();

private static long lastSendTime = System.currentTimeMillis();

public BeaconMetaCheckAction(ScheduledExecutorService scheduled, ClusterHealthCheckInstance instance, ExecutorService executors,
BeaconManager beaconManager) {
super(scheduled, instance, executors);
Expand All @@ -31,6 +40,7 @@ protected void doTask() {
int orgId = info.getOrgId();

beaconManager.registerCluster(clusterId, info.getClusterType(), orgId);
checkConsistency(clusterId, info.getClusterType(), orgId);
}

@Override
Expand All @@ -43,4 +53,43 @@ protected int getBaseCheckInterval() {
return getActionInstance().getHealthCheckConfig().clusterCheckIntervalMilli();
}

private void checkConsistency(String clusterId, ClusterType clusterType, int orgId) {
boolean consistency = false;
try {
consistency = beaconManager.checkClusterHash(clusterId, clusterType, orgId);
} catch (Throwable t) {
// cluster not found in beacon
logger.error("[checkConsistency]" + clusterType + ":" + orgId + ":" + t.getMessage());
beaconManager.registerCluster(clusterId, clusterType, orgId);
return;
}
String trackCluster = clusterId;
if (consistency) {
trackCluster = null;
} else {
beaconManager.updateCluster(clusterId, clusterType, orgId);
}
MetricData metricData = getMetricData(trackCluster, consistency);
sendMetricData(metricData, consistency);
}

private MetricData getMetricData(String clusterId, boolean consistency) {
MetricData metricData = new MetricData("beacon_checker", null, clusterId, null);
metricData.addTag("consistency", String.valueOf(consistency));
return metricData;
}

private void sendMetricData(MetricData metricData, boolean consistency) {
long sendTime = System.currentTimeMillis();
if(consistency && sendTime - lastSendTime < getBaseCheckInterval()) {
return;
}
try {
metricProxy.writeBinMultiDataPoint(metricData);
lastSendTime = sendTime;
} catch (MetricProxyException e) {
logger.error("[sendMetricData]", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,14 @@ public class TestBeaconManager implements BeaconManager {
public void registerCluster(String clusterId, ClusterType clusterType, int orgId) {

}

@Override
public void updateCluster(String clusterId, ClusterType clusterType, int orgId) {

}

@Override
public boolean checkClusterHash(String clusterId, ClusterType clusterType, int orgId) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class AbstractIntervalAction {

@PostConstruct
public void postConstruct(){
logger.info("[postConstruct]{}", this);
logger.info("[postConstruct] {}", this);

for(ALERT_TYPE type : alertTypes()) {
alertPolicyManager.markCheckInterval(type, this::getIntervalMilli);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.ctrip.xpipe.api.migration.auto.MonitorService;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.metric.MetricData;
import com.ctrip.xpipe.metric.MetricProxy;
import com.ctrip.xpipe.metric.MetricProxyException;
import com.ctrip.xpipe.redis.checker.alert.ALERT_TYPE;
import com.ctrip.xpipe.redis.checker.alert.AlertManager;
import com.ctrip.xpipe.redis.console.AbstractCrossDcIntervalAction;
Expand All @@ -12,6 +15,7 @@
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.ServicesUtil;
import com.ctrip.xpipe.utils.StringUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -40,6 +44,8 @@ public class BeaconClusterMonitorCheck extends AbstractCrossDcIntervalAction {
@Autowired
private ConsoleCommonConfig config;

private MetricProxy metricProxy = ServicesUtil.getMetricProxy();

@Override
protected List<ALERT_TYPE> alertTypes() {
return Collections.singletonList(TOO_MANY_CLUSTERS_EXCLUDE_FROM_BEACON);
Expand Down Expand Up @@ -75,6 +81,39 @@ public void doAction() {
});
});
}));

new BeaconConsistencyCheckJob(clustersByBeaconSystemOrg, services, metaCache)
.execute()
.addListener(commandFuture -> {
if (commandFuture.isSuccess()) {
Set<String> cluster = commandFuture.get();
if(cluster == null || cluster.size() == 0){
MetricData metricData = getMetricData(null, true);
sendMetricData(metricData);
} else {
for(String clusterName : cluster){
MetricData metricData = getMetricData(clusterName, false);
sendMetricData(metricData);
}
}
} else {
logger.error("BeaconConsistencyCheck,fail");
}
});
}

private MetricData getMetricData(String clusterId, boolean consistency) {
MetricData metricData = new MetricData("beacon_console", null, clusterId, null);
metricData.addTag("consistency", String.valueOf(consistency));
return metricData;
}

private void sendMetricData(MetricData metricData) {
try {
metricProxy.writeBinMultiDataPoint(metricData);
} catch (MetricProxyException e) {
logger.error("[sendMetricData]", e);
}
}

private Map<BeaconSystem, Map<Long, Set<String>>> separateClustersByBeaconSystemOrg(Set<Long> orgIds) {
Expand Down
Loading

0 comments on commit 548d41a

Please sign in to comment.