Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api for bi_direction cluster migration records #871

Merged
merged 2 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public class TestMetaCache implements MetaCache {
public TestMetaCache(){
}

public void setXpipeMeta(XpipeMeta xpipeMeta) {
this.xpipeMeta = xpipeMeta;
}

@Override
public XpipeMeta getXpipeMeta() {
return xpipeMeta;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
package com.ctrip.xpipe.redis.console.controller.api.migrate;

import com.ctrip.xpipe.api.migration.DcMapper;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.redis.checker.controller.result.RetMessage;
import com.ctrip.xpipe.redis.console.cache.DcCache;
import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.*;
import com.ctrip.xpipe.redis.console.entity.ClusterEntity;
import com.ctrip.xpipe.redis.console.entity.MigrationBiClusterEntity;
import com.ctrip.xpipe.redis.console.migration.model.MigrationCluster;
import com.ctrip.xpipe.redis.console.migration.model.MigrationEvent;
import com.ctrip.xpipe.redis.console.migration.status.MigrationStatus;
import com.ctrip.xpipe.redis.console.model.MigrationClusterTbl;
import com.ctrip.xpipe.redis.console.repository.ClusterRepository;
import com.ctrip.xpipe.redis.console.repository.MigrationBiClusterRepository;
import com.ctrip.xpipe.redis.console.service.migration.MigrationService;
import com.ctrip.xpipe.redis.console.service.migration.exception.*;
import com.ctrip.xpipe.redis.console.service.migration.impl.MigrationRequest;
import com.ctrip.xpipe.redis.console.service.migration.impl.TryMigrateResult;
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.StringUtil;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -21,6 +31,7 @@

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* @author wenchao.meng
Expand All @@ -36,6 +47,15 @@ public class MigrationApi extends AbstractConsoleController {
@Autowired
private MigrationService migrationService;

@Autowired
private MigrationBiClusterRepository biMigrationRepository;

@Autowired
private ClusterRepository clusterRepository;

@Autowired
private MetaCache metaCache;

@Autowired
private DcCache dcCache;

Expand Down Expand Up @@ -206,6 +226,66 @@ public Map<String, List<ClusterMigrationStatus>> getClusterMigrationHistory(@Req
return resp;
}

@GetMapping(value = "/v2/history")
public Map<String, List<ClusterMigrationStatusV2>> getClusterMigrationHistoryV2(@RequestBody MigrationHistoryReq req) {
logger.info("[historyV2][{}-{}] {}", req.from, req.to, req.clusters);
long current = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
if (req.from < 0 || req.from >= current) return Collections.emptyMap();
if (null == req.clusters || req.clusters.isEmpty()) return Collections.emptyMap();
if (req.to < req.from) req.to = current;

List<ClusterEntity> clusters = clusterRepository.selectAllByClusterName(req.clusters);
Map<String, ClusterEntity> oneWayClusters = new HashMap<>();
Map<Long, ClusterEntity> biDirectionClusters = new HashMap<>();

for (ClusterEntity cluster: clusters) {
if (ClusterType.isSameClusterType(cluster.getClusterType(), ClusterType.ONE_WAY)) {
oneWayClusters.put(cluster.getClusterName(), cluster);
} else if (ClusterType.isSameClusterType(cluster.getClusterType(), ClusterType.BI_DIRECTION)) {
biDirectionClusters.put(cluster.getId(), cluster);
}
}

long from = TimeUnit.SECONDS.toMillis(req.from);
long to = TimeUnit.SECONDS.toMillis(req.to);
List<MigrationClusterTbl> migrationClusterTbls = migrationService.fetchMigrationClusters(oneWayClusters.keySet(), from, to);
List<MigrationBiClusterEntity> biMigrationRecords = biMigrationRepository.selectAllByClusterIdAndOpTime(biDirectionClusters.keySet(),
new Date(from), new Date(to));

Map<String, List<ClusterMigrationStatusV2>> resp = new HashMap<>();
for (MigrationClusterTbl migrationClusterTbl: migrationClusterTbls) {
String clusterName = migrationClusterTbl.getCluster().getClusterName();
ClusterEntity cluster = oneWayClusters.get(clusterName);
if (!resp.containsKey(clusterName)) resp.put(clusterName, new ArrayList<>());
ClusterMigrationStatusV2 clusterMigrationStatus = ClusterMigrationStatusV2.from(cluster, migrationClusterTbl, dcCache);
resp.get(clusterName).add(clusterMigrationStatus);
}

for (MigrationBiClusterEntity biMigrationRecord: biMigrationRecords) {
Long clusterId = biMigrationRecord.getClusterId();
ClusterEntity cluster = biDirectionClusters.get(clusterId);
String clusterName = cluster.getClusterName();
if (!resp.containsKey(clusterName)) resp.put(clusterName, new ArrayList<>());
ClusterMigrationStatusV2 clusterMigrationStatus = ClusterMigrationStatusV2.from(cluster, biMigrationRecord, getBiRelatedDcs(clusterName));
resp.get(clusterName).add(clusterMigrationStatus);
}

return resp;
}

private Set<String> getBiRelatedDcs(String clusterId) {
XpipeMeta xpipeMeta = metaCache.getXpipeMeta();
for (DcMeta dcMeta: xpipeMeta.getDcs().values()) {
if (dcMeta.getClusters().containsKey(clusterId)) {
ClusterMeta cluster = dcMeta.getClusters().get(clusterId);
String dcs = cluster.getDcs();
if (StringUtil.isEmpty(dcs)) return Collections.emptySet();
else return new HashSet<>(Arrays.asList(dcs.split("\\s*,\\s*")));
}
}
return Collections.emptySet();
}

@RequestMapping(value = "/migration/system/health/status", method = RequestMethod.GET)
public RetMessage getMigrationSystemHealthStatus() {
logger.info("[getMigrationSystemHealthStatus]");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.ctrip.xpipe.redis.console.controller.api.migrate.meta;

import com.ctrip.xpipe.api.codec.Codec;
import com.ctrip.xpipe.api.codec.GenericTypeReference;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.redis.console.cache.DcCache;
import com.ctrip.xpipe.redis.console.entity.ClusterEntity;
import com.ctrip.xpipe.redis.console.entity.MigrationBiClusterEntity;
import com.ctrip.xpipe.redis.console.migration.status.MigrationStatus;
import com.ctrip.xpipe.redis.console.model.DcTbl;
import com.ctrip.xpipe.redis.console.model.MigrationClusterTbl;
import org.springframework.core.ParameterizedTypeReference;

import java.util.*;
import java.util.concurrent.TimeUnit;

public class ClusterMigrationStatusV2 {

public Long startAt;

public Long endAt;

public String clusterType;

public Set<String> sourceDcs;

public Set<String> destDcs;

public String status;

public static ClusterMigrationStatusV2 from(ClusterEntity cluster, MigrationClusterTbl migrationClusterTbl, DcCache dcCache) {
ClusterMigrationStatusV2 migrationStatus = new ClusterMigrationStatusV2();
migrationStatus.clusterType = ClusterType.lookup(cluster.getClusterType()).name();
migrationStatus.startAt = TimeUnit.MILLISECONDS.toSeconds(migrationClusterTbl.getStartTime().getTime());
if (null != migrationClusterTbl.getEndTime()) {
migrationStatus.endAt = TimeUnit.MILLISECONDS.toSeconds(migrationClusterTbl.getEndTime().getTime());
} else {
migrationStatus.endAt = null;
}

DcTbl srcDcTbl = dcCache.find(migrationClusterTbl.getSourceDcId());
DcTbl destDcTbl = dcCache.find(migrationClusterTbl.getDestinationDcId());
if (null != srcDcTbl) migrationStatus.sourceDcs = Collections.singleton(srcDcTbl.getDcName());
if (null != destDcTbl) migrationStatus.destDcs = Collections.singleton(destDcTbl.getDcName());
migrationStatus.status = MigrationStatus.valueOf(migrationClusterTbl.getStatus()).getType();

return migrationStatus;
}

private static GenericTypeReference<Set<String>> stringSetType = new GenericTypeReference<Set<String>>(){};
public static ClusterMigrationStatusV2 from(ClusterEntity cluster, MigrationBiClusterEntity biMigrationRecord, Set<String> relatedDcs) {
ClusterMigrationStatusV2 migrationStatus = new ClusterMigrationStatusV2();
migrationStatus.clusterType = ClusterType.lookup(cluster.getClusterType()).name();
migrationStatus.startAt = TimeUnit.MILLISECONDS.toSeconds(biMigrationRecord.getOperationTime().getTime());
migrationStatus.endAt = TimeUnit.MILLISECONDS.toSeconds(biMigrationRecord.getOperationTime().getTime());

Set<String> srcDcs = relatedDcs;
Set<String> destDcs = new HashSet<>(srcDcs);
Set<String> excludedDcs = Codec.DEFAULT.decode(biMigrationRecord.getPublishInfo(), stringSetType);
destDcs.removeAll(excludedDcs);

migrationStatus.sourceDcs = srcDcs;
migrationStatus.destDcs = destDcs;
migrationStatus.status = biMigrationRecord.getStatus();

return migrationStatus;
}

@Override
public String toString() {
return "ClusterMigrationStatusV2{" +
"startAt=" + startAt +
", endAt=" + endAt +
", clusterType='" + clusterType + '\'' +
", sourceDcs=" + sourceDcs +
", destDcs=" + destDcs +
", status='" + status + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.springframework.stereotype.Repository;

import javax.annotation.Resource;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

@Repository
Expand All @@ -23,6 +25,15 @@ public ClusterEntity selectByClusterName(String clusterName) {
return clusterMapper.selectOne(wrapper);
}

public List<ClusterEntity> selectAllByClusterName(Collection<String> clusterNames) {
if (clusterNames == null || clusterNames.isEmpty()) {
return Collections.emptyList();
}
QueryWrapper<ClusterEntity> query = new QueryWrapper<>();
query.in(ClusterEntity.CLUSTER_NAME, clusterNames);
return clusterMapper.selectList(query);
}

public List<ClusterEntity> selectAllByIds(List<Long> ids) {
return clusterMapper.selectBatchIds(ids);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.ctrip.xpipe.redis.console.repository;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ctrip.xpipe.redis.console.entity.MigrationBiClusterEntity;
import com.ctrip.xpipe.redis.console.mapper.MigrationBiClusterMapper;
import org.apache.ibatis.session.ExecutorType;
Expand All @@ -8,6 +9,8 @@
import org.springframework.stereotype.Repository;

import javax.annotation.Resource;
import java.util.Collection;
import java.util.Date;
import java.util.List;

/**
Expand Down Expand Up @@ -39,6 +42,16 @@ public void batchInsert(List<MigrationBiClusterEntity> migrationBiClusterEntitie
}
}

public List<MigrationBiClusterEntity> selectAllByClusterIdAndOpTime(Collection<Long> clusterIds,
Date from, Date to) {
QueryWrapper<MigrationBiClusterEntity> query = new QueryWrapper<>();
query.in(MigrationBiClusterEntity.CLUSTER_ID, clusterIds);
query.ge(MigrationBiClusterEntity.OPERATION_TIME, from);
query.lt(MigrationBiClusterEntity.OPERATION_TIME, to);

return migrationBiClusterMapper.selectList(query);
}

public List<MigrationBiClusterEntity> selectAll() {
return migrationBiClusterMapper.selectList(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.ctrip.xpipe.redis.console.healthcheck.nonredis.migration.MigrationSystemAvailableChecker;
import com.ctrip.xpipe.redis.console.migration.MigrationResources;
import com.ctrip.xpipe.redis.console.migration.manager.MigrationEventManager;
import com.ctrip.xpipe.redis.console.migration.status.MigrationStatus;
import com.ctrip.xpipe.redis.console.model.ClusterTbl;
import com.ctrip.xpipe.redis.console.repository.MigrationBiClusterRepository;
import com.ctrip.xpipe.redis.console.service.ClusterService;
Expand Down Expand Up @@ -193,7 +194,7 @@ private void tryRecordBiMigration(ClusterTbl clusterTbl, String[] excludes, bool
try {
MigrationBiClusterEntity migrationRecord = new MigrationBiClusterEntity();
migrationRecord.setClusterId(clusterTbl.getId());
migrationRecord.setStatus(result ? "SUCCESS":"FAIL");
migrationRecord.setStatus(result ? MigrationStatus.TYPE_SUCCESS : MigrationStatus.TYPE_FAIL);
migrationRecord.setOperator("Beacon");
migrationRecord.setPublishInfo(Codec.DEFAULT.encode(excludes));
migrationRecord.setOperationTime(new Date());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ public boolean syncBiMigration(BiMigrationReq biMigrationReq, String operator) t
for (ClusterTbl cluster: migrateClusters) {
MigrationBiClusterEntity entity = new MigrationBiClusterEntity();
entity.setClusterId(cluster.getId());
entity.setStatus(rst ? "SUCCESS":"FAIL");
entity.setStatus(rst ? MigrationStatus.TYPE_SUCCESS : MigrationStatus.TYPE_FAIL);
entity.setOperator(operator);
entity.setPublishInfo(Codec.DEFAULT.encode(excludedDcs));
entity.setOperationTime(current);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9053,7 +9053,7 @@ eval("angular\n .module('index')\n .controller('BiMigrationCtl', BiMigrati
\********************************************************/
/***/ (() => {

eval("angular\n .module('index')\n .controller('BiMigrationEventListCtl', BiMigrationEventListCtl);\nBiMigrationEventListCtl.$inject = ['$rootScope', '$scope', '$window', '$stateParams', 'AppUtil',\n 'toastr', 'NgTableParams', 'MigrationService', '$q'];\nfunction BiMigrationEventListCtl($rootScope, $scope, $window, $stateParams, AppUtil, toastr, NgTableParams, MigrationService, $q) {\n MigrationService.findAllBiMigration().then(function (data) {\n $scope.tableParams = new NgTableParams({\n page: 1,\n count: 10\n }, {\n filterDelay: 100,\n dataset: data,\n });\n });\n}\n\n\n//# sourceURL=webpack://XPipe-Console/./scripts/controllers/BiMigrationEventListCtl.ts?");
eval("angular\n .module('index')\n .controller('BiMigrationEventListCtl', BiMigrationEventListCtl);\nBiMigrationEventListCtl.$inject = ['$rootScope', '$scope', '$window', '$stateParams', 'AppUtil',\n 'toastr', 'NgTableParams', 'MigrationService', '$q'];\nfunction BiMigrationEventListCtl($rootScope, $scope, $window, $stateParams, AppUtil, toastr, NgTableParams, MigrationService, $q) {\n MigrationService.findAllBiMigration().then(function (data) {\n $scope.tableParams = new NgTableParams({\n page: 1,\n count: 10\n }, {\n filterDelay: 100,\n dataset: data.reverse(),\n });\n });\n}\n\n\n//# sourceURL=webpack://XPipe-Console/./scripts/controllers/BiMigrationEventListCtl.ts?");

/***/ }),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ function BiMigrationEventListCtl($rootScope, $scope, $window, $stateParams, AppU
count : 10
}, {
filterDelay:100,
dataset: data,
dataset: data.reverse(),
});
})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package com.ctrip.xpipe.redis.console.controller.api.migrate;

import com.ctrip.xpipe.redis.checker.impl.TestMetaCache;
import com.ctrip.xpipe.redis.console.AbstractConsoleIntegrationTest;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.ClusterMigrationStatus;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.ClusterMigrationStatusV2;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.MigrationHistoryReq;
import com.ctrip.xpipe.redis.console.resources.DefaultMetaCache;
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.redis.core.meta.XpipeMetaManager;
import com.ctrip.xpipe.tuple.Pair;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -22,6 +31,21 @@ public class MigrationApiIntegrationTest extends AbstractConsoleIntegrationTest
@Autowired
private MigrationApi migrationApi;

@Autowired
private MetaCache metaCache;

@Before
public void setupMigrationApiIntegrationTest() {
XpipeMeta xpipeMeta = new XpipeMeta();
DcMeta dcMeta = new DcMeta("jq");
ClusterMeta clusterMeta = new ClusterMeta("bi_cluster1");
clusterMeta.setDcs("jq,oy");
dcMeta.addCluster(clusterMeta);
xpipeMeta.addDc(dcMeta);

((TestMetaCache) metaCache).setXpipeMeta(xpipeMeta);
}

@Test
public void testGetClusterMigrationHistory() {
MigrationHistoryReq req = new MigrationHistoryReq();
Expand All @@ -36,6 +60,28 @@ public void testGetClusterMigrationHistory() {
Assert.assertEquals("Processing", resp.get("cluster2").get(0).status);
}

@Test
public void testGetClusterMigrationHistoryV2() {
MigrationHistoryReq req = new MigrationHistoryReq();
req.from = 0;
req.to = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) + 600;
req.clusters = new HashSet<>(Arrays.asList("cluster2", "bi_cluster1"));

getXpipeMeta();
Map<String, List<ClusterMigrationStatusV2>> resp = migrationApi.getClusterMigrationHistoryV2(req);
logger.info("[testGetClusterMigrationHistoryV2] {}", resp);
Assert.assertTrue(resp.containsKey("cluster2"));
Assert.assertEquals(1, resp.get("cluster2").size());
Assert.assertEquals("Processing", resp.get("cluster2").get(0).status);
Assert.assertEquals(Collections.singleton("jq"), resp.get("cluster2").get(0).sourceDcs);
Assert.assertEquals(Collections.singleton("oy"), resp.get("cluster2").get(0).destDcs);
Assert.assertTrue(resp.containsKey("bi_cluster1"));
Assert.assertEquals(1, resp.get("bi_cluster1").size());
Assert.assertEquals("Success", resp.get("bi_cluster1").get(0).status);
Assert.assertEquals(new HashSet<>(Arrays.asList("jq", "oy")), resp.get("bi_cluster1").get(0).sourceDcs);
Assert.assertEquals(Collections.singleton("oy"), resp.get("bi_cluster1").get(0).destDcs);
}

protected String prepareDatas() throws IOException {
return prepareDatasFromFile("src/test/resources/migration-test.sql");
}
Expand Down
Loading
Loading