Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bugs for console and keeper #755

Merged
merged 5 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
@Autowired
private ZkClient zkClient;

private ExecutorService executors = Executors.newCachedThreadPool(XpipeThreadFactory.create(getClass().getSimpleName()));
// single thread make zk events handled sequentially
private ExecutorService executors = Executors.newSingleThreadExecutor(XpipeThreadFactory.create(getClass().getSimpleName()));

Check warning on line 37 in core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java#L37

Added line #L37 was not covered by tests

private ApplicationContext applicationContext;

Expand Down Expand Up @@ -63,8 +64,8 @@
try{
logger.info("[isLeader][notify]{}", entry.getKey());
entry.getValue().isleader();
}catch (Exception e){
logger.error("[isLeader]" + entry, e);
}catch (Throwable th){
logger.error("[isLeader]" + entry, th);

Check warning on line 68 in core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java#L67-L68

Added lines #L67 - L68 were not covered by tests
}
}
}
Expand All @@ -79,8 +80,8 @@
try{
logger.info("[notLeader][notify]{}", entry.getKey());
entry.getValue().notLeader();
}catch (Exception e){
logger.error("[notLeader]" + entry, e);
}catch (Throwable th){
logger.error("[notLeader]" + entry, th);

Check warning on line 84 in core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cluster/AbstractLeaderElector.java#L83-L84

Added lines #L83 - L84 were not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.springframework.web.client.RestOperations;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -114,16 +115,16 @@

@Override
protected void doStart() throws Exception {
scheduled.scheduleWithFixedDelay(() -> {
future = scheduled.scheduleWithFixedDelay(() -> {

Check warning on line 118 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java#L118

Added line #L118 was not covered by tests
if (!taskTrigger.get()) {
return;
}
logger.debug("proxy chain collector started");
getAllDcProxyChains();
fetchAllDcProxyChains();

Check warning on line 123 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java#L123

Added line #L123 was not covered by tests
}, getStartTime(), getPeriodic(), TimeUnit.MILLISECONDS);
}

private void getAllDcProxyChains() {
protected void fetchAllDcProxyChains() {
ParallelCommandChain commandChain = new ParallelCommandChain(MoreExecutors.directExecutor(), false);
consoleConfig.getConsoleDomains().forEach((dc, domain)->{
logger.debug("begin to get proxy chain from dc {} {}", dc, domain);
Expand Down Expand Up @@ -152,18 +153,21 @@
dcProxyChainMap.forEach((dc, proxyChainMap) -> {
proxyChainMap.forEach((clusterShard, proxyChain) -> {
DefaultTunnelInfo tunnel = proxyChain.getTunnelInfos().get(0);
if (tempShardProxyChain.containsKey(clusterShard)) {
tempShardProxyChain.get(clusterShard).getTunnelInfos().add(tunnel);
} else {
tempShardProxyChain.put(clusterShard, proxyChain);
if (!tempShardProxyChain.containsKey(clusterShard)) {
tempShardProxyChain.put(clusterShard, new DefaultProxyChain(proxyChain.getBackupDcId(),
proxyChain.getClusterId(), proxyChain.getShardId(), proxyChain.getPeerDcId(), new ArrayList<>()));

Check warning on line 158 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java#L157-L158

Added lines #L157 - L158 were not covered by tests
}

tempShardProxyChain.get(clusterShard).getTunnelInfos().add(tunnel);

Check warning on line 160 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java#L160

Added line #L160 was not covered by tests
tempTunnelClusterShardMap.put(tunnel.getTunnelId(), clusterShard);
});
});
synchronized (DefaultProxyChainCollector.this) {
tunnelClusterShardMap = tempTunnelClusterShardMap;
shardProxyChainMap = tempShardProxyChain;
if (taskTrigger.get()) {
tunnelClusterShardMap = tempTunnelClusterShardMap;
shardProxyChainMap = tempShardProxyChain;

Check warning on line 167 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java#L166-L167

Added lines #L166 - L167 were not covered by tests
} else {
clear();

Check warning on line 169 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java#L169

Added line #L169 was not covered by tests
}
}
}

Expand All @@ -173,6 +177,12 @@
return this;
}

@VisibleForTesting
DefaultProxyChainCollector setHttpService(DefaultHttpService httpService) {
this.httpService = httpService;
return this;

Check warning on line 183 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java#L182-L183

Added lines #L182 - L183 were not covered by tests
}

@Override
public Map<DcClusterShardPeer, ProxyChain> getShardProxyChainMap() {
return shardProxyChainMap;
Expand All @@ -186,15 +196,19 @@
return 1000;
}

protected void clear() {
shardProxyChainMap.clear();
tunnelClusterShardMap.clear();
dcProxyChainMap.clear();
}

Check warning on line 203 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java#L200-L203

Added lines #L200 - L203 were not covered by tests

@Override
protected void doStop() throws Exception {
if(future != null) {
future.cancel(true);
future = null;
}
shardProxyChainMap.clear();
tunnelClusterShardMap.clear();
dcProxyChainMap.clear();
clear();

Check warning on line 211 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java#L211

Added line #L211 was not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void notLeader() {
}

private synchronized void stopLoadMeta(){
logger.info("[loadMeta][stop]{}", this);
if (future != null)
future.cancel(true);
future = null;
Expand All @@ -98,7 +99,7 @@ private synchronized void stopLoadMeta(){
}

public void startLoadMeta() {
logger.info("[loadMeta]{}", this);
logger.info("[loadMeta][start]{}", this);

refreshIntervalMilli = consoleConfig.getCacheRefreshInterval();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.command.ParallelCommandChain;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.healthcheck.BiDirectionSupport;
import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport;
Expand All @@ -25,11 +27,15 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR;

/**
* @author chen.zhu
* <p>
Expand Down Expand Up @@ -58,6 +64,9 @@
@Autowired
private FoundationService foundationService;

@Resource(name = GLOBAL_EXECUTOR)
protected ExecutorService executors;

@Override
public void updateRedisDelays(Map<HostPort, Long> redisDelays) {
hostPort2Delay.putAll(redisDelays);
Expand Down Expand Up @@ -221,12 +230,24 @@
}

UnhealthyInfoModel infoAggregation = new UnhealthyInfoModel();
ParallelCommandChain commandChain = new ParallelCommandChain(executors);
for (String dcId : xpipeMeta.getDcs().keySet()) {
UnhealthyInfoModel unhealthyInfo = getDcActiveClusterUnhealthyInstance(dcId);
if (null == unhealthyInfo) infoAggregation.getAttachFailDc().add(dcId);
else infoAggregation.merge(unhealthyInfo);
FetchDcUnhealthyInstanceCmd cmd = new FetchDcUnhealthyInstanceCmd(dcId);
commandChain.add(cmd);
cmd.future().addListener(commandFuture -> {
if (commandFuture.isSuccess() && null != commandFuture.get()) {
infoAggregation.merge(commandFuture.get());
} else {
infoAggregation.getAttachFailDc().add(dcId);

Check warning on line 241 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java#L241

Added line #L241 was not covered by tests
}
});
}

try {
commandChain.execute().get();
} catch (Throwable th) {
logger.info("[getAllUnhealthyInstance][fail] {}", th.getMessage());

Check warning on line 249 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java#L248-L249

Added lines #L248 - L249 were not covered by tests
}
return infoAggregation;
}

Expand All @@ -239,4 +260,29 @@
public void setFoundationService(FoundationService foundationService) {
this.foundationService = foundationService;
}

class FetchDcUnhealthyInstanceCmd extends AbstractCommand<UnhealthyInfoModel> {

private String dc;

public FetchDcUnhealthyInstanceCmd(String dc) {
this.dc = dc;
}

@Override
protected void doExecute() throws Throwable {
future().setSuccess(getDcActiveClusterUnhealthyInstance(dc));
}

@Override
protected void doReset() {
// do nothing
}

Check warning on line 280 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java#L280

Added line #L280 was not covered by tests

@Override
public String getName() {
return getClass().getSimpleName();

Check warning on line 284 in redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/impl/DefaultDelayService.java#L284

Added line #L284 was not covered by tests
}
}

}
Original file line number Diff line number Diff line change
@@ -1,18 +1,116 @@
package com.ctrip.xpipe.redis.console.proxy.impl;

import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.redis.checker.model.DcClusterShardPeer;
import com.ctrip.xpipe.redis.console.AbstractConsoleTest;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.model.ProxyModel;
import com.ctrip.xpipe.redis.console.proxy.ProxyChain;
import com.ctrip.xpipe.redis.console.proxy.ProxyChainAnalyzer;
import com.ctrip.xpipe.redis.console.proxy.ProxyChainCollector;
import com.ctrip.xpipe.redis.console.reporter.DefaultHttpService;
import org.apache.http.HttpException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestOperations;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;

import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;


@RunWith(MockitoJUnitRunner.class)
public class DefaultProxyChainCollectorTest extends AbstractConsoleTest {

@Autowired
ProxyChainCollector collector;
@Mock
private ProxyChainAnalyzer proxyChainAnalyzer;

@Mock
private ConsoleConfig consoleConfig;

@Mock
private DefaultHttpService httpService;

@Mock
private RestOperations restTemplate;

@InjectMocks
DefaultProxyChainCollector collector;

private Map<String,String> consoles = new HashMap<String, String>() {{
put("dc1","http://dc1");
put("dc2","http://dc2");
}};

@Before
public void setupDefaultProxyChainCollectorTest() {
when(httpService.getRestTemplate()).thenReturn(restTemplate);
this.collector.setHttpService(httpService);
when(consoleConfig.getConsoleDomains()).thenReturn(consoles);
}

private Map<DcClusterShardPeer, DefaultProxyChain> generateProxyChains(int cnt) {
Map<DcClusterShardPeer, DefaultProxyChain> result = new HashMap<>();
IntStream.range(0, cnt).forEach(i -> {
DcClusterShardPeer dcClusterShardPeer = new DcClusterShardPeer("dc1", "cluster" + i, "shard" + i, "dc2");
List<DefaultTunnelInfo> tunnels = new ArrayList<>();
tunnels.add(new DefaultTunnelInfo(new ProxyModel(), "tunnel" + i));
DefaultProxyChain proxyChain = new DefaultProxyChain("dc1", "cluster" + i, "shard" + i, "dc2", tunnels);
result.put(dcClusterShardPeer, proxyChain);
});

return result;
}

@Test
public void testUpdateProxyChains() {
// collector.updateShardProxyChainMap();
public void remoteDcDown_noMemLeak() {
ResponseEntity<Map<DcClusterShardPeer, DefaultProxyChain>> resp = new ResponseEntity(generateProxyChains(100), HttpStatus.OK);
when(restTemplate.exchange(anyString(), any(), any(), any(ParameterizedTypeReference.class), anyString()))
.thenReturn(resp);
IntStream.range(0, 10).forEach(i -> collector.fetchAllDcProxyChains());

Map<String, Map<DcClusterShardPeer, ProxyChain>> dcProxyChainMap = collector.getDcProxyChainMap();
for (Map<DcClusterShardPeer, ProxyChain> proxyChainMap: dcProxyChainMap.values()) {
for (ProxyChain proxyChain: proxyChainMap.values()) {
Assert.assertEquals(2, proxyChain.getTunnelInfos().size());
}
}

doAnswer(inov -> {
String uri = inov.getArgument(0);
String host = consoles.values().iterator().next();
if (uri.startsWith(host)) throw new HttpException("mock");
else return resp;
}).when(restTemplate).exchange(anyString(), any(), any(), any(ParameterizedTypeReference.class));

IntStream.range(0, 10).forEach(i -> collector.fetchAllDcProxyChains());
dcProxyChainMap = collector.getDcProxyChainMap();
for (Map<DcClusterShardPeer, ProxyChain> proxyChainMap: dcProxyChainMap.values()) {
for (ProxyChain proxyChain: proxyChainMap.values()) {
Assert.assertEquals(2, proxyChain.getTunnelInfos().size());
}
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@

protected void failPsync(Throwable throwable) {
super.failPsync(throwable);
if (psyncState == PSYNC_STATE.PSYNC_COMMAND_WAITING_REPONSE) {
try {
getLogger().debug("[failPsync] psync fail before beginReadRdb");
currentReplicationStore.close();
currentReplicationStore.destroy();
} catch (Throwable th) {
getLogger().warn("[failPsync] release rdb file fail", th);
}
try {
getLogger().info("[failPsync][release rdb]");
currentReplicationStore.close();
currentReplicationStore.destroy();
} catch (Throwable th) {
getLogger().warn("[failPsync][release rdb] fail", th);

Check warning on line 38 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/RdbOnlyPsync.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/RdbOnlyPsync.java#L34-L38

Added lines #L34 - L38 were not covered by tests
}
}

Expand Down
Loading
Loading