Skip to content

Commit

Permalink
support exchange region between clusters (#740)
Browse files Browse the repository at this point in the history
* metaserver observer multi keeper-leader zk path

* migrate keeper from clusterId-shardId to replId

* fix keeper repl timeout found late

* Metaserver support shards exchange between clusters

* fix asymmetric cluster delay check down

---------

Co-authored-by: lishanglin <[email protected]>
  • Loading branch information
LanternLee and lishanglin authored Nov 9, 2023
1 parent 269a4fe commit 5bbda38
Show file tree
Hide file tree
Showing 70 changed files with 721 additions and 245 deletions.
24 changes: 21 additions & 3 deletions core/src/main/java/com/ctrip/xpipe/utils/log/MDCUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,23 @@
*/
public class MDCUtil {

protected static String MDC_KEY = "xpipe.cluster.shard";
protected static String MDC_KEY_CLUSTER_SHARD = "xpipe.cluster.shard";

protected static String MDC_KEY_KEEPER_REPL = "xpipe.keeper.repl";

public static void setClusterShard(String cluster, String shard) {

MDC.put(MDC_KEY, StringUtil.makeSimpleName(cluster, shard));
MDC.put(MDC_KEY_CLUSTER_SHARD, StringUtil.makeSimpleName(cluster, shard));
}

public static void setKeeperRepl(String replId) {

MDC.put(MDC_KEY_KEEPER_REPL, StringUtil.makeSimpleName(replId, null));
}

@VisibleForTesting
protected static String getClusterShard() {
return MDC.get(MDC_KEY);
return MDC.get(MDC_KEY_CLUSTER_SHARD);

}

Expand All @@ -35,4 +42,15 @@ public void run() {
};
}

public static Runnable decorateKeeperReplMDC(Runnable r, String replId) {
return new Runnable() {

@Override
public void run() {
MDCUtil.setKeeperRepl(replId);
r.run();
}
};
}

}
4 changes: 2 additions & 2 deletions redis/dockerPackage/xpipe-keeper/config/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

<appenders>
<console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="%X{xpipe.cluster.shard}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
<PatternLayout pattern="%X{xpipe.cluster.shard}%X{xpipe.keeper.repl}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
</console>

<RollingFile name="rollingFileInfo" fileName="${baseDir}/${appName}.log"
filePattern="${baseDir}/${appName}-%d{yyyy-MM-dd}-%i.log.gz">
<PatternLayout pattern="%X{xpipe.cluster.shard}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
<PatternLayout pattern="%X{xpipe.cluster.shard}%X{xpipe.keeper.repl}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="500 MB"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

<appenders>
<console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="%X{xpipe.cluster.shard}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
<PatternLayout pattern="%X{xpipe.cluster.shard}%X{xpipe.keeper.repl}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
</console>

<RollingFile name="rollingFileInfo" fileName="${baseDir}/${appName}.log"
filePattern="${baseDir}/${appName}-%d{yyyy-MM-dd}-%i.log.gz">
<PatternLayout pattern="%X{xpipe.cluster.shard}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
<PatternLayout pattern="%X{xpipe.cluster.shard}%X{xpipe.keeper.repl}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="500 MB"/>
Expand Down
4 changes: 2 additions & 2 deletions redis/package/redis-keeper-package/src/main/config/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

<appenders>
<console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="%X{xpipe.cluster.shard}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
<PatternLayout pattern="%X{xpipe.cluster.shard}%X{xpipe.keeper.repl}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
</console>

<RollingFile name="rollingFileInfo" fileName="${baseDir}/${appName}.log"
filePattern="${baseDir}/${appName}-%d{yyyy-MM-dd}-%i.log.gz">
<PatternLayout pattern="%X{xpipe.cluster.shard}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
<PatternLayout pattern="%X{xpipe.cluster.shard}%X{xpipe.keeper.repl}[%d{HH:mm:ss:SSS}][%p][%t][%c{1}]%m%xpEx%n"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="500 MB"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ private MetricData getPoint(DelayActionContext context) {
data.addTag("crossRegion", String.valueOf(info.isCrossRegion()));
if (context instanceof HeteroDelayActionContext) {
data.addTag("srcShardId", String.valueOf(((HeteroDelayActionContext) context).getShardDbId()));
} else {
data.addTag("srcShardId", "-");
}
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.net.Socket;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.ctrip.xpipe.redis.core.entity;

import com.ctrip.xpipe.redis.core.store.ClusterId;
import com.ctrip.xpipe.redis.core.store.ShardId;
import com.ctrip.xpipe.redis.core.store.ReplId;

/**
* @author wenchao.meng
Expand All @@ -15,8 +14,8 @@ public KeeperInstanceMeta(){

}

public KeeperInstanceMeta(ClusterId clusterId, ShardId shardId, KeeperMeta keeperMeta) {
super(clusterId.id(), shardId.id(), keeperMeta);
public KeeperInstanceMeta(ReplId replId, KeeperMeta keeperMeta) {
super(replId.id(), keeperMeta);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,26 @@ public class KeeperTransMeta {
private Long clusterDbId;

private Long shardDbId;

private Long replId;

private KeeperMeta keeperMeta;

//for json conversion
public KeeperTransMeta() {}

public KeeperTransMeta(Long clusterDbId, Long shardDbId, KeeperMeta keeperMeta) {
this(clusterDbId, shardDbId, null, keeperMeta);
}

public KeeperTransMeta(Long replId, KeeperMeta keeperMeta) {
this(null, null, replId, keeperMeta);
}

public KeeperTransMeta(Long clusterDbId, Long shardDbId, Long replId, KeeperMeta keeperMeta) {
this.clusterDbId = clusterDbId;
this.shardDbId = shardDbId;
this.replId = replId;
this.keeperMeta = keeperMeta;
}

Expand Down Expand Up @@ -49,23 +60,32 @@ public void setShardDbId(Long shardDbId) {
this.shardDbId = shardDbId;
}

public Long getReplId() {
return replId;
}

public void setReplId(Long replId) {
this.replId = replId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
KeeperTransMeta that = (KeeperTransMeta) o;
return Objects.equals(clusterDbId, that.clusterDbId) &&
Objects.equals(shardDbId, that.shardDbId) &&
Objects.equals(replId, ((KeeperTransMeta) o).replId) &&
Objects.equals(keeperMeta, that.keeperMeta);
}

@Override
public int hashCode() {
return Objects.hash(clusterDbId, shardDbId, keeperMeta);
return Objects.hash(clusterDbId, shardDbId, replId, keeperMeta);
}

@Override
public String toString() {
return String.format("[%d,%d-%s:%d]", clusterDbId, shardDbId, keeperMeta.getIp(), keeperMeta.getPort());
return String.format("[%d,%d-%d-%s:%d]", clusterDbId, shardDbId, replId, keeperMeta.getIp(), keeperMeta.getPort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
* Aug 2, 2016
*/
public interface KeeperContainerService {


KeeperInstanceMeta infoPort(int port);

void addKeeper(KeeperTransMeta keeperTransMeta);

void addOrStartKeeper(KeeperTransMeta keeperTransMeta);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.ctrip.xpipe.redis.core.entity.ApplierMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.store.ClusterId;
import com.ctrip.xpipe.redis.core.store.ReplId;
import com.ctrip.xpipe.redis.core.store.ShardId;

/**
Expand Down Expand Up @@ -59,6 +60,14 @@ public static String getApplierLeaderLatchPath(String clusterId, String shardId)
return path;
}

public static String getKeeperLeaderLatchPath(long replId){
return String.format("%s/repl_%d", getZkLeaderLatchRootPath(), replId);
}

public static String getKeeperLeaderLatchPath(ReplId replId){
return String.format("%s/%s", getZkLeaderLatchRootPath(), replId.toString());
}

public static String getKeeperLeaderLatchPath(long clusterDbId, long shardDbId) {
return String.format("%s/cluster_%d/shard_%d", getZkLeaderLatchRootPath(), clusterDbId, shardDbId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import com.ctrip.xpipe.redis.core.entity.ShardMeta;
import org.unidal.tuple.Triple;

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @author wenchao.meng
Expand All @@ -17,28 +19,51 @@ public class ClusterMetaComparator extends AbstractMetaComparator<ShardMeta>{

private ClusterMeta future;

private Map<Long, ShardMeta> currentGlobalShards;
private Map<Long, ShardMeta> futureGlobalShards;

public ClusterMetaComparator(ClusterMeta current, ClusterMeta future) {
this.current = current;
this.future = future;
}

public void setShardMigrateSupport(Map<Long, ShardMeta> currentGlobalShards, Map<Long, ShardMeta> futureGlobalShards) {
this.currentGlobalShards = currentGlobalShards;
this.futureGlobalShards = futureGlobalShards;
}

private Map<Long, ShardMeta> extractShardsInCluster(ClusterMeta clusterMeta) {
return clusterMeta.getAllShards().values().stream()
.collect(Collectors.toMap(ShardMeta::getDbId, shardMeta -> shardMeta));
}

@Override
public void compare() {
configChanged = checkShallowChange(current, future);

Triple<Set<String>, Set<String>, Set<String>> result = getDiff(current.getAllShards().keySet(), future.getAllShards().keySet());
Map<Long, ShardMeta> currentShards = extractShardsInCluster(current);
Map<Long, ShardMeta> futureShards = extractShardsInCluster(future);
Triple<Set<Long>, Set<Long>, Set<Long>> result = getDiff(currentShards.keySet(), futureShards.keySet());

for(String shardId : result.getFirst()){
added.add(future.findFromAllShards(shardId));
for(Long shardId : result.getFirst()){
// do redundant addAndStart keeper/applier job for shards migrated in
added.add(futureShards.get(shardId));
}

for(String shardId : result.getLast()){
removed.add(current.findFromAllShards(shardId));
for(Long shardId : result.getLast()){
if (null != futureGlobalShards && futureGlobalShards.containsKey(shardId)) {
// for shard migrated out, only release shard manage job but not keeper
ShardMeta currentMeta = currentShards.get(shardId);
ShardMetaComparator comparator = new ShardMetaComparator(currentMeta, null);
modified.add(comparator);
} else {
removed.add(currentShards.get(shardId));
}
}

for(String shardId : result.getMiddle()){
ShardMeta currentMeta = current.findFromAllShards(shardId);
ShardMeta futureMeta = future.findFromAllShards(shardId);
for(Long shardId : result.getMiddle()){
ShardMeta currentMeta = currentShards.get(shardId);
ShardMeta futureMeta = futureShards.get(shardId);
if(!reflectionEquals(currentMeta, futureMeta)){
ShardMetaComparator comparator = new ShardMetaComparator(currentMeta, futureMeta);
comparator.compare();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.ShardMeta;
import com.google.common.collect.Maps;
import org.unidal.tuple.Triple;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
Expand All @@ -16,6 +19,10 @@
public class DcMetaComparator extends AbstractMetaComparator<ClusterMeta>{

private DcMeta current, future;

private Map<Long, ShardMeta> currentShards;
private Map<Long, ShardMeta> futureShards;
private AtomicBoolean shardMigrateSupport = new AtomicBoolean(false);

public static DcMetaComparator buildComparator(DcMeta current, DcMeta future){

Expand Down Expand Up @@ -52,11 +59,36 @@ public DcMetaComparator(DcMeta current, DcMeta future) {
this.future = future;
}

public void compare(){
Map<Long, ClusterMeta> currentClustersMap = current.getClusters().values().stream()
.collect(Collectors.toMap(ClusterMeta::getDbId, clusterMeta -> clusterMeta));
Map<Long, ClusterMeta> futureClustersMap = future.getClusters().values().stream()
public void setShardMigrateSupport() {
if (shardMigrateSupport.compareAndSet(false, true)) {
this.currentShards = extractShardsInDc(current);
this.futureShards = extractShardsInDc(future);
}
}

public boolean supportShardMigrate() {
return shardMigrateSupport.get();
}

private Map<Long, ShardMeta> extractShardsInDc(DcMeta dcMeta) {
Map<Long, ShardMeta> shards = Maps.newHashMap();
dcMeta.getClusters().values().forEach(cluster -> {
cluster.getShards().values().forEach(shard -> {
shards.put(shard.getDbId(), shard);
});
});

return shards;
}

private Map<Long, ClusterMeta> extractClustersInDc(DcMeta dcMeta) {
return dcMeta.getClusters().values().stream()
.collect(Collectors.toMap(ClusterMeta::getDbId, clusterMeta -> clusterMeta));
}

public void compare(){
Map<Long, ClusterMeta> currentClustersMap = extractClustersInDc(current);
Map<Long, ClusterMeta> futureClustersMap = extractClustersInDc(future);
Triple<Set<Long>, Set<Long>, Set<Long>> result = getDiff(currentClustersMap.keySet(), futureClustersMap.keySet());

Set<Long> addedClusterDbIds = result.getFirst();
Expand All @@ -76,6 +108,9 @@ public void compare(){
ClusterMeta futureMeta = futureClustersMap.get(clusterDbId);
if(!reflectionEquals(currentMeta, futureMeta)) {
ClusterMetaComparator clusterMetaComparator = new ClusterMetaComparator(currentMeta, futureMeta);
if (supportShardMigrate()) {
clusterMetaComparator.setShardMigrateSupport(currentShards, futureShards);
}
clusterMetaComparator.compare();
modified.add(clusterMetaComparator);
}
Expand Down
Loading

0 comments on commit 5bbda38

Please sign in to comment.