Skip to content

Commit

Permalink
aggregate instance pull
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Aug 21, 2024
1 parent 47f2335 commit 9a4ad66
Show file tree
Hide file tree
Showing 18 changed files with 555 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface OuterClientService extends Ordered{

void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostPort, long noModifySeconds) throws OuterClientException;

void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException;

boolean clusterMigratePreCheck(String clusterName) throws OuterClientException;

MigrationPublishResult doMigrationPublish(String clusterName, String primaryDcName, List<InetSocketAddress> newMasters) throws OuterClientException;
Expand Down Expand Up @@ -724,6 +726,114 @@ public void setMessage(String message) {
}
}

@JsonIgnoreProperties(ignoreUnknown = true)
class MarkInstanceRequest{
private List<HostPortDcStatus> hostPortDcStatuses;
private String clusterName;
private int noModifySeconds;
private String activeDc;

public MarkInstanceRequest() {
}

public MarkInstanceRequest(List<HostPortDcStatus> hostPortDcStatuses, String clusterName, String activeDc) {
this.hostPortDcStatuses = hostPortDcStatuses;
this.clusterName = clusterName;
this.noModifySeconds = 0;
this.activeDc = activeDc;
}

public MarkInstanceRequest(List<HostPortDcStatus> hostPortDcStatuses, String clusterName, int noModifySeconds, String activeDc) {
this.hostPortDcStatuses = hostPortDcStatuses;
this.clusterName = clusterName;
this.noModifySeconds = noModifySeconds;
this.activeDc = activeDc;
}

public List<HostPortDcStatus> getHostPortDcStatuses() {
return hostPortDcStatuses;
}

public void setHostPortDcStatuses(List<HostPortDcStatus> hostPortDcStatuses) {
this.hostPortDcStatuses = hostPortDcStatuses;
}

public String getClusterName() {
return clusterName;
}

public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}

public int getNoModifySeconds() {
return noModifySeconds;
}

public void setNoModifySeconds(int noModifySeconds) {
this.noModifySeconds = noModifySeconds;
}

public String getActiveDc() {
return activeDc;
}

public void setActiveDc(String activeDc) {
this.activeDc = activeDc;
}
}

@JsonIgnoreProperties(ignoreUnknown = true)
class HostPortDcStatus{

private String host;
private int port;
private String dc;
private boolean canRead;

public HostPortDcStatus() {
}

public HostPortDcStatus(String host, int port, String dc, boolean canRead) {
this.host = host;
this.port = port;
this.dc = dc;
this.canRead = canRead;
}

public String getHost() {
return host;
}

public int getPort() {
return port;
}

public boolean isCanRead() {
return canRead;
}

public void setHost(String host) {
this.host = host;
}

public void setPort(int port) {
this.port = port;
}

public String getDc() {
return dc;
}

public void setDc(String dc) {
this.dc = dc;
}

public void setCanRead(boolean canRead) {
this.canRead = canRead;
}
}

enum ClusterType {
SINGEL_DC(0){
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostP

}

@Override
public void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException {

}

@Override
public OuterClientDataResp<List<ClusterExcludedIdcInfo>> getAllExcludedIdcs() throws Exception {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ public void markInstanceDownIfNoModifyFor(ClusterShardHostPort clusterShardHostP
instanceStatus.put(clusterShardHostPort.getHostPort(), false);
}

@Override
public void batchMarkInstance(MarkInstanceRequest markInstanceRequest) throws OuterClientException {
logger.info("[batchMarkInstance]{}", markInstanceRequest);
for (HostPortDcStatus hostPortDcStatus : markInstanceRequest.getHostPortDcStatuses()) {
instanceStatus.put(new HostPort(hostPortDcStatus.getHost(), hostPortDcStatus.getPort()), hostPortDcStatus.isCanRead());
}
}

@Override
public OuterClientDataResp<List<ClusterExcludedIdcInfo>> getAllExcludedIdcs() throws Exception {
OuterClientDataResp<List<ClusterExcludedIdcInfo>> resp = new OuterClientDataResp<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,8 @@ public interface CheckerConfig {

int getKeeperCheckerIntervalMilli();

int getInstancePullIntervalSeconds();

int getInstancePullRandomSeconds();

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ public class CheckConfigBean extends AbstractConfigBean {

public static final String KEY_KEEPER_CHECKER_INTERVAL = "keeper.checker.interval";

public static final String KEY_CHECKER_INSTANCE_PULL_INTERVAL = "checker.instance.pull.interval";

public static final String KEY_CHECKER_INSTANCE_PULL_RANDOM = "checker.instance.pull.random";

private FoundationService foundationService;

@Autowired
Expand Down Expand Up @@ -342,6 +346,14 @@ public int getKeeperCheckerIntervalMilli() {
return getIntProperty(KEY_KEEPER_CHECKER_INTERVAL, 60 * 1000);
}

public int getInstancePullIntervalSeconds() {
return getIntProperty(KEY_CHECKER_INSTANCE_PULL_INTERVAL, 5);
}

public int getInstancePullRandomSeconds() {
return getIntProperty(KEY_CHECKER_INSTANCE_PULL_RANDOM, 5);
}

public int getStableResetAfterRounds() {
return getIntProperty(KEY_CHECKER_STABLE_RESET_AFTER_ROUNDS, 30);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void go() throws Exception {

XPipeInstanceHealthHolder xpipeInstanceHealth = instanceHealth.getKey();
OutClientInstanceHealthHolder outClientInstanceHealth = instanceHealth.getValue();
UpDownInstances instanceNeedAdjust = findHostPortNeedAdjust(xpipeInstanceHealth, outClientInstanceHealth, interested);
UpDownInstances instanceNeedAdjust = findHostPortNeedAdjust(xpipeInstanceHealth, outClientInstanceHealth, interested, logger);

checkTimeout(timeoutMill, "after compare");
if (!instanceNeedAdjust.getHealthyInstances().isEmpty())
Expand Down Expand Up @@ -170,9 +170,9 @@ protected Map<String, Set<HostPort>> fetchInterestedClusterInstances() {
return interestedClusterInstances;
}

protected UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipeInstanceHealthHolder,
public UpDownInstances findHostPortNeedAdjust(XPipeInstanceHealthHolder xpipeInstanceHealthHolder,
OutClientInstanceHealthHolder outClientInstanceHealthHolder,
Map<String, Set<HostPort>> interested) {
Map<String, Set<HostPort>> interested, Logger logger) {
int quorum = config.getQuorum();
UpDownInstances xpipeInstances = xpipeInstanceHealthHolder.aggregate(interested, quorum);
UpDownInstances outClientInstances = outClientInstanceHealthHolder.extractReadable(interested);
Expand Down
Loading

0 comments on commit 9a4ad66

Please sign in to comment.