Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reset keeper-election on container unhealthy #764

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
/**
*
*/
package com.ctrip.xpipe.api.cluster;

import com.ctrip.xpipe.api.lifecycle.Lifecycle;
Expand All @@ -14,4 +11,6 @@ public interface LeaderElector extends Lifecycle {

void elect() throws Exception;

boolean hasLeaderShip();

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public void isLeader() {
logger.info("[elect]{}", ctx);
}

/**
 * Tells whether this elector currently holds leadership.
 *
 * @return {@code false} while no latch exists; otherwise whatever the latch
 *         reports through {@code hasLeadership()}
 */
@Override
public boolean hasLeaderShip() {
    if (latch == null) {
        return false;
    }
    return latch.hasLeadership();
}

@Override
public void doStop() throws Exception {
if (latch != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.protocal.PsyncObserver;
import com.ctrip.xpipe.redis.core.store.ClusterId;
import com.ctrip.xpipe.redis.core.store.ReplId;
import com.ctrip.xpipe.redis.core.store.ReplicationStore;
import com.ctrip.xpipe.redis.core.store.ShardId;
import com.ctrip.xpipe.redis.keeper.config.KeeperConfig;
import com.ctrip.xpipe.redis.keeper.exception.RedisSlavePromotionException;
import com.ctrip.xpipe.redis.keeper.impl.SetRdbDumperException;
Expand Down Expand Up @@ -98,4 +96,8 @@ public static enum PROMOTION_STATE{

void resetElection();

boolean isLeader();

long getLastElectionResetTime();

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
public class DefaultKeeperContainerConfig implements KeeperContainerConfig {
public static final String REPLICATION_STORE_DIR = "replication.store.dir";
public static final String DISK_CHECK_INTERVAL_MILL = "disk.check.interval.mill";
public static final String CHECK_ROUND = "health.check.round.before.mark.down";
public static final String ELECTION_RESET_INTERVAL = "election.reset.interval.min";
private Config config;

@PostConstruct
Expand All @@ -27,6 +29,16 @@
return Integer.parseInt(config.get(DISK_CHECK_INTERVAL_MILL, "30000"));
}

/**
 * Reads the {@code health.check.round.before.mark.down} setting ({@link #CHECK_ROUND})
 * and parses it as an int; the literal {@code "3"} is passed to {@code config.get}
 * as the second argument (presumably the default when the key is unset - confirm
 * against the Config implementation).
 *
 * @return the parsed setting
 * @throws NumberFormatException if the configured text is not a valid int
 */
@Override
public int checkRoundBeforeMarkDown() {
return Integer.parseInt(config.get(CHECK_ROUND, "3"));

Check warning on line 34 in redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperContainerConfig.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperContainerConfig.java#L34

Added line #L34 was not covered by tests
}

/**
 * Reads the {@code election.reset.interval.min} setting ({@link #ELECTION_RESET_INTERVAL})
 * and parses it as an int; the literal {@code "600"} is passed to {@code config.get}
 * as the second argument (presumably the default when the key is unset - confirm
 * against the Config implementation).
 * <p>
 * NOTE(review): KeeperContainerService#update compares this value against a
 * difference of epoch seconds, so the unit appears to be seconds ("min" in the
 * key reads as "minimum", not "minutes") - confirm.
 *
 * @return the parsed setting
 * @throws NumberFormatException if the configured text is not a valid int
 */
@Override
public int keeperLeaderResetMinInterval() {
return Integer.parseInt(config.get(ELECTION_RESET_INTERVAL, "600"));

Check warning on line 39 in redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperContainerConfig.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperContainerConfig.java#L39

Added line #L39 was not covered by tests
}

/**
 * Looks up the {@code user.dir} system property.
 *
 * @return the value of {@code System.getProperty("user.dir")}
 */
private String getDefaultRdsDir() {
    final String userDir = System.getProperty("user.dir");
    return userDir;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ public interface KeeperContainerConfig {

int diskCheckInterval();

int checkRoundBeforeMarkDown();

int keeperLeaderResetMinInterval();

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@
return 30000;
}

/**
 * Fixed value for this test configuration.
 *
 * @return always {@code 3}
 */
@Override
public int checkRoundBeforeMarkDown() {
return 3;

Check warning on line 29 in redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperContainerConfig.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperContainerConfig.java#L29

Added line #L29 was not covered by tests
}

/**
 * Fixed value for this test configuration.
 *
 * @return always {@code 10}
 */
@Override
public int keeperLeaderResetMinInterval() {
return 10;

Check warning on line 34 in redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperContainerConfig.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperContainerConfig.java#L34

Added line #L34 was not covered by tests
}

/**
 * Stores the given value in this configuration's {@code metaServerUrl} field.
 *
 * @param metaServerUrl the value to store; no validation is performed here
 */
public void setMetaServerUrl(String metaServerUrl) {
this.metaServerUrl = metaServerUrl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@


import com.ctrip.xpipe.api.cluster.LeaderElectorManager;
import com.ctrip.xpipe.api.lifecycle.TopElement;
import com.ctrip.xpipe.api.observer.Observable;
import com.ctrip.xpipe.api.observer.Observer;
import com.ctrip.xpipe.exception.ErrorMessage;
import com.ctrip.xpipe.lifecycle.AbstractLifecycle;
import com.ctrip.xpipe.lifecycle.LifecycleHelper;
import com.ctrip.xpipe.redis.core.entity.*;
import com.ctrip.xpipe.redis.core.keeper.container.KeeperContainerErrorCode;
Expand All @@ -14,8 +18,10 @@
import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager;
import com.ctrip.xpipe.redis.keeper.exception.RedisKeeperRuntimeException;
import com.ctrip.xpipe.redis.keeper.health.DiskHealthChecker;
import com.ctrip.xpipe.redis.keeper.health.HealthState;
import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisKeeperServer;
import com.ctrip.xpipe.redis.keeper.monitor.KeepersMonitorManager;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
Expand All @@ -27,12 +33,13 @@
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* @author Jason Song([email protected])
*/
@Service
public class KeeperContainerService {
public class KeeperContainerService extends AbstractLifecycle implements TopElement, Observer {

@Autowired
KeeperConfig keeperConfig;
Expand All @@ -55,6 +62,32 @@

private Logger logger = LoggerFactory.getLogger(KeeperContainerService.class);

/**
 * Registers this service as an observer of the disk health checker so that
 * {@code update(Object, Observable)} receives its notifications.
 */
@Override
protected void doInitialize() throws Exception {
    diskHealthChecker.addObserver(this);
}

Check warning on line 68 in redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java#L67-L68

Added lines #L67 - L68 were not covered by tests

/**
 * Observer callback. When {@code args} is a {@link HealthState} that is not up,
 * every keeper that currently reports itself as leader has its election reset,
 * unless its last reset happened less than
 * {@code keeperContainerConfig.keeperLeaderResetMinInterval()} seconds ago.
 *
 * @param args       the notification payload; anything other than a non-up
 *                   {@link HealthState} is ignored
 * @param observable the notifying source (not used here)
 */
@Override
public void update(Object args, Observable observable) {
    // Nothing to do unless we are told the container is no longer up.
    if (!(args instanceof HealthState) || ((HealthState) args).isUp()) {
        return;
    }

    long nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
    // reset leader
    for (RedisKeeperServer keeperServer : redisKeeperServers.values()) {
        if (!keeperServer.isLeader()) {
            continue;
        }

        long previousReset = keeperServer.getLastElectionResetTime();
        // A previous reset that lies in the future is never treated as "recent".
        boolean resetRecently = previousReset <= nowSeconds
                && nowSeconds - previousReset < keeperContainerConfig.keeperLeaderResetMinInterval();
        if (resetRecently) {
            logger.debug("[container_unhealthy][{}] last reset at: {}, skip", keeperServer.getReplId(), previousReset);
            continue;
        }

        logger.info("[container_unhealthy][{}] reset leader", keeperServer.getReplId());
        keeperServer.resetElection();
    }
}

public RedisKeeperServer add(KeeperTransMeta keeperTransMeta) {
KeeperMeta keeperMeta = keeperTransMeta.getKeeperMeta();
enrichKeeperMetaFromKeeperTransMeta(keeperMeta, keeperTransMeta);
Expand Down Expand Up @@ -280,4 +313,9 @@
return ReplId.from(keeperTransMeta.getReplId()).toString();
}

/**
 * Replaces the map of keeper servers held by this service; exposed for tests.
 *
 * @param servers the map to use from now on (stored as-is, not copied)
 */
@VisibleForTesting
protected void setRedisKeeperServers(Map<String, RedisKeeperServer> servers) {
    redisKeeperServers = servers;
}

}
Original file line number Diff line number Diff line change
@@ -1,42 +1,48 @@
package com.ctrip.xpipe.redis.keeper.health;

import com.ctrip.xpipe.api.lifecycle.TopElement;
import com.ctrip.xpipe.lifecycle.AbstractLifecycle;
import com.ctrip.xpipe.api.observer.Observable;
import com.ctrip.xpipe.observer.AbstractLifecycleObservable;
import com.ctrip.xpipe.redis.core.entity.KeeperDiskInfo;
import com.ctrip.xpipe.redis.keeper.config.KeeperContainerConfig;
import com.ctrip.xpipe.redis.keeper.health.job.DiskIOStatCheckJob;
import com.ctrip.xpipe.redis.keeper.health.job.DiskReadWriteCheckJob;
import com.ctrip.xpipe.redis.keeper.health.job.DiskSpaceUsageCheckJob;
import com.ctrip.xpipe.utils.StringUtil;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask;
import org.springframework.stereotype.Component;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* @author lishanglin
* date 2023/11/9
*/
@Component
public class DiskHealthChecker extends AbstractLifecycle implements TopElement {
public class DiskHealthChecker extends AbstractLifecycleObservable implements TopElement, Observable {

KeeperContainerConfig keeperContainerConfig;

private ScheduledExecutorService scheduled;

private DynamicDelayPeriodTask checkTask;

private AtomicReference<HealthState> state;

private AtomicInteger rounds;

private AtomicReference<KeeperDiskInfo> result;

private static final int checkTimeoutSeconds = Integer.parseInt(System.getProperty("keeper.disk.check.timeout.sec", "10"));

/**
 * Creates a checker that starts in {@link HealthState#HEALTHY} with a round
 * counter of zero and a default-constructed {@link KeeperDiskInfo} as its result.
 *
 * @param keeperContainerConfig configuration consulted by this checker
 */
public DiskHealthChecker(KeeperContainerConfig keeperContainerConfig) {
    rounds = new AtomicInteger(0);
    state = new AtomicReference<>(HealthState.HEALTHY);
    result = new AtomicReference<>(new KeeperDiskInfo());
    this.keeperContainerConfig = keeperContainerConfig;
}

@Override
Expand Down Expand Up @@ -75,20 +81,43 @@

diskInfo.available = diskReadWriteCheckJob.execute().get(checkTimeoutSeconds, TimeUnit.SECONDS);
diskInfo.spaceUsageInfo = diskSpaceUsageCheckJob.execute().get(checkTimeoutSeconds, TimeUnit.SECONDS);
if (null != diskInfo.spaceUsageInfo && !StringUtil.isEmpty(diskInfo.spaceUsageInfo.source)) {
DiskIOStatCheckJob diskIOStatCheckJob = new DiskIOStatCheckJob(diskInfo.spaceUsageInfo.getDevice());
diskInfo.ioStatInfo = diskIOStatCheckJob.execute().get(checkTimeoutSeconds, TimeUnit.SECONDS);
}

this.result.set(diskInfo);
logger.debug("[check][end] {}", diskInfo);
setResult(diskInfo);

Check warning on line 85 in redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/health/DiskHealthChecker.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/health/DiskHealthChecker.java#L85

Added line #L85 was not covered by tests
} catch (Throwable th) {
logger.info("[check] fail", th);
}
}

/**
 * Stores the latest disk check result, derives the health state from it and
 * notifies observers with the resulting state.
 * <p>
 * Transitions, driven by {@code diskInfo.available}:
 * <ul>
 * <li>available: the state becomes HEALTHY, whatever it was before;</li>
 * <li>unavailable while HEALTHY: the round counter restarts at 1 and the state
 * becomes SICK, or DOWN right away when the configured
 * {@code checkRoundBeforeMarkDown()} is 1 or less;</li>
 * <li>unavailable while SICK: the round counter is incremented and the state
 * becomes DOWN once it reaches {@code checkRoundBeforeMarkDown()};</li>
 * <li>unavailable while DOWN: nothing changes.</li>
 * </ul>
 * Observers are notified on every call, including calls that leave the state
 * unchanged.
 *
 * @param diskInfo the latest check result
 */
protected void setResult(KeeperDiskInfo diskInfo) {
this.result.set(diskInfo);
if (diskInfo.available) {
state.set(HealthState.HEALTHY);
} else {
if (state.get() == HealthState.HEALTHY) {
// first failed round after a healthy one: restart the counter
rounds.set(1);
if (rounds.get() >= keeperContainerConfig.checkRoundBeforeMarkDown()) {
state.set(HealthState.DOWN);

Check warning on line 99 in redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/health/DiskHealthChecker.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/health/DiskHealthChecker.java#L99

Added line #L99 was not covered by tests
} else {
state.set(HealthState.SICK);
}
} else if (state.get() == HealthState.SICK) {
// another failed round in a row: mark DOWN once the threshold is reached
if (rounds.incrementAndGet() >= keeperContainerConfig.checkRoundBeforeMarkDown()) {
state.set(HealthState.DOWN);
}
} else {
// do nothing
}
}

notifyObservers(this.state.get());
}

/**
 * @return the disk info most recently stored in this checker
 */
public KeeperDiskInfo getResult() {
    KeeperDiskInfo latest = result.get();
    return latest;
}

/**
 * @return the health state currently held by this checker
 */
public HealthState getState() {
    HealthState current = state.get();
    return current;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ctrip.xpipe.redis.keeper.health;

/**
 * Health level carrying a single flag that says whether the subject still
 * counts as up.
 * <p>
 * {@link #HEALTHY} and {@link #SICK} are up; only {@link #DOWN} is not.
 *
 * @author lishanglin
 * date 2023/12/9
 */
public enum HealthState {
    /** Up. */
    HEALTHY(true),
    /** Degraded but still reported as up. */
    SICK(true),
    /** Not up. */
    DOWN(false);

    // Enum constants are shared singletons, so their state must be immutable.
    private final boolean up;

    HealthState(boolean up) {
        this.up = up;
    }

    /**
     * @return {@code true} for {@link #HEALTHY} and {@link #SICK},
     *         {@code false} for {@link #DOWN}
     */
    public boolean isUp() {
        return up;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import java.io.File;
import java.io.IOException;
import java.sql.Time;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -138,6 +139,8 @@

private LeaderElectorManager leaderElectorManager;

private volatile long lastResetElectionTime = 0;

private volatile AtomicReference<RdbDumper> rdbDumper = new AtomicReference<RdbDumper>(null);
private long lastDumpTime = -1;
//for test
Expand Down Expand Up @@ -194,11 +197,22 @@
try {
LifecycleHelper.stopIfPossible(leaderElector);
LifecycleHelper.startIfPossible(leaderElector);
} catch (Exception e) {
logger.info("[resetElection][fail][{}]", replId, e);
this.lastResetElectionTime = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
} catch (Throwable th) {
logger.info("[resetElection][fail][{}]", replId, th);

Check warning on line 202 in redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java#L200-L202

Added lines #L200 - L202 were not covered by tests
}
}

/**
 * Tells whether this keeper server currently acts as leader.
 *
 * @return {@code true} only when the lifecycle state is started and the
 *         leader elector reports leadership; the elector is not consulted
 *         while the server is not started
 */
@Override
public boolean isLeader() {
    if (!getLifecycleState().isStarted()) {
        return false;
    }
    return leaderElector.hasLeaderShip();
}

/**
 * Returns the value of the {@code lastResetElectionTime} field.
 * <p>
 * NOTE(review): the field is declared with an initial value of 0 and, in the
 * visible part of resetElection, is assigned the current time converted to
 * epoch seconds - so this presumably yields epoch seconds of the last reset
 * that got that far, or 0 if none did; confirm against the full method.
 *
 * @return the stored last-reset timestamp
 */
@Override
public long getLastElectionResetTime() {
return this.lastResetElectionTime;

Check warning on line 213 in redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java#L213

Added line #L213 was not covered by tests
}

@Override
protected void doInitialize() throws Exception {
super.doInitialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.ctrip.xpipe.redis.keeper.handler.CommandHandlerManagerTest;
import com.ctrip.xpipe.redis.keeper.handler.applier.ApplierCommandHandlerTest;
import com.ctrip.xpipe.redis.keeper.handler.keeper.*;
import com.ctrip.xpipe.redis.keeper.health.DiskHealthCheckerTest;
import com.ctrip.xpipe.redis.keeper.impl.*;
import com.ctrip.xpipe.redis.keeper.impl.fakeredis.*;
import com.ctrip.xpipe.redis.keeper.monitor.PsyncFailReasonTest;
Expand Down Expand Up @@ -80,6 +81,8 @@
GtidCmdOneSegmentReaderTest.class,
GtidSetCommandReaderTest.class,

DiskHealthCheckerTest.class,

AllApplierTests.class,
})
public class AllTests {
Expand Down
Loading
Loading