Skip to content

Commit

Permalink
full sync to Redis slave sequentially
Browse files Browse the repository at this point in the history
  • Loading branch information
lishanglin committed Jul 28, 2023
1 parent ba3b487 commit 361c8a6
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.ctrip.xpipe.api.lifecycle.Destroyable;
import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta;
import com.ctrip.xpipe.redis.core.protocal.PsyncObserver;
import com.ctrip.xpipe.redis.core.store.ClusterId;
import com.ctrip.xpipe.redis.core.store.ReplicationStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public interface RedisSlave extends RedisClient<RedisKeeperServer>, PartialAware
void waitForRdbDumping();

void waitForGtidParse();

void waitForSeqFsync();

SLAVE_STATE getSlaveState();

Expand All @@ -44,6 +46,13 @@ public interface RedisSlave extends RedisClient<RedisKeeperServer>, PartialAware
*/
void markPsyncProcessed();

/**
* If psync ? -1, slave start with no data, we should fsync immediately
*/
void markColdStart();

boolean isColdStart();

String metaInfo();

boolean supportProgress(Class<? extends ReplicationProgress<?>> clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public enum SLAVE_STATE {

REDIS_REPL_WAIT_RDB_DUMPING("wait_rdb_dumping"),
REDIS_REPL_WAIT_RDB_GTIDSET("wait_rdb_gtidset"),
REDIS_REPL_WAIT_SEQ_FSYNC("wait_seq_fsync"),
REDIS_REPL_SEND_BULK("send_bulk"),
REDIS_REPL_ONLINE("online");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class DefaultKeeperConfig extends AbstractCoreConfig implements KeeperCon

private static String KEY_APPLIER_READ_IDLE_SECONDS = "applier.read.idle.seconds";

private static String KEY_MAX_FSYNC_SLAVES = "replication.loading.slaves.max";

public DefaultKeeperConfig(){

CompositeConfig compositeConfig = new CompositeConfig();
Expand Down Expand Up @@ -181,4 +183,9 @@ public int getApplierReadIdleSeconds() {
public int getKeyReplicationTimeoutMilli() {
return getIntProperty(KEY_REPLICATION_TIMEOUT_MILLI, AbstractRedisMasterReplication.DEFAULT_REPLICATION_TIMEOUT_MILLI);
}

@Override
public int getMaxLoadingSlavesCnt() {
return getIntProperty(KEY_MAX_FSYNC_SLAVES, -1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,11 @@ public interface KeeperConfig extends CoreConfig{
int getApplierReadIdleSeconds();

int getKeyReplicationTimeoutMilli();

/**
* max redis slaves allowed to loading rdb at the same time
* -1 means no limit
*/
int getMaxLoadingSlavesCnt();

}
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,18 @@ public int getKeyReplicationTimeoutMilli() {
return keyReplicationTimeoutMilli;
}

private int maxLoadingSlaves = -1;

public TestKeeperConfig setMaxLoadingSlaves(int slaves) {
this.maxLoadingSlaves = slaves;
return this;
}

@Override
public int getMaxLoadingSlavesCnt() {
return maxLoadingSlaves;
}

@Override
public int getApplierReadIdleSeconds() {
return 60;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.ctrip.xpipe.redis.keeper.RedisSlave;
import com.ctrip.xpipe.redis.keeper.config.KeeperConfig;
import com.ctrip.xpipe.redis.core.store.OffsetReplicationProgress;
import com.ctrip.xpipe.redis.keeper.handler.keeper.AbstractSyncCommandHandler;

import java.io.IOException;

Expand All @@ -37,6 +36,9 @@ protected void innerDoHandle(final String[] args, final RedisSlave redisSlave, R

Long offsetRequest = Long.valueOf(args[1]);
String replIdRequest = args[0];
if (replIdRequest.equals("?")) {
redisSlave.markColdStart();
}

if(replIdRequest.equals("?")){
if (redisSlave.isKeeper() && offsetRequest.equals(KEEPER_PARTIAL_SYNC_OFFSET) && null != keeperRepl.replId()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.ctrip.xpipe.api.observer.Observer;
import com.ctrip.xpipe.api.proxy.ProxyEnabled;
import com.ctrip.xpipe.cluster.ElectContext;
import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask;
import com.ctrip.xpipe.concurrent.LongTimeAlertTask;
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.exception.XpipeRuntimeException;
Expand Down Expand Up @@ -49,6 +50,7 @@
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.internal.ConcurrentSet;

import java.io.File;
import java.io.IOException;
Expand All @@ -60,8 +62,10 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static com.ctrip.xpipe.redis.core.store.FULLSYNC_FAIL_CAUSE.RDB_GTIDSET_NOT_READY;
import static com.ctrip.xpipe.redis.keeper.SLAVE_STATE.*;

/**
* @author wenchao.meng
Expand All @@ -81,6 +85,9 @@ public class DefaultRedisKeeperServer extends AbstractRedisServer implements Red
public static int DEFAULT_KEEPER_WORKER_GROUP_THREAD_COUNT = Integer.parseInt(System.getProperty(KEY_DEFAULT_KEEPER_WORKER_GROUP_THREAD_COUNT, "5"));
private static final int DEFAULT_LONG_TIME_ALERT_TASK_MILLI = 1000;

private static String KEY_SEQ_FSYNC_CHECK_PERIOD_SEC = "SEQ_FSYNC_CHECK_PERIOD_SEC";
public static int DEFAULT_FSYNC_CHECK_PERIOD_SEC = Integer.parseInt(System.getProperty(KEY_SEQ_FSYNC_CHECK_PERIOD_SEC, "5"));

/**
* when keeper is active, it's redis master, else it's another keeper
*/
Expand All @@ -99,6 +106,11 @@ public class DefaultRedisKeeperServer extends AbstractRedisServer implements Red

private final Map<Channel, RedisClient<RedisKeeperServer>> redisClients = new ConcurrentHashMap<>();

/**
* redis slaves receiving rdb or loading rdb
*/
private final Set<RedisSlave> loadingSlaves = new ConcurrentSet<>();

private String threadPoolName;

private volatile boolean isStartIndexing;
Expand Down Expand Up @@ -272,6 +284,54 @@ protected void doStart() throws Exception {
startServer();
LifecycleHelper.startIfPossible(keeperRedisMaster);
this.leaderElector.start();
this.scheduled.scheduleWithFixedDelay(new AbstractExceptionLogTask() {
@Override
protected void doRun() throws Exception {
updateLoadingSlaves();
continueFsyncSequentially();
}
}, DEFAULT_FSYNC_CHECK_PERIOD_SEC, DEFAULT_FSYNC_CHECK_PERIOD_SEC, TimeUnit.SECONDS);
}

@VisibleForTesting
protected void continueFsyncSequentially() {
if (!getRedisKeeperServerState().keeperState().isActive()) return;

int maxLoadingSlavesCnt = keeperConfig.getMaxLoadingSlavesCnt();
Set<RedisSlave> slaves = slaves();
int currentLoadingSlaves = loadingSlaves.size();
if (maxLoadingSlavesCnt >= 0 && currentLoadingSlaves >= maxLoadingSlavesCnt) return;

for (RedisSlave slave: slaves) {
if (slave.getSlaveState() == REDIS_REPL_WAIT_SEQ_FSYNC) {
continueFsyncToSlave(slave);
}
}
}

private void continueFsyncToSlave(RedisSlave slave) {
try {
logger.info("[continueFsyncToSlave]{}", slave);
slave.processPsyncSequentially(new Runnable() {
@Override
public void run() {
try {
fullSyncToSlave(slave);
} catch (Throwable th) {
try {
logger.error("[continueFsyncToSlave][run]{}", slave, th);
if(slave.isOpen()){
slave.close();
}
} catch (IOException e) {
logger.error("[continueFsyncToSlave][close]{}", slave, th);
}
}
}
});
} catch (Throwable th) {
logger.info("[continueFsyncToSlave][fail]{}", slave, th);
}
}

@Override
Expand Down Expand Up @@ -655,6 +715,11 @@ public void promoteSlave(String ip, int port) throws RedisSlavePromotionExceptio
public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException {

logger.info("[fullSyncToSlave]{}, {}", redisSlave, rdbDumper.get());
if (!tryFullSyncToSlaveWithOthers(redisSlave)) {
redisSlave.waitForSeqFsync();
return;
}

if(rdbDumper.get() == null){
logger.info("[fullSyncToSlave][dumper null]{}", redisSlave);
FullSyncListener fullSyncListener = new DefaultFullSyncListener(redisSlave);
Expand All @@ -681,6 +746,29 @@ public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException {
}
}

private synchronized boolean tryFullSyncToSlaveWithOthers(RedisSlave redisSlave) {
if (redisSlave.isKeeper()) return true;
if (loadingSlaves.contains(redisSlave)) return true;

int maxConcurrentLoadingSlaves = keeperConfig.getMaxLoadingSlavesCnt();
if (redisSlave.isColdStart() || maxConcurrentLoadingSlaves < 0 || loadingSlaves.size() < maxConcurrentLoadingSlaves) {
loadingSlaves.add(redisSlave);
return true;
}

return false;
}

@VisibleForTesting
protected synchronized void updateLoadingSlaves() {
Set<RedisSlave> filterSlaves = loadingSlaves.stream()
.filter(slave -> slave.isKeeper() || !slave.isOpen()
|| (slave.getSlaveState() == REDIS_REPL_ONLINE && slave.getAck() != null))
.collect(Collectors.toSet());

filterSlaves.forEach(loadingSlaves::remove);
}

@Override
public synchronized void startIndexing() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class DefaultRedisSlave implements RedisSlave {

private AtomicBoolean writingCommands = new AtomicBoolean(false);

private volatile boolean coldStart = false;

private ChannelFutureListener writeExceptionListener = new ChannelFutureListener() {

private AtomicLong atomicLong = new AtomicLong(0);
Expand Down Expand Up @@ -122,7 +124,11 @@ public void waitForRdbDumping() {
}

this.slaveState = SLAVE_STATE.REDIS_REPL_WAIT_RDB_DUMPING;
this.waitForRdb();
if (null == pingFuture || pingFuture.isDone()) {
waitForRdb();
} else {
getLogger().info("[waitForRdbDumping][already start wait]{}", this);
}
}

@Override
Expand All @@ -142,6 +148,23 @@ public void waitForGtidParse() {
}
}

@Override
public void waitForSeqFsync() {
if(this.slaveState == SLAVE_STATE.REDIS_REPL_WAIT_SEQ_FSYNC) {
getLogger().info("[waitForSeqFsync][already waiting]{}", this);
return;
}

logger.info("[waitForSeqFsync]{}", this);
this.slaveState = SLAVE_STATE.REDIS_REPL_WAIT_SEQ_FSYNC;

if (null == pingFuture || pingFuture.isDone()) {
waitForRdb();
} else {
getLogger().info("[waitForSeqFsync][already start wait]{}", this);
}
}

private void waitForRdb() {
getLogger().info("[waitForRdb][begin ping]{}", this);
pingFuture = scheduled.scheduleAtFixedRate(new Runnable() {
Expand All @@ -166,6 +189,16 @@ protected void doRun() throws IOException {
}, rdbDumpMaxWaitMilli, TimeUnit.MILLISECONDS);
}

@Override
public void markColdStart() {
this.coldStart = true;
}

@Override
public boolean isColdStart() {
return coldStart;
}

@Override
public SLAVE_STATE getSlaveState() {
return this.slaveState;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.ctrip.xpipe.redis.keeper.impl;

import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.redis.core.entity.KeeperMeta;
import com.ctrip.xpipe.redis.core.meta.KeeperState;
import com.ctrip.xpipe.redis.core.server.FakeRedisServer;
import com.ctrip.xpipe.redis.keeper.*;
import com.ctrip.xpipe.redis.keeper.config.TestKeeperConfig;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -17,8 +19,9 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ctrip.xpipe.redis.keeper.SLAVE_STATE.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.*;

/**
* @author wenchao.meng
Expand Down Expand Up @@ -268,4 +271,45 @@ public void fixDeadSlave() throws Exception {
RedisSlave slave = client.becomeSlave();
assertFalse(redisKeeperServer.allClients().contains(slave));
}

private RedisSlave mockRedisSlave(RedisKeeperServer redisKeeperServer) {
ChannelFuture future = Mockito.mock(ChannelFuture.class);
Channel channel = Mockito.mock(Channel.class);
when(channel.closeFuture()).thenReturn(future);
RedisClient client = redisKeeperServer.clientConnected(channel);
RedisSlave slave = client.becomeSlave();
return slave;
}

@Test
public void testReqFsyncSeq() throws Exception {
((TestKeeperConfig)keeperConfig).setMaxLoadingSlaves(1);
RedisKeeperServer redisKeeperServer = createRedisKeeperServer();
redisKeeperServer.initialize();
redisKeeperServer.setRedisKeeperServerState(new RedisKeeperServerStateActive(
redisKeeperServer, new DefaultEndPoint("10.0.0.1", 6379)));
RdbDumper dumper = Mockito.mock(RdbDumper.class);
redisKeeperServer.setRdbDumper(dumper);

RedisSlave slave1 = mockRedisSlave(redisKeeperServer);
RedisSlave slave2 = mockRedisSlave(redisKeeperServer);

redisKeeperServer.fullSyncToSlave(slave1);
redisKeeperServer.fullSyncToSlave(slave2);
Assert.assertEquals(slave2.getSlaveState(), REDIS_REPL_WAIT_SEQ_FSYNC);

slave1.close();
redisKeeperServer.clientDisconnected(slave1.channel());
((DefaultRedisKeeperServer)redisKeeperServer).updateLoadingSlaves();
((DefaultRedisKeeperServer)redisKeeperServer).continueFsyncSequentially();
waitConditionUntilTimeOut(() -> {
try {
verify(dumper, times(2)).tryFullSync(any());
return true;
} catch (Throwable e) {
return false;
}
});
}

}

0 comments on commit 361c8a6

Please sign in to comment.