Skip to content

Commit

Permalink
update all dc meta of current dc with metacache
Browse files Browse the repository at this point in the history
  • Loading branch information
songyuyuyu committed Nov 16, 2023
1 parent 7ba87c6 commit 2efe621
Show file tree
Hide file tree
Showing 28 changed files with 244 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public interface CheckerConsoleService {

XpipeMeta getXpipeAllMeta(String console) throws SAXException, IOException;

XpipeMeta getXpipeAllDCMeta(String console, String dcName) throws SAXException, IOException;
XpipeMeta getXpipeDcAllMeta(String console, String dcName) throws SAXException, IOException;

List<ProxyTunnelInfo> getProxyTunnelInfos(String console);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public interface CheckerConfig {

String KEY_CHECKER_META_REFRESH_INTERVAL = "checker.meta.refresh.interval.milli";

String KEY_CHECKER_CURRENT_DC_ALL_META_REFRESH_INTERVAL = "checker.current_dc_all_meta.refresh.interval.milli";

String KEY_CONSOLE_ADDRESS = "console.address";

String KEY_CHECKER_ACK_INTERVAL = "checker.ack.interval.milli";
Expand All @@ -105,6 +107,8 @@ public interface CheckerConfig {

int getRedisReplicationHealthCheckInterval();

int getCheckerCurrentDcAllMetaRefreshIntervalMilli();

int getClusterHealthCheckInterval();

int getDownAfterCheckNums();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.ctrip.xpipe.redis.checker.healthcheck.HealthChecker;
import com.ctrip.xpipe.redis.checker.healthcheck.meta.MetaChangeManager;
import com.ctrip.xpipe.redis.core.entity.*;
import com.ctrip.xpipe.redis.core.meta.CurrentDcAllMeta;
import com.ctrip.xpipe.redis.core.meta.KeeperContainerDetailInfo;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.StringUtil;
Expand Down Expand Up @@ -54,6 +55,9 @@ public class DefaultHealthChecker extends AbstractLifecycle implements HealthChe
@Resource(name = SCHEDULED_EXECUTOR)
private ScheduledExecutorService scheduled;

@Resource
private CurrentDcAllMeta currentDcAllMeta;

private static final String currentDcId = FoundationService.DEFAULT.getDataCenter();

@PostConstruct
Expand Down Expand Up @@ -123,7 +127,7 @@ void generateHealthCheckInstances() {

if (currentDcId.equalsIgnoreCase(dcMeta.getId())) {
Map<Long, KeeperContainerDetailInfo> keeperContainerDetailInfoMap
= getAllKeeperContainerDetailInfoFromDcMeta(dcMeta, getCurrentDcMeta(dcMeta.getId()));
= getAllKeeperContainerDetailInfoFromDcMeta(dcMeta, currentDcAllMeta.getCurrentDcAllMeta());

keeperContainerDetailInfoMap.values().forEach(keeperContainerDetailInfo -> {
generateHealthCheckInstances(keeperContainerDetailInfo);
Expand Down Expand Up @@ -182,16 +186,6 @@ private boolean isClusterInCurrentIdc(ClusterMeta cluster) {
return false;
}

private DcMeta getCurrentDcMeta(String dcId) {
try {
return checkerConsoleService.getXpipeAllDCMeta(checkerConfig.getConsoleAddress(), dcId)
.getDcs().get(dcId);
} catch (Throwable th) {
logger.error("[getCurrentDcMeta] get dc meta of dc {} fail", dcId, th);
}
return null;
}


private boolean clusterDcIsCurrentDc(ClusterMeta clusterMeta) {
return clusterMeta.parent().getId().equalsIgnoreCase(currentDcId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
*/
public interface DcMetaChangeManager extends Startable, Stoppable {

void compare(DcMeta future);
void compare(DcMeta future, DcMeta allFutureDcMeta);

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public DefaultDcMetaChangeManager(String dcId, HealthCheckInstanceManager instan
}

@Override
public void compare(DcMeta future) {
public void compare(DcMeta future, DcMeta allFutureDcMeta) {
// init
if(current == null) {
healthCheckEndpointFactory.updateRoutes();
current = future;
currentDcAllMeta = currentDcId.equalsIgnoreCase(dcId) ? getCurrentDcMeta(dcId) : null;
currentDcAllMeta = currentDcId.equalsIgnoreCase(dcId) ? allFutureDcMeta : null;
return;
}

Expand All @@ -88,12 +88,11 @@ public void compare(DcMeta future) {
}

if (currentDcId.equalsIgnoreCase(dcId)) {
DcMeta futureDcAllMeta = getCurrentDcMeta(dcId);
KeeperContainerMetaComparator keeperContainerMetaComparator
= new KeeperContainerMetaComparator(current, future, currentDcAllMeta, futureDcAllMeta);
= new KeeperContainerMetaComparator(current, future, currentDcAllMeta, allFutureDcMeta);
keeperContainerMetaComparator.compare();
keeperContainerMetaComparator.accept(new KeeperContainerMetaComparatorVisitor());
currentDcAllMeta = futureDcAllMeta;
currentDcAllMeta = allFutureDcMeta;
}

comparator.accept(this);
Expand All @@ -103,16 +102,6 @@ public void compare(DcMeta future) {
this.current = future;
}

private DcMeta getCurrentDcMeta(String dcId) {
try {
return checkerConsoleService.getXpipeAllDCMeta(checkerConfig.getConsoleAddress(), dcId)
.getDcs().get(dcId);
} catch (Throwable th) {
logger.error("[getCurrentDcMeta] get dcMeta from dc {} fail", dcId, th);
}
return null;
}

private void removeAndAdd() {
this.redisListToDelete.forEach(this::removeRedis);
this.clustersToDelete.forEach(this::removeCluster);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.ctrip.xpipe.redis.checker.healthcheck.impl.HealthCheckEndpointFactory;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;
import com.ctrip.xpipe.redis.core.meta.CurrentDcAllMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.spring.AbstractSpringConfigContext;
import com.ctrip.xpipe.utils.MapUtils;
Expand Down Expand Up @@ -49,6 +50,9 @@ public class DefaultMetaChangeManager implements MetaChangeManager {
@Autowired
private MetaCache metaCache;

@Resource
private CurrentDcAllMeta currentDcAllMeta;

@Autowired
private CheckerConsoleService checkerConsoleService;

Expand Down Expand Up @@ -83,7 +87,7 @@ private void checkDcMetaChange() {
ignore(dcId);
continue;
}
getOrCreate(dcId).compare(entry.getValue());
getOrCreate(dcId).compare(entry.getValue(), currentDcAllMeta.getCurrentDcAllMeta());
}
}

Expand Down Expand Up @@ -120,7 +124,7 @@ public void startIfPossible(String dcId) {
}
try {
DcMetaChangeManager manager = getOrCreate(dcId);
manager.compare(metaCache.getXpipeMeta().findDc(dcId));
manager.compare(metaCache.getXpipeMeta().findDc(dcId), currentDcAllMeta.getCurrentDcAllMeta());
manager.start();
} catch (Exception e) {
logger.error("[start]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.checker.healthcheck.impl.HealthCheckEndpointFactory;
import com.ctrip.xpipe.redis.core.entity.*;
import com.ctrip.xpipe.redis.core.meta.CurrentDcAllMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.slf4j.Logger;
Expand Down Expand Up @@ -43,6 +44,9 @@ public abstract class AbstractInstanceSessionManager implements InstanceSessionM
@Autowired
protected MetaCache metaCache;

@Resource
protected CurrentDcAllMeta currentDcAllMeta;

@Autowired
protected CheckerConsoleService checkerConsoleService;

Expand All @@ -69,6 +73,7 @@ public abstract class AbstractInstanceSessionManager implements InstanceSessionM
@VisibleForTesting
public static long checkUnusedRedisDelaySeconds = 4;


@PostConstruct
public void postConstruct(){
scheduled.scheduleAtFixedRate(new AbstractExceptionLogTask() {
Expand Down Expand Up @@ -173,16 +178,6 @@ protected Set<HostPort> getSessionsForKeeper(DcMeta dcMeta, DcMeta allDcMeta) {

protected abstract HostPort getMonitorInstance(List<RedisMeta> redises, KeeperMeta keeper);

protected DcMeta getCurrentDcAllMeta(String dcId) {
try {
return checkerConsoleService.getXpipeAllDCMeta(checkerConfig.getConsoleAddress(), dcId)
.getDcs().get(dcId);
} catch (Throwable th) {
logger.error("[getCurrentDcAllMeta] get all dc meta of dc {} fail!", dcId, th);
}
return null;
}

private void closeAllConnections() {
try {
executors.execute(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class DefaultKeeperSessionManager extends AbstractInstanceSessionManager
protected Set<HostPort> getInUseInstances() {
DcMeta currentDcMeta = metaCache.getXpipeMeta().getDcs().get(currentDcId);
if (currentDcMeta != null)
return getSessionsForKeeper(currentDcMeta, getCurrentDcAllMeta(currentDcId));
return getSessionsForKeeper(currentDcMeta, currentDcAllMeta.getCurrentDcAllMeta());

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*
* Dec 1, 2016 6:42:01 PM
*/

@Component
public class DefaultRedisSessionManager extends AbstractInstanceSessionManager implements RedisSessionManager {

Expand All @@ -28,8 +29,7 @@ protected Set<HostPort> getInUseInstances() {
if(dcMeta == null) break;

if (dcMeta.getId().equalsIgnoreCase(currentDcId)) {
DcMeta currentDcAllMeta = getCurrentDcAllMeta(currentDcId);
redisInUse.addAll(getSessionsForKeeper(dcMeta, currentDcAllMeta));
redisInUse.addAll(getSessionsForKeeper(dcMeta, currentDcAllMeta.getCurrentDcAllMeta()));
}

for (ClusterMeta clusterMeta : dcMeta.getClusters().values()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.ctrip.xpipe.redis.checker.impl;

import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.meta.CurrentDcAllMeta;

/**
* @author yu
* <p>
* 2023/11/14
*/
public class TestCurrentDcAllMetaCache implements CurrentDcAllMeta {
@Override
public DcMeta getCurrentDcAllMeta() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public XpipeMeta getXpipeAllMeta(String console) throws SAXException, IOExcepti
return DefaultSaxParser.parse(raw);
}

public XpipeMeta getXpipeAllDCMeta(String console, String dcName) throws SAXException, IOException {
UriComponents comp = UriComponentsBuilder.fromHttpUrl(console + ConsoleCheckerPath.PATH_GET_ALL_DC_META)
public XpipeMeta getXpipeDcAllMeta(String console, String dcName) throws SAXException, IOException {
UriComponents comp = UriComponentsBuilder.fromHttpUrl(console + ConsoleCheckerPath.PATH_GET_DC_ALL_META)
.queryParam("format", "xml").buildAndExpand(dcName);

String raw = restTemplate.getForObject(comp.toString(), String.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
import com.ctrip.xpipe.redis.checker.healthcheck.HealthChecker;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.DefaultPingService;
import com.ctrip.xpipe.redis.checker.healthcheck.actions.ping.PingService;
import com.ctrip.xpipe.redis.checker.impl.CheckerClusterHealthManager;
import com.ctrip.xpipe.redis.checker.impl.CheckerRedisInfoManager;
import com.ctrip.xpipe.redis.checker.impl.DefaultRemoteCheckerManager;
import com.ctrip.xpipe.redis.checker.impl.TestMetaCache;
import com.ctrip.xpipe.redis.checker.impl.*;
import com.ctrip.xpipe.redis.core.meta.CurrentDcAllMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.redis.core.spring.AbstractRedisConfigContext;
import com.ctrip.xpipe.utils.OsUtils;
Expand Down Expand Up @@ -72,6 +70,12 @@ public MetaCache metaCache() {
return new TestMetaCache();
}

@Bean
public CurrentDcAllMeta testCurrentDcAllMeta() {
return new TestCurrentDcAllMetaCache();
}


@Bean
public TestPersistenceCache persistence() {
return new TestPersistenceCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public int getRedisReplicationHealthCheckInterval() {
return 2000;
}

@Override
public int getCheckerCurrentDcAllMetaRefreshIntervalMilli() {
return 300000;
}

@Override
public int getClusterHealthCheckInterval() {
return 300000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.ctrip.xpipe.redis.checker.healthcheck.HealthCheckInstanceManager;
import com.ctrip.xpipe.redis.checker.healthcheck.meta.MetaChangeManager;
import com.ctrip.xpipe.redis.core.entity.*;
import com.ctrip.xpipe.redis.core.meta.CurrentDcAllMeta;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.google.common.collect.Sets;
import org.junit.Assert;
Expand Down Expand Up @@ -47,11 +48,12 @@ public class DefaultHealthCheckerMockTest extends AbstractCheckerTest {
@Mock
private CheckerConfig checkerConfig;

@Mock
private CurrentDcAllMeta currentDcAllMeta;

@Before
public void setupDefaultHealthCheckerMockTest() throws IOException, SAXException {
when(checkerConfig.getIgnoredHealthCheckDc()).thenReturn(Collections.emptySet());
when(checkerConfig.getConsoleAddress()).thenReturn("127.0.0.1");
when(checkerConsoleService.getXpipeAllDCMeta(Mockito.anyString(), Mockito.anyString())).thenReturn(getXpipeMeta());
when(metaCache.getXpipeMeta()).thenReturn(getXpipeMeta());
}

Expand Down Expand Up @@ -105,7 +107,6 @@ public void generateSingleDcHealthCheckInstancesTest() throws Exception {
jqDcMeta.addCluster(clusterMeta);
xpipeMeta.addDc(jqDcMeta);
when(metaCache.getXpipeMeta()).thenReturn(xpipeMeta);
when(checkerConsoleService.getXpipeAllDCMeta(Mockito.anyString(), Mockito.anyString())).thenReturn(xpipeMeta);

checker.generateHealthCheckInstances();
verify(instanceManager, times(1)).getOrCreate(new ClusterMeta().setId("single_dc_cluster"));
Expand Down
Loading

0 comments on commit 2efe621

Please sign in to comment.