Skip to content

Commit

Permalink
跨region拉入拉出检测优化
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Aug 12, 2024
1 parent 5032c49 commit 866276e
Show file tree
Hide file tree
Showing 32 changed files with 951 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ public String getHealthCheckRedisInstanceForAssignedAction(@PathVariable String
return Codec.DEFAULT.encode(model);
}

@RequestMapping(value = "/health/check/redis-for-ping-action/{ip}/{port}", method = RequestMethod.GET)
public String getHealthCheckRedisInstanceForPingAction(@PathVariable String ip, @PathVariable int port) {
RedisHealthCheckInstance instance = instanceManager.findRedisInstanceForPingAction(new HostPort(ip, port));
if(instance == null) {
return "Not found";
}
HealthCheckInstanceModel model = buildHealthCheckInfo(instance);
return Codec.DEFAULT.encode(model);
}

@RequestMapping(value = "/health/redis/info/{ip}/{port}", method = RequestMethod.GET)
public ActionContextRetMessage<Map<String, String>> getRedisInfo(@PathVariable String ip, @PathVariable int port) {
return ActionContextRetMessage.from(redisInfoManager.getInfoByHostPort(new HostPort(ip, port)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public interface HealthCheckInstanceManager {

RedisHealthCheckInstance getOrCreateRedisInstanceForAssignedAction(RedisMeta redis);

RedisHealthCheckInstance getOrCreateRedisInstanceForPsubPingAction(RedisMeta redis);

KeeperHealthCheckInstance getOrCreate(KeeperMeta keeper);

ClusterHealthCheckInstance getOrCreate(ClusterMeta cluster);
Expand All @@ -26,6 +28,8 @@ public interface HealthCheckInstanceManager {

RedisHealthCheckInstance findRedisInstanceForAssignedAction(HostPort hostPort);

RedisHealthCheckInstance findRedisInstanceForPingAction(HostPort hostPort);

KeeperHealthCheckInstance findKeeperHealthCheckInstance(HostPort hostPort);

ClusterHealthCheckInstance findClusterHealthCheckInstance(String clusterId);
Expand All @@ -36,6 +40,8 @@ public interface HealthCheckInstanceManager {

RedisHealthCheckInstance removeRedisInstanceForAssignedAction(HostPort hostPort);

RedisHealthCheckInstance removeRedisInstanceForPingAction(HostPort hostPort);

ClusterHealthCheckInstance remove(String cluster);

List<RedisHealthCheckInstance> getAllRedisInstance();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction;

import com.ctrip.xpipe.redis.checker.healthcheck.ActionContext;
import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckAction;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionContext;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingActionListener;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionContext;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubActionListener;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector;
import com.google.common.collect.Maps;

import java.util.Map;

public abstract class AbstractPsubPingActionCollector implements PsubPingActionCollector {

protected Map<RedisHealthCheckInstance, HealthStatus> allHealthStatus = Maps.newConcurrentMap();

protected PingActionListener pingActionListener = new AbstractPsubPingActionCollector.CollectorPingActionListener();

protected PsubActionListener psubActionListener = new AbstractPsubPingActionCollector.CollectorPsubActionListener();

protected abstract HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance);

protected void removeHealthStatus(HealthCheckAction<RedisHealthCheckInstance> action) {
HealthStatus healthStatus = allHealthStatus.remove(action.getActionInstance());
if(healthStatus != null) {
healthStatus.stop();
}
}

@Override
public boolean supportInstance(RedisHealthCheckInstance instance) {
return true;
}

@Override
public PingActionListener createPingActionListener() {
return pingActionListener;
}

@Override
public PsubActionListener createPsubActionListener() {
return psubActionListener;
}

protected class CollectorPingActionListener implements PingActionListener {

@Override
public void onAction(PingActionContext pingActionContext) {
HealthStatus healthStatus = createOrGetHealthStatus(pingActionContext.instance());
if (!pingActionContext.isSuccess()) {
if (pingActionContext.getCause().getMessage().contains("LOADING")) {
healthStatus.loading();
}
return;
}

if (pingActionContext.getResult()) {
healthStatus.pong();
} else {
if(healthStatus.getState() == HEALTH_STATE.UNKNOWN) {
healthStatus.pongInit();
}
}
}

@Override
public boolean worksfor(ActionContext t) {
return t instanceof PingActionContext;
}

@Override
public void stopWatch(HealthCheckAction action) {
removeHealthStatus(action);
}
}

protected class CollectorPsubActionListener implements PsubActionListener {

@Override
public void onAction(PsubActionContext psubActionContext) {
HealthStatus healthStatus = createOrGetHealthStatus(psubActionContext.instance());
if (!psubActionContext.getResult().isEmpty()) {
healthStatus.subSuccess();
}
}

@Override
public void stopWatch(HealthCheckAction<RedisHealthCheckInstance> action) {
removeHealthStatus(action);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction;

import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceDown;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.InstanceUp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledExecutorService;

/**
* UNKNOWN
* pingSuccess -> INSTANCEUP + start subAction
* pingFail -> DOWN + markDown
* subSuccess -> throw exception
* <p>
* INSTANCEUP
* pingSuccess,do nothing
* pingFail -> DOWN + markDown
* subSuccess -> HEALTHY + markUp + stop subAction
* <p>
* HEALTHY
* pingSuccess,do nothing
* pingFail -> DOWN + markDown
* subSuccess -> throw exception
* <p>
* DOWN
* pingSuccess -> INSTANCEUP + start subAction
* pingFail,do nothing
* subSuccess -> throw exception
*/
public class CrossRegionRedisHealthStatus extends HealthStatus {

protected static final Logger logger = LoggerFactory.getLogger(CrossRegionRedisHealthStatus.class);

public CrossRegionRedisHealthStatus(RedisHealthCheckInstance instance, ScheduledExecutorService scheduled) {
super(instance, scheduled);
}

@Override
protected void loading() {
doMarkDown();
}

@Override
protected void pong() {
HEALTH_STATE preState = state.get();
if (preState.equals(HEALTH_STATE.UNKNOWN) || preState.equals(HEALTH_STATE.DOWN)) {
if(state.compareAndSet(preState, HEALTH_STATE.INSTANCEUP)) {
logStateChange(preState, state.get());
}
}
}

@Override
protected void subSuccess() {
HEALTH_STATE preState = state.get();
if (preState.equals(HEALTH_STATE.INSTANCEUP)) {
if(state.compareAndSet(preState, HEALTH_STATE.HEALTHY)) {
logStateChange(preState, state.get());
}
logger.info("[setUp] {}", this);
notifyObservers(new InstanceUp(instance));
}
}

@Override
protected void healthStatusUpdate() {
long currentTime = System.currentTimeMillis();

if(lastPongTime.get() != UNSET_TIME) {
long pingDownTime = currentTime - lastPongTime.get();
final int pingDownAfter = pingDownAfterMilli.getAsInt();
if (pingDownTime > pingDownAfter) {
doMarkDown();
}
}
}

protected void doMarkDown() {
HEALTH_STATE preState = state.get();
if(state.compareAndSet(preState, HEALTH_STATE.DOWN)) {
logStateChange(preState, state.get());
}
if (!preState.equals(HEALTH_STATE.DOWN)) {
logger.info("[setDown] {}", this);
notifyObservers(new InstanceDown(instance));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction;

import com.ctrip.xpipe.api.factory.ObjectFactory;
import com.ctrip.xpipe.api.observer.Observable;
import com.ctrip.xpipe.api.observer.Observer;
import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.RedisInstanceInfo;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.event.AbstractInstanceEvent;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.interaction.processor.HealthEventProcessor;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.psubscribe.PsubPingActionCollector;
import com.ctrip.xpipe.utils.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR;
import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;

@Component
public class DefaultPsubPingActionCollector extends AbstractPsubPingActionCollector implements PsubPingActionCollector, HealthStateService, OneWaySupport {

private static final Logger logger = LoggerFactory.getLogger(DefaultPsubPingActionCollector.class);

@Autowired
private List<HealthEventProcessor> healthEventProcessors;

@Resource(name = SCHEDULED_EXECUTOR)
private ScheduledExecutorService scheduled;

@Resource(name = GLOBAL_EXECUTOR)
private ExecutorService executors;

@Override
public HEALTH_STATE getHealthState(HostPort hostPort) {
RedisHealthCheckInstance key = allHealthStatus.keySet().stream()
.filter(instance -> instance.getCheckInfo().getHostPort().equals(hostPort))
.findFirst().orElse(null);

if (null != key) return allHealthStatus.get(key).getState();
return null;
}

@Override
public Map<HostPort, HEALTH_STATE> getAllCachedState() {
Map<HostPort, HEALTH_STATE> cachedHealthStatus = new HashMap<>();
allHealthStatus.forEach(((instance, healthStatus) -> {
RedisInstanceInfo info = instance.getCheckInfo();
cachedHealthStatus.put(info.getHostPort(), healthStatus.getState());
}));

return cachedHealthStatus;
}

@Override
public void updateHealthState(Map<HostPort, HEALTH_STATE> redisStates) {
throw new UnsupportedOperationException();
}

@Override
protected HealthStatus createOrGetHealthStatus(RedisHealthCheckInstance instance) {
return MapUtils.getOrCreate(allHealthStatus, instance, new ObjectFactory<HealthStatus>() {
@Override
public HealthStatus create() {

HealthStatus healthStatus = new CrossRegionRedisHealthStatus(instance, scheduled);

healthStatus.addObserver(new Observer() {
@Override
public void update(Object args, Observable observable) {
onInstanceStateChange(args);
}
});
healthStatus.start();
return healthStatus;
}
});
}

private void onInstanceStateChange(Object args) {

logger.info("[onInstanceStateChange]{}", args);
for (HealthEventProcessor processor : healthEventProcessors) {

if (processor instanceof OneWaySupport) {
executors.execute(new AbstractExceptionLogTask() {
@Override
protected void doRun() throws Exception {
processor.onEvent((AbstractInstanceEvent) args);
}
});
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ public class HealthStatus extends AbstractObservable implements Startable, Stopp

public static long UNSET_TIME = -1L;

private AtomicLong lastPongTime = new AtomicLong(UNSET_TIME);
protected AtomicLong lastPongTime = new AtomicLong(UNSET_TIME);
private AtomicLong lastHealthDelayTime = new AtomicLong(UNSET_TIME);

private AtomicReference<HEALTH_STATE> state = new AtomicReference<>(HEALTH_STATE.UNKNOWN);
protected AtomicReference<HEALTH_STATE> state = new AtomicReference<>(HEALTH_STATE.UNKNOWN);

protected RedisHealthCheckInstance instance;
protected final IntSupplier delayDownAfterMilli;
Expand Down Expand Up @@ -110,7 +110,7 @@ protected boolean shouldNotRun() {
return lastHealthDelayTime.get() < 0 && lastPongTime.get() < 0;
}

void loading() {
protected void loading() {
HEALTH_STATE preState = state.get();
if(preState.equals(preState.afterPingFail())) {
return;
Expand All @@ -124,7 +124,7 @@ void loading() {
}
}

void pong(){
protected void pong(){
lastPongTime.set(System.currentTimeMillis());
setPingUp();
}
Expand All @@ -135,6 +135,8 @@ void pongInit() {
}
}

protected void subSuccess(){}

void delay(long delayMilli, long...srcShardDbId){

//first time
Expand Down
Loading

0 comments on commit 866276e

Please sign in to comment.