Skip to content

Commit

Permalink
add keeper reset election API
Browse files Browse the repository at this point in the history
  • Loading branch information
lishanglin committed Nov 11, 2023
1 parent a954f37 commit 3a61a48
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,7 @@ public static enum PROMOTION_STATE{
void resetDefaultReplication();

PsyncObserver createPsyncObserverForRdbOnlyRepl();

void resetElection();

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public KeeperDiskInfo infoDisk() {
return keeperContainerService.infoDisk();
}

@PostMapping("/election/reset" )
public void resetElection(@RequestBody KeeperTransMeta keeperTransMeta) {
logger.info("[resetElection]{}", keeperTransMeta);
keeperContainerService.resetElection(ReplId.from(keeperTransMeta.getReplId()));
}

@RequestMapping(value = "/clusters/" + CLUSTER_NAME_PATH_VARIABLE + "/shards/" + SHARD_NAME_PATH_VARIABLE, method = RequestMethod.DELETE)
public void remove(@PathVariable String clusterName, @PathVariable String shardName, @RequestBody KeeperTransMeta keeperTransMeta) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@ public KeeperDiskInfo infoDisk() {
return diskHealthChecker.getResult();
}

public void resetElection(ReplId replId) {
String keeperServerKey = replId.toString();

RedisKeeperServer keeperServer = redisKeeperServers.get(keeperServerKey);

if (keeperServer == null) {
throw new RedisKeeperRuntimeException(
new ErrorMessage<>(KeeperContainerErrorCode.KEEPER_NOT_EXIST,
String.format("Reset election for %s failed since keeper doesn't exist", replId)), null);
}

if (!keeperServer.getLifecycleState().isStarted()) {
throw new RedisKeeperRuntimeException(
new ErrorMessage<>(KeeperContainerErrorCode.KEEPER_ALREADY_STARTED,
String.format("Keeper for %s has not started", replId)), null);
}

keeperServer.resetElection();
}

public void start(ReplId replId) {
String keeperServerKey = replId.toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,16 @@ private LeaderElector createLeaderElector(){
return leaderElectorManager.createLeaderElector(ctx);
}

@Override
public void resetElection() {
try {
LifecycleHelper.stopIfPossible(leaderElector);
LifecycleHelper.startIfPossible(leaderElector);
} catch (Exception e) {
logger.info("[resetElection][fail][{}]", replId, e);
}
}

@Override
protected void doInitialize() throws Exception {
super.doInitialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@
import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion;
import com.ctrip.xpipe.redis.core.protocal.protocal.EofType;
import com.ctrip.xpipe.redis.core.protocal.protocal.LenEofType;
import com.ctrip.xpipe.redis.core.redis.operation.RedisOp;
import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType;
import com.ctrip.xpipe.redis.core.redis.operation.RedisSingleKeyOp;
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpSingleKey;
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisSingleKeyOpGtidWrapper;
import com.ctrip.xpipe.redis.core.store.OffsetReplicationProgress;
import com.ctrip.xpipe.redis.core.store.ReplicationStore;
import com.ctrip.xpipe.redis.keeper.AbstractRedisKeeperTest;
Expand Down Expand Up @@ -252,40 +247,6 @@ public void testMultiBeginWritingCmds() throws Exception {
verify(replicationStore).addCommandsListener(any(), any());
}

@Test
public void testFilterPublish() {
RedisSingleKeyOp op1 = new RedisOpSingleKey(RedisOpType.SET, string2Bytes("set a 1"), null, null);
RedisOp gtidOp1 = new RedisSingleKeyOpGtidWrapper(string2Bytes("GTID ggg:1 0"), "ggg", op1);
Assert.assertFalse(redisSlave.shouldFilter(gtidOp1));

RedisSingleKeyOp op2 = new RedisOpSingleKey(RedisOpType.PUBLISH, string2Bytes("publish xpipe-hetero-ppp 222"), null, null);
RedisOp gtidOp2 = new RedisSingleKeyOpGtidWrapper(string2Bytes("GTID ggg:1 0"), "ggg", op2);
Assert.assertFalse(redisSlave.shouldFilter(gtidOp2));

RedisSingleKeyOp op3 = new RedisOpSingleKey(RedisOpType.PUBLISH, string2Bytes("publish ppp 222"), null, null);
RedisOp gtidOp3 = new RedisSingleKeyOpGtidWrapper(string2Bytes("GTID ggg:1 0"), "ggg", op3);
Assert.assertTrue(redisSlave.shouldFilter(gtidOp3));
Assert.assertTrue(redisSlave.shouldFilter(op3));

//test estimated size by the way
Assert.assertEquals(15, gtidOp1.estimatedSize());
Assert.assertEquals(36, gtidOp2.estimatedSize());
Assert.assertEquals(23, gtidOp3.estimatedSize());
}

private byte[][] string2Bytes(String s) {

String[] ss = s.split(" ");
int length = ss.length;
byte[][] b = new byte[length][];

for (int i = 0; i < length; i++) {
b[i] = ss[i].getBytes();
}

return b;
}

@Test
public void testAckPutOnline() {
DefaultRedisSlave rs = Mockito.spy(redisSlave);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.ctrip.xpipe.redis.core.redis.operation.RedisSingleKeyOp;
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpSingleKey;
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisSingleKeyOpGtidWrapper;
import com.ctrip.xpipe.redis.core.store.ReplicationStore;
import com.ctrip.xpipe.redis.keeper.AbstractRedisKeeperTest;
import com.ctrip.xpipe.redis.keeper.KeeperRepl;
import com.ctrip.xpipe.redis.keeper.RedisClient;
Expand All @@ -25,7 +24,7 @@
* @author lishanglin
* date 2023/11/10
*/
@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class XsyncRedisSlaveTest extends AbstractRedisKeeperTest {

@Mock
Expand Down

0 comments on commit 3a61a48

Please sign in to comment.