Skip to content

Commit

Permalink
Merge pull request #877 from ctripcorp/bugfix/rordb
Browse files Browse the repository at this point in the history
fix some keeper bugs
  • Loading branch information
LanternLee authored Sep 10, 2024
2 parents 92bec94 + f060eac commit 8489c40
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public ReferenceFileRegion(FileChannel fileChannel, long position, long count, R


@Override
protected void deallocate() {
public void deallocate() {

try {
referenceFileChannel.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static enum PROMOTION_STATE{

KeeperConfig getKeeperConfig();

void clearRdbDumper(RdbDumper rdbDumper);
void clearRdbDumper(RdbDumper rdbDumper, boolean forceRdb);

void setRdbDumper(RdbDumper rdbDumper) throws SetRdbDumperException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public String getName() {
public void setRdbDumpState(RdbDumpState rdbDumpState) {
lock.writeLock().lock();
try {
RdbDumpState preState = this.rdbDumpState;
this.rdbDumpState = rdbDumpState;
switch (rdbDumpState) {
case DUMPING:
Expand All @@ -50,12 +51,13 @@ public void setRdbDumpState(RdbDumpState rdbDumpState) {
doWhenAuxParsed();
break;
case FAIL:
boolean failOnDumping = preState.equals(DUMPING) || preState.equals(AUX_PARSED);
doWhenDumpFailed();
redisKeeperServer.clearRdbDumper(this);
redisKeeperServer.clearRdbDumper(this, failOnDumping);
break;
case NORMAL:
// clear dumper
redisKeeperServer.clearRdbDumper(this);
redisKeeperServer.clearRdbDumper(this, false);
break;
case WAIT_DUMPPING:
break;
Expand Down Expand Up @@ -138,7 +140,7 @@ public void tryFullSync(RedisSlave redisSlave) throws IOException {
case FAIL:
case NORMAL:
getLogger().warn("[tryFullSync]{}", redisSlave);
redisKeeperServer.clearRdbDumper(this);
redisKeeperServer.clearRdbDumper(this, false);
redisKeeperServer.fullSyncToSlave(redisSlave);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,8 +796,12 @@ public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException {
}

try{
dumpNewRdb(tryRordb);
RdbDumper newDumper = dumpNewRdb(tryRordb);
redisSlave.waitForRdbDumping();
if (newDumper.future().isDone() && !newDumper.future().isSuccess()) {
logger.info("[fullSyncToSlave][new dumper fail immediatelly]");
redisSlave.close();
}
}catch(AbstractRdbDumperException e){
logger.error("[fullSyncToSlave]", e);
if(e.isCancelSlave()){
Expand Down Expand Up @@ -956,7 +960,7 @@ public void operationComplete(CommandFuture<Void> commandFuture) throws Exceptio
}

@Override
public void clearRdbDumper(RdbDumper oldDumper) {
public void clearRdbDumper(RdbDumper oldDumper, boolean forceRdb) {

logger.info("[clearRdbDumper]{}", oldDumper);
if(!rdbDumper.compareAndSet(oldDumper, null)){
Expand All @@ -974,8 +978,13 @@ public void clearRdbDumper(RdbDumper oldDumper) {

if (!waitingSlaves.isEmpty()) {
try {
logger.info("[clearRdbDumper][redump][rdb] waiting:{}, needRordb:{}", waitingSlaves.size(), needRordbSlaves);
dumpNewRdb(false);
logger.info("[clearRdbDumper][redump][rdb] waiting:{}, needRordb:{}, forceRdb:{}", waitingSlaves.size(), needRordbSlaves, forceRdb);
if (forceRdb) {
dumpNewRdb(false);
} else {
// use RORDB only if all slaves accept it
dumpNewRdb(waitingSlaves.size() == needRordbSlaves);
}
} catch (Throwable th) {
logger.info("[clearRdbDumper][redump] fail", th);
waitingSlaves.forEach(redisSlave -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion;
import com.ctrip.xpipe.redis.core.store.*;
import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor;
import com.ctrip.xpipe.utils.CloseState;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
Expand Down Expand Up @@ -78,7 +79,14 @@ public void addCommandsListener(ReplicationProgress<?> progress, final CommandsL
}
getCommandStoreDelay().beginSend(listener, referenceFileRegion.getTotalPos());

ChannelFuture future = listener.onCommand(cmdReader.getCurCmdFile(), cmdReader.position(), referenceFileRegion);
ChannelFuture future = null;
try {
future = listener.onCommand(cmdReader.getCurCmdFile(), cmdReader.position(), referenceFileRegion);
} catch (CloseState.CloseStateException e) {
logger.info("[addCommandsListener][listener closed] deallocate fileRegion");
referenceFileRegion.deallocate();
throw e;
}

if(future != null){
CommandReader<ReferenceFileRegion> finalCmdReader = cmdReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.meta.KeeperState;
import com.ctrip.xpipe.redis.core.proxy.protocols.DefaultProxyConnectProtocol;
import com.ctrip.xpipe.redis.core.server.FakeRedisServer;
import com.ctrip.xpipe.redis.core.store.ReplId;
import com.ctrip.xpipe.redis.keeper.*;
Expand Down Expand Up @@ -163,12 +162,12 @@ public void testRdbDumperTooQuick() throws Exception {

redisKeeperServer.setRdbDumper(dump1);

redisKeeperServer.clearRdbDumper(dump1);
redisKeeperServer.clearRdbDumper(dump1, false);

// too quick
// force can success
redisKeeperServer.setRdbDumper(dump1, true);
redisKeeperServer.clearRdbDumper(dump1);
redisKeeperServer.clearRdbDumper(dump1, false);

try {
redisKeeperServer.setRdbDumper(dump1);
Expand Down

0 comments on commit 8489c40

Please sign in to comment.