Skip to content

Commit

Permalink
aggregate instance pull
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Aug 21, 2024
1 parent 47f2335 commit 6ec73cd
Show file tree
Hide file tree
Showing 14 changed files with 394 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface OuterClientService extends Ordered{

void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostPort, long noModifySeconds) throws OuterClientException;

/**
 * Applies the instance states carried by a single {@link MarkInstanceRequest} in one call.
 *
 * @param markInstanceRequest the batch request; it carries a list of host/port/dc entries
 *                            with a {@code canRead} flag, plus cluster name, active dc and
 *                            a {@code noModifySeconds} value
 * @throws OuterClientException declared for implementations that fail to apply the request
 */
void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException;

boolean clusterMigratePreCheck(String clusterName) throws OuterClientException;

MigrationPublishResult doMigrationPublish(String clusterName, String primaryDcName, List<InetSocketAddress> newMasters) throws OuterClientException;
Expand Down Expand Up @@ -724,6 +726,114 @@ public void setMessage(String message) {
}
}

/**
 * Request payload for {@code batchMarkInstance}: a list of per-instance read states
 * together with the cluster they belong to.
 *
 * <p>Mutable bean with a no-arg constructor and setters so it can be populated by a
 * JSON mapper; unknown JSON properties are ignored.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
class MarkInstanceRequest{
    /** Instances to mark, each with its own {@code canRead} flag. May be {@code null} if never set. */
    private List<HostPortDcStatus> hostPortDcStatuses;
    /** Name of the cluster the instances belong to. */
    private String clusterName;
    /** NOTE(review): presumably a "no modification within N seconds" guard; 0 when not supplied. */
    private int noModifySeconds;
    /** NOTE(review): presumably the cluster's active data center; confirm against callers. */
    private String activeDc;

    /** No-arg constructor for bean-style population; all fields keep their defaults. */
    public MarkInstanceRequest() {
    }

    /**
     * Creates a request with {@code noModifySeconds} set to 0.
     *
     * @param hostPortDcStatuses instances to mark
     * @param clusterName        cluster name
     * @param activeDc           active dc identifier
     */
    public MarkInstanceRequest(List<HostPortDcStatus> hostPortDcStatuses, String clusterName, String activeDc) {
        this(hostPortDcStatuses, clusterName, 0, activeDc);
    }

    /**
     * Creates a fully specified request.
     *
     * @param hostPortDcStatuses instances to mark
     * @param clusterName        cluster name
     * @param noModifySeconds    value stored as-is in {@code noModifySeconds}
     * @param activeDc           active dc identifier
     */
    public MarkInstanceRequest(List<HostPortDcStatus> hostPortDcStatuses, String clusterName, int noModifySeconds, String activeDc) {
        this.hostPortDcStatuses = hostPortDcStatuses;
        this.clusterName = clusterName;
        this.noModifySeconds = noModifySeconds;
        this.activeDc = activeDc;
    }

    public List<HostPortDcStatus> getHostPortDcStatuses() {
        return hostPortDcStatuses;
    }

    public void setHostPortDcStatuses(List<HostPortDcStatus> hostPortDcStatuses) {
        this.hostPortDcStatuses = hostPortDcStatuses;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public int getNoModifySeconds() {
        return noModifySeconds;
    }

    public void setNoModifySeconds(int noModifySeconds) {
        this.noModifySeconds = noModifySeconds;
    }

    public String getActiveDc() {
        return activeDc;
    }

    public void setActiveDc(String activeDc) {
        this.activeDc = activeDc;
    }

    /**
     * Readable rendering of all fields. Requests are passed to parameterized log
     * statements, which would otherwise print only the default identity string.
     */
    @Override
    public String toString() {
        return "MarkInstanceRequest{"
                + "clusterName='" + clusterName + '\''
                + ", activeDc='" + activeDc + '\''
                + ", noModifySeconds=" + noModifySeconds
                + ", hostPortDcStatuses=" + hostPortDcStatuses
                + '}';
    }
}

/**
 * One instance entry of a batch mark request: the instance address, the dc it is
 * associated with, and whether it should be readable.
 *
 * <p>Mutable bean with a no-arg constructor and setters so it can be populated by a
 * JSON mapper; unknown JSON properties are ignored.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
class HostPortDcStatus{

    /** Instance host. */
    private String host;
    /** Instance port. */
    private int port;
    /** Data center identifier for this instance. */
    private String dc;
    /** Target state for the instance: {@code true} means it can be read. */
    private boolean canRead;

    /** No-arg constructor for bean-style population; all fields keep their defaults. */
    public HostPortDcStatus() {
    }

    /**
     * @param host    instance host
     * @param port    instance port
     * @param dc      data center identifier
     * @param canRead whether the instance should be readable
     */
    public HostPortDcStatus(String host, int port, String dc, boolean canRead) {
        this.host = host;
        this.port = port;
        this.dc = dc;
        this.canRead = canRead;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getDc() {
        return dc;
    }

    public void setDc(String dc) {
        this.dc = dc;
    }

    public boolean isCanRead() {
        return canRead;
    }

    public void setCanRead(boolean canRead) {
        this.canRead = canRead;
    }

    /**
     * Readable rendering of all fields, so that collections of these entries show
     * their content when logged instead of the default identity string.
     */
    @Override
    public String toString() {
        return "HostPortDcStatus{"
                + "host='" + host + '\''
                + ", port=" + port
                + ", dc='" + dc + '\''
                + ", canRead=" + canRead
                + '}';
    }
}

enum ClusterType {
SINGEL_DC(0){
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostP

}

// No-op in this implementation: the request is accepted and ignored.
@Override
public void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException {

}

@Override
public OuterClientDataResp<List<ClusterExcludedIdcInfo>> getAllExcludedIdcs() throws Exception {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostP
instanceStatus.put(clusterShardHostPort.getHostPort(), false);
}

/**
 * Records the requested read state of every instance in the request into the
 * in-memory {@code instanceStatus} map, keyed by host and port.
 *
 * <p>A request whose status list is {@code null} (for example one created through the
 * no-arg constructor and never populated) is logged and otherwise ignored instead of
 * failing with a {@link NullPointerException}.
 *
 * @param markInstanceRequest the batch of instance states to apply
 * @throws OuterClientException declared by the interface; not thrown by this implementation
 */
@Override
public void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException {
    logger.info("[batchMarkInstance]{}", markInstanceRequest);
    List<HostPortDcStatus> statuses = markInstanceRequest.getHostPortDcStatuses();
    if (statuses == null) {
        // Nothing to apply; a for-each over null would throw.
        return;
    }
    for (HostPortDcStatus status : statuses) {
        HostPort hostPort = new HostPort(status.getHost(), status.getPort());
        instanceStatus.put(hostPort, status.isCanRead());
    }
}

@Override
public OuterClientDataResp<List<ClusterExcludedIdcInfo>> getAllExcludedIdcs() throws Exception {
OuterClientDataResp<List<ClusterExcludedIdcInfo>> resp = new OuterClientDataResp<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,8 @@ public interface CheckerConfig {

int getKeeperCheckerIntervalMilli();

// Interval between instance pulls; in seconds, per the method name.
int getInstancePullIntervalSeconds();

// NOTE(review): presumably an upper bound, in seconds, for a random delay added to the
// pull interval; the consumer is not visible here, so confirm against the caller.
int getInstancePullRandomSeconds();

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public class CheckConfigBean extends AbstractConfigBean {

// Property key read by getKeeperCheckerIntervalMilli(); falls back to 60 * 1000 when unset.
public static final String KEY_KEEPER_CHECKER_INTERVAL = "keeper.checker.interval";

// Property key read by getInstancePullIntervalSeconds(); falls back to 5 when unset.
public static final String KEY_CHECKER_INSTANCE_PULL_INTERVAL = "checker.instance.pull.interval";

// Property key read by getInstancePullRandomSeconds(); falls back to 5 when unset.
public static final String KEY_CHECKER_INSTANCE_PULL_RANDOM = "checker.instance.pull.random";

private FoundationService foundationService;

@Autowired
Expand Down Expand Up @@ -342,6 +346,14 @@ public int getKeeperCheckerIntervalMilli() {
return getIntProperty(KEY_KEEPER_CHECKER_INTERVAL, 60 * 1000);
}

/**
 * Reads the instance pull interval from configuration.
 *
 * @return the value of {@code checker.instance.pull.interval}, or 5 when the property is not set
 */
public int getInstancePullIntervalSeconds() {
    final int defaultIntervalSeconds = 5;
    return getIntProperty(KEY_CHECKER_INSTANCE_PULL_INTERVAL, defaultIntervalSeconds);
}

/**
 * Reads the instance pull "random" setting from configuration.
 *
 * @return the value of {@code checker.instance.pull.random}, or 5 when the property is not set
 */
public int getInstancePullRandomSeconds() {
    final int defaultRandomSeconds = 5;
    return getIntProperty(KEY_CHECKER_INSTANCE_PULL_RANDOM, defaultRandomSeconds);
}

/**
 * Reads the "stable reset after rounds" setting from configuration.
 *
 * @return the configured value of {@code KEY_CHECKER_STABLE_RESET_AFTER_ROUNDS}, or 30 when unset
 */
public int getStableResetAfterRounds() {
    final int defaultRounds = 30;
    return getIntProperty(KEY_CHECKER_STABLE_RESET_AFTER_ROUNDS, defaultRounds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void go() throws Exception {

XPipeInstanceHealthHolder xpipeInstanceHealth = instanceHealth.getKey();
OutClientInstanceHealthHolder outClientInstanceHealth = instanceHealth.getValue();
UpDownInstances instanceNeedAdjust = findHostPortNeedAdjust(xpipeInstanceHealth, outClientInstanceHealth, interested);
UpDownInstances instanceNeedAdjust = findHostPortNeedAdjust(xpipeInstanceHealth, outClientInstanceHealth, interested, logger);

checkTimeout(timeoutMill, "after compare");
if (!instanceNeedAdjust.getHealthyInstances().isEmpty())
Expand Down Expand Up @@ -170,9 +170,9 @@ protected Map<String, Set<HostPort>> fetchInterestedClusterInstances() {
return interestedClusterInstances;
}

protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipeInstanceHealthHolder,
public UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipeInstanceHealthHolder,
OutClientInstanceHealthHolder outClientInstanceHealthHolder,
Map<String, Set<HostPort>> interested) {
Map<String, Set<HostPort>> interested, Logger logger) {
int quorum = config.getQuorum();
UpDownInstances xpipeInstances = xpipeInstanceHealthHolder.aggregate(interested, quorum);
UpDownInstances outClientInstances = outClientInstanceHealthHolder.extractReadable(interested);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public abstract class AbstractHealthEventHandler<T extends AbstractInstanceEvent
// Checker configuration; quorumState() reads the quorum size from it.
@Autowired
private CheckerConfig checkerConfig;

// Receives the instance (cluster/shard/host/port) via markInstance(...) when an
// instance is to be marked up or down.
@Autowired
private OuterClientAggregator outerClientAggregator;

// Data center id of the current process, resolved once from FoundationService.DEFAULT.
protected static final String currentDcId = FoundationService.DEFAULT.getDataCenter();

@SuppressWarnings("unchecked")
Expand All @@ -73,8 +76,8 @@ protected void tryMarkDown(AbstractInstanceEvent event) {
logger.warn("[onEvent][site down, skip] {}", event);
return;
}
if(!event.getInstance().getCheckInfo().isCrossRegion() && !masterUp(event)) {
logger.info("[onEvent][master down, do not call client service]{}", event);
if(!event.getInstance().getCheckInfo().isCrossRegion()) {
logger.info("[onEvent][cross region, do not call client service]{}", event);
return;
}
if (event instanceof InstanceLoading) {
Expand Down Expand Up @@ -116,7 +119,7 @@ protected void doRealMarkUp(final AbstractInstanceEvent event) {
logger.info("[doRealMarkUp][{} is cross region, do not call client service ]{}", info, event);
return;
}
getHealthStateSetterManager().set(event.getInstance().getCheckInfo().getClusterShardHostport(), true);
outerClientAggregator.markInstance(event.getInstance().getCheckInfo().getClusterShardHostport());
}

protected void doRealMarkDown(final AbstractInstanceEvent event) {
Expand All @@ -129,7 +132,7 @@ protected void doRealMarkDown(final AbstractInstanceEvent event) {
logger.warn("[markdown] instance state up now, do not mark down, {}", info);
} else {
logger.info("[markdown] mark down redis, {}, {}", event.getInstance().getCheckInfo(), event.getClass().getSimpleName());
getHealthStateSetterManager().set(info.getClusterShardHostport(), false);
outerClientAggregator.markInstance(event.getInstance().getCheckInfo().getClusterShardHostport());
}
}

Expand All @@ -138,23 +141,10 @@ private boolean stateUpNow(AbstractInstanceEvent event) {
.equals(HEALTH_STATE.HEALTHY);
}

/**
 * Tells whether the master located in the same shard as the event's instance is
 * currently in the {@code HEALTHY} state.
 *
 * @param instanceEvent event whose instance identifies the shard to inspect
 * @return {@code true} if the shard master's state is {@code HEALTHY}; otherwise
 *         {@code false}, after logging the instance and the master
 */
protected boolean masterUp(AbstractInstanceEvent instanceEvent) {
    RedisInstanceInfo checkInfo = instanceEvent.getInstance().getCheckInfo();
    HostPort shardMaster = metaCache.findMasterInSameShard(checkInfo.getHostPort());
    if (defaultDelayPingActionCollector.getState(shardMaster) == HEALTH_STATE.HEALTHY) {
        return true;
    }
    logger.info("[masterUp][master down instance:{}, master:{}]", checkInfo, shardMaster);
    return false;
}

/**
 * Checks whether enough remote checkers report one of the given states for an instance.
 *
 * @param healthStates the states that count as a match
 * @param hostPort     the instance to query remote checkers about
 * @return {@code true} when the number of reported states contained in
 *         {@code healthStates} is at least the configured quorum
 */
protected boolean quorumState(List<HEALTH_STATE> healthStates, HostPort hostPort) {
    List<HEALTH_STATE> reportedStates =
            remoteCheckerManager.getHealthStates(hostPort.getHost(), hostPort.getPort());
    long matched = 0;
    for (HEALTH_STATE reported : reportedStates) {
        if (healthStates.contains(reported)) {
            matched++;
        }
    }
    return matched >= checkerConfig.getQuorum();
}

/**
 * Convenience accessor for the health-state setter manager owned by
 * {@code defaultDelayPingActionCollector}.
 *
 * @return the collector's setter manager, unchanged
 */
private FinalStateSetterManager<ClusterShardHostPort, Boolean> getHealthStateSetterManager() {
    FinalStateSetterManager<ClusterShardHostPort, Boolean> setterManager =
            defaultDelayPingActionCollector.getHealthStateSetterManager();
    return setterManager;
}
}
Loading

0 comments on commit 6ec73cd

Please sign in to comment.