Skip to content

Commit

Permalink
remove redundant peerMaster after unbinding peerDc
Browse files Browse the repository at this point in the history
  • Loading branch information
lishanglin committed Dec 22, 2023
1 parent b43b217 commit 6a176e6
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package com.ctrip.xpipe.redis.meta.server.crdt.master;

import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommand;

import java.util.Set;

public interface MasterChooseCommandFactory {

RedundantMasterClearCommand buildRedundantMasterClearCommand(Long clusterDbId, Long shardDbId, Set<String> dcs);

MasterChooseCommand buildPeerMasterChooserCommand(String dcId, Long clusterDbId, Long shardDbId);

MasterChooseCommand buildCurrentMasterChooserCommand(Long clusterDbId, Long shardDbId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.ctrip.xpipe.redis.meta.server.crdt.master.command;

import com.ctrip.xpipe.command.AbstractCommand;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;

/**
* @author lishanglin
* date 2023/12/22
*/
public class RedundantMasterClearCommand extends AbstractCommand<Set<String>> {

private Long clusterId;

private Long shardId;

private Set<String> dcs;

private CurrentMetaManager currentMetaManager;

private static Logger logger = LoggerFactory.getLogger(RedundantMasterClearCommand.class);

public RedundantMasterClearCommand(Long clusterId, Long shardId, Set<String> dcs, CurrentMetaManager currentMetaManager) {
this.clusterId = clusterId;
this.shardId = shardId;
this.dcs = dcs;
this.currentMetaManager = currentMetaManager;
}

@Override
protected void doExecute() throws Throwable {
Set<String> currentDcs = currentMetaManager.getUpstreamPeerDcs(clusterId, shardId);
Set<String> redundantDcs = new HashSet<>(currentDcs);
redundantDcs.removeAll(dcs);

for (String dc: redundantDcs) {
logger.info("[cluster_{},shard_{}] remove dc {}", clusterId, shardId, dc);
currentMetaManager.removePeerMaster(dc, clusterId, shardId);
}

future().setSuccess(redundantDcs);
}

@Override
protected void doReset() {
// do nothing
}

Check warning on line 51 in redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/crdt/master/command/RedundantMasterClearCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/crdt/master/command/RedundantMasterClearCommand.java#L51

Added line #L51 was not covered by tests

@Override
public String getName() {
return String.format("%s[cluster_%d,shard_%d]", getClass().getSimpleName(), clusterId, shardId);

Check warning on line 55 in redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/crdt/master/command/RedundantMasterClearCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/crdt/master/command/RedundantMasterClearCommand.java#L55

Added line #L55 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.ctrip.xpipe.redis.meta.server.crdt.master.MasterChooseCommandFactory;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.CurrentMasterChooseCommand;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.PeerMasterChooseCommand;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommand;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import com.ctrip.xpipe.redis.meta.server.meta.DcMetaCache;
import com.ctrip.xpipe.redis.meta.server.multidc.MultiDcService;
Expand All @@ -17,6 +18,7 @@
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -52,6 +54,11 @@ public DefaultMasterChooseCommandFactory(DcMetaCache dcMetaCache, CurrentMetaMan
scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("PeerMasterChooseCommandSchedule"));
}

@Override
public RedundantMasterClearCommand buildRedundantMasterClearCommand(Long clusterDbId, Long shardDbId, Set<String> dcs) {
return new RedundantMasterClearCommand(clusterDbId, shardDbId, dcs, currentMetaManager);

Check warning on line 59 in redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/crdt/master/impl/DefaultMasterChooseCommandFactory.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-meta/src/main/java/com/ctrip/xpipe/redis/meta/server/crdt/master/impl/DefaultMasterChooseCommandFactory.java#L59

Added line #L59 was not covered by tests
}

@Override
public MasterChooseCommand buildPeerMasterChooserCommand(String dcId, Long clusterDbId, Long shardDbId) {
MasterChooseCommand masterChooseCommand = new PeerMasterChooseCommand(dcId, clusterDbId, shardDbId, multiDcService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
import com.ctrip.xpipe.concurrent.KeyedOneThreadTaskExecutor;
import com.ctrip.xpipe.redis.core.entity.ClusterMeta;
import com.ctrip.xpipe.redis.meta.server.crdt.master.MasterChooseCommandFactory;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommand;
import com.ctrip.xpipe.redis.meta.server.keeper.keepermaster.MasterChooser;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import com.ctrip.xpipe.redis.meta.server.meta.DcMetaCache;
import com.ctrip.xpipe.tuple.Pair;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;

public class PeerMasterChooser extends CurrentMasterChooser implements MasterChooser {
Expand Down Expand Up @@ -37,11 +42,14 @@ protected void work() {
ParallelCommandChain parallelCommandChain = new ParallelCommandChain(executors);
String currentDc = dcMetaCache.getCurrentDc();

for (String dcId : clusterMeta.getDcs().split("\\s*,\\s*")) {
String[] rawDcs = clusterMeta.getDcs().split("\\s*,\\s*");
Set<String> dcs = new HashSet<>(Arrays.asList(rawDcs));
masterChooseCommandFactory.buildRedundantMasterClearCommand(clusterDbId, shardDbId, dcs).execute();

for (String dcId : dcs) {
if (currentDc.equalsIgnoreCase(dcId)) continue;
parallelCommandChain.add(masterChooseCommandFactory.buildPeerMasterChooserCommand(dcId, clusterDbId, shardDbId));
}

peerMasterChooseExecutor.execute(Pair.of(clusterDbId, shardDbId), parallelCommandChain);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.ctrip.xpipe.redis.meta.server.crdt.PeerMasterMetaServerStateChangeHandlerTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.CurrentMasterChooseCommandTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.PeerMasterChooseCommandTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommandTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.impl.*;
import com.ctrip.xpipe.redis.meta.server.crdt.replication.impl.DefaultPeerMasterStateAdjusterTest;
import com.ctrip.xpipe.redis.meta.server.crdt.replication.impl.DefaultPeerMasterStateManagerTest;
Expand Down Expand Up @@ -105,6 +106,7 @@
MasterChooserTest.class,
CurrentMasterChooseCommandTest.class,
PeerMasterChooseCommandTest.class,
RedundantMasterClearCommandTest.class,
DefaultPeerMasterChooseActionTest.class,
MasterChooseCommandFactoryTest.class,
PeerMasterAdjustJobTest.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.ctrip.xpipe.redis.meta.server.crdt.master.command;

import com.ctrip.xpipe.redis.meta.server.AbstractMetaServerTest;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.stream.Collectors;

import static org.mockito.Mockito.*;

/**
* @author lishanglin
* date 2023/12/22
*/
@RunWith(MockitoJUnitRunner.class)
public class RedundantMasterClearCommandTest extends AbstractMetaServerTest {

@Mock
private CurrentMetaManager currentMetaManager;

@Before
public void setupRedundantMasterClearCommandTest() {
when(currentMetaManager.getUpstreamPeerDcs(getClusterDbId(), getShardDbId()))
.thenReturn(new HashSet<>(Arrays.asList("jq", "oy")));
}

@Test
public void testClearRedundant() throws Exception {
RedundantMasterClearCommand command = new RedundantMasterClearCommand(getClusterDbId(), getShardDbId(),
new HashSet<>(Arrays.asList("jq", "rb")), currentMetaManager);
Assert.assertEquals(Collections.singleton("oy"), command.execute().get());
verify(currentMetaManager, times(1)).removePeerMaster("oy", getClusterDbId(), getShardDbId());
}

@Test
public void testNoRedundant() throws Exception {
RedundantMasterClearCommand command = new RedundantMasterClearCommand(getClusterDbId(), getShardDbId(),
new HashSet<>(Arrays.asList("jq", "oy")), currentMetaManager);
Assert.assertEquals(Collections.emptySet(), command.execute().get());
verify(currentMetaManager, never()).removePeerMaster(anyString(), any(), any());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.ctrip.xpipe.redis.meta.server.AbstractMetaServerTest;
import com.ctrip.xpipe.redis.meta.server.crdt.master.MasterChooseCommand;
import com.ctrip.xpipe.redis.meta.server.crdt.master.MasterChooseCommandFactory;
import com.ctrip.xpipe.redis.meta.server.crdt.master.command.RedundantMasterClearCommand;
import com.ctrip.xpipe.redis.meta.server.meta.CurrentMetaManager;
import com.ctrip.xpipe.redis.meta.server.meta.DcMetaCache;
import com.ctrip.xpipe.tuple.Pair;
Expand Down Expand Up @@ -43,6 +44,9 @@ public class MasterChooserTest extends AbstractMetaServerTest {
@Mock
private MasterChooseCommand command;

@Mock
private RedundantMasterClearCommand redundantMasterClearCommand;

@Mock
private KeyedOneThreadTaskExecutor<Pair<Long, Long> > keyedOneThreadTaskExecutor;

Expand All @@ -64,6 +68,7 @@ public void setupDefaultPeerMasterChooserTest() throws Exception {
@Test
public void testPeerMasterChooseWork() {
Mockito.when(factory.buildPeerMasterChooserCommand(Mockito.anyString(), Mockito.anyLong(), Mockito.anyLong())).thenReturn(command);
Mockito.when(factory.buildRedundantMasterClearCommand(Mockito.anyLong(), Mockito.anyLong(), Mockito.anySet())).thenReturn(redundantMasterClearCommand);
Mockito.doAnswer(invocation -> {
Pair<String, String> key = invocation.getArgument(0, Pair.class);
ParallelCommandChain commandChain = invocation.getArgument(1, ParallelCommandChain.class);
Expand Down

0 comments on commit 6a176e6

Please sign in to comment.