diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java index 5db8efd4a..a1b289d13 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java @@ -4,7 +4,6 @@ import com.ctrip.xpipe.api.lifecycle.Destroyable; import com.ctrip.xpipe.redis.core.entity.KeeperInstanceMeta; import com.ctrip.xpipe.redis.core.entity.KeeperMeta; -import com.ctrip.xpipe.redis.core.entity.KeeperTransMeta; import com.ctrip.xpipe.redis.core.protocal.PsyncObserver; import com.ctrip.xpipe.redis.core.store.ClusterId; import com.ctrip.xpipe.redis.core.store.ReplicationStore; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisSlave.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisSlave.java index 19a1a4a47..346f546b4 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisSlave.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisSlave.java @@ -18,6 +18,8 @@ public interface RedisSlave extends RedisClient, PartialAware void waitForRdbDumping(); void waitForGtidParse(); + + void waitForSeqFsync(); /* park this slave (state REDIS_REPL_WAIT_SEQ_FSYNC) until it is its turn to full sync */ SLAVE_STATE getSlaveState(); @@ -44,6 +46,13 @@ public interface RedisSlave extends RedisClient, PartialAware */ void markPsyncProcessed(); + /** + * Mark a slave that sent
"psync ? -1": it starts with no data, so it should be full-synced immediately + */ + void markColdStart(); + + boolean isColdStart(); + String metaInfo(); boolean supportProgress(Class> clazz); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/SLAVE_STATE.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/SLAVE_STATE.java index b1da240ed..4bdadeb06 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/SLAVE_STATE.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/SLAVE_STATE.java @@ -9,6 +9,7 @@ public enum SLAVE_STATE { REDIS_REPL_WAIT_RDB_DUMPING("wait_rdb_dumping"), REDIS_REPL_WAIT_RDB_GTIDSET("wait_rdb_gtidset"), + REDIS_REPL_WAIT_SEQ_FSYNC("wait_seq_fsync"), REDIS_REPL_SEND_BULK("send_bulk"), REDIS_REPL_ONLINE("online"); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperConfig.java index a467144c0..e14cda951 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperConfig.java @@ -47,6 +47,8 @@ public class DefaultKeeperConfig extends AbstractCoreConfig implements KeeperCon private static String KEY_APPLIER_READ_IDLE_SECONDS = "applier.read.idle.seconds"; + private static String KEY_MAX_FSYNC_SLAVES = "replication.loading.slaves.max"; + public DefaultKeeperConfig(){ CompositeConfig compositeConfig = new CompositeConfig(); @@ -181,4 +183,9 @@ public int getApplierReadIdleSeconds() { public int getKeyReplicationTimeoutMilli() { return getIntProperty(KEY_REPLICATION_TIMEOUT_MILLI, AbstractRedisMasterReplication.DEFAULT_REPLICATION_TIMEOUT_MILLI); } + + @Override + public int getMaxLoadingSlavesCnt() { + return getIntProperty(KEY_MAX_FSYNC_SLAVES, -1); /* -1 is presumably the fallback when the property is unset - TODO confirm against getIntProperty */ + } } diff --git
a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java index e28907640..f63b7174e 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java @@ -64,4 +64,11 @@ public interface KeeperConfig extends CoreConfig{ int getApplierReadIdleSeconds(); int getKeyReplicationTimeoutMilli(); + + /** + * max number of redis slaves allowed to load rdb at the same time + * a negative value (e.g. -1) means no limit + */ + int getMaxLoadingSlavesCnt(); + } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java index 39cf0b040..74875a662 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java @@ -253,6 +253,18 @@ public int getKeyReplicationTimeoutMilli() { return keyReplicationTimeoutMilli; } + private int maxLoadingSlaves = -1; /* test default: -1, overridable through setMaxLoadingSlaves */ + + public TestKeeperConfig setMaxLoadingSlaves(int slaves) { + this.maxLoadingSlaves = slaves; + return this; + } + + @Override + public int getMaxLoadingSlavesCnt() { + return maxLoadingSlaves; + } + @Override public int getApplierReadIdleSeconds() { return 60; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandler.java index 268d1062e..2872c1ae9 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandler.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandler.java @@ -10,7 +10,6 @@ import
com.ctrip.xpipe.redis.keeper.RedisSlave; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; import com.ctrip.xpipe.redis.core.store.OffsetReplicationProgress; -import com.ctrip.xpipe.redis.keeper.handler.keeper.AbstractSyncCommandHandler; import java.io.IOException; @@ -37,6 +36,9 @@ protected void innerDoHandle(final String[] args, final RedisSlave redisSlave, R Long offsetRequest = Long.valueOf(args[1]); String replIdRequest = args[0]; + if (replIdRequest.equals("?")) { + redisSlave.markColdStart(); /* replId "?" is treated as a cold start whatever the requested offset is */ + } if(replIdRequest.equals("?")){ if (redisSlave.isKeeper() && offsetRequest.equals(KEEPER_PARTIAL_SYNC_OFFSET) && null != keeperRepl.replId()) { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java index e5ae2d37a..870bfc791 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java @@ -13,6 +13,7 @@ import com.ctrip.xpipe.api.observer.Observer; import com.ctrip.xpipe.api.proxy.ProxyEnabled; import com.ctrip.xpipe.cluster.ElectContext; +import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; import com.ctrip.xpipe.concurrent.LongTimeAlertTask; import com.ctrip.xpipe.endpoint.DefaultEndPoint; import com.ctrip.xpipe.exception.XpipeRuntimeException; @@ -49,6 +50,7 @@ import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.internal.ConcurrentSet; import java.io.File; import java.io.IOException; @@ -60,8 +62,10 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static
com.ctrip.xpipe.redis.core.store.FULLSYNC_FAIL_CAUSE.RDB_GTIDSET_NOT_READY; +import static com.ctrip.xpipe.redis.keeper.SLAVE_STATE.*; /** * @author wenchao.meng @@ -81,6 +85,9 @@ public class DefaultRedisKeeperServer extends AbstractRedisServer implements Red public static int DEFAULT_KEEPER_WORKER_GROUP_THREAD_COUNT = Integer.parseInt(System.getProperty(KEY_DEFAULT_KEEPER_WORKER_GROUP_THREAD_COUNT, "5")); private static final int DEFAULT_LONG_TIME_ALERT_TASK_MILLI = 1000; + private static String KEY_SEQ_FSYNC_CHECK_PERIOD_SEC = "SEQ_FSYNC_CHECK_PERIOD_SEC"; + public static int DEFAULT_FSYNC_CHECK_PERIOD_SEC = Integer.parseInt(System.getProperty(KEY_SEQ_FSYNC_CHECK_PERIOD_SEC, "5")); /* seconds between two runs of the loading-slaves check scheduled in doStart */ + /** * when keeper is active, it's redis master, else it's another keeper */ @@ -99,6 +106,11 @@ public class DefaultRedisKeeperServer extends AbstractRedisServer implements Red private final Map> redisClients = new ConcurrentHashMap<>(); + /** + * redis slaves receiving rdb or loading rdb + */ + private final Set<RedisSlave> loadingSlaves = ConcurrentHashMap.newKeySet(); + private String threadPoolName; private volatile boolean isStartIndexing; @@ -272,6 +284,54 @@ protected void doStart() throws Exception { startServer(); LifecycleHelper.startIfPossible(keeperRedisMaster); this.leaderElector.start(); + this.scheduled.scheduleWithFixedDelay(new AbstractExceptionLogTask() { + @Override + protected void doRun() throws Exception { + updateLoadingSlaves(); + continueFsyncSequentially(); + } + }, DEFAULT_FSYNC_CHECK_PERIOD_SEC, DEFAULT_FSYNC_CHECK_PERIOD_SEC, TimeUnit.SECONDS); /* NOTE(review): the returned ScheduledFuture is discarded - confirm this task is cancelled when the server stops */ + } + + @VisibleForTesting + protected void continueFsyncSequentially() { + if (!getRedisKeeperServerState().keeperState().isActive()) return; + + int maxLoadingSlavesCnt = keeperConfig.getMaxLoadingSlavesCnt(); + Set<RedisSlave> slaves = slaves(); + int currentLoadingSlaves = loadingSlaves.size(); + if (maxLoadingSlavesCnt >= 0 && currentLoadingSlaves >= maxLoadingSlavesCnt) return; /* limit reached: leave waiting slaves parked until the next run */ + + for (RedisSlave slave: slaves) { + if
(slave.getSlaveState() == REDIS_REPL_WAIT_SEQ_FSYNC) { + continueFsyncToSlave(slave); + } + } + } + + private void continueFsyncToSlave(RedisSlave slave) { + try { + logger.info("[continueFsyncToSlave]{}", slave); + slave.processPsyncSequentially(new Runnable() { + @Override + public void run() { + try { + fullSyncToSlave(slave); + } catch (Throwable th) { + try { + logger.error("[continueFsyncToSlave][run]{}", slave, th); + if(slave.isOpen()){ + slave.close(); + } + } catch (IOException e) { + logger.error("[continueFsyncToSlave][close]{}", slave, e); /* report the close failure itself; th is already logged above */ + } + } + } + }); + } catch (Throwable th) { + logger.info("[continueFsyncToSlave][fail]{}", slave, th); + } } @Override @@ -655,6 +715,11 @@ public void promoteSlave(String ip, int port) throws RedisSlavePromotionExceptio public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException { logger.info("[fullSyncToSlave]{}, {}", redisSlave, rdbDumper.get()); + if (!tryFullSyncToSlaveWithOthers(redisSlave)) { + redisSlave.waitForSeqFsync(); /* no free slot: park the slave until continueFsyncSequentially retries it */ + return; + } + if(rdbDumper.get() == null){ logger.info("[fullSyncToSlave][dumper null]{}", redisSlave); FullSyncListener fullSyncListener = new DefaultFullSyncListener(redisSlave); @@ -681,6 +746,29 @@ public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException { } } + private synchronized boolean tryFullSyncToSlaveWithOthers(RedisSlave redisSlave) { + if (redisSlave.isKeeper()) return true; /* keepers are never limited and never tracked */ + if (loadingSlaves.contains(redisSlave)) return true; /* already admitted earlier */ + + int maxConcurrentLoadingSlaves = keeperConfig.getMaxLoadingSlavesCnt(); + if (redisSlave.isColdStart() || maxConcurrentLoadingSlaves < 0 || loadingSlaves.size() < maxConcurrentLoadingSlaves) { + loadingSlaves.add(redisSlave); /* cold-start slaves are admitted even when the limit is reached */ + return true; + } + + return false; + } + + @VisibleForTesting + protected synchronized void updateLoadingSlaves() { /* forget slaves that are keepers, closed, or online with a non-null ack */ + Set<RedisSlave> filterSlaves = loadingSlaves.stream() + .filter(slave -> slave.isKeeper() || !slave.isOpen() + || (slave.getSlaveState() == REDIS_REPL_ONLINE && slave.getAck() !=
null)) + .collect(Collectors.toSet()); + + filterSlaves.forEach(loadingSlaves::remove); + } + @Override public synchronized void startIndexing() throws IOException { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java index 16b9406db..e9470067a 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java @@ -72,6 +72,8 @@ public class DefaultRedisSlave implements RedisSlave { private AtomicBoolean writingCommands = new AtomicBoolean(false); + private volatile boolean coldStart = false; + private ChannelFutureListener writeExceptionListener = new ChannelFutureListener() { private AtomicLong atomicLong = new AtomicLong(0); @@ -122,7 +124,11 @@ public void waitForRdbDumping() { } this.slaveState = SLAVE_STATE.REDIS_REPL_WAIT_RDB_DUMPING; - this.waitForRdb(); + if (null == pingFuture || pingFuture.isDone()) { + waitForRdb(); /* start the ping task only if none is currently running */ + } else { + getLogger().info("[waitForRdbDumping][already start wait]{}", this); + } } @Override @@ -142,6 +148,23 @@ public void waitForGtidParse() { } } + @Override + public void waitForSeqFsync() { + if(this.slaveState == SLAVE_STATE.REDIS_REPL_WAIT_SEQ_FSYNC) { + getLogger().info("[waitForSeqFsync][already waiting]{}", this); + return; + } + + getLogger().info("[waitForSeqFsync]{}", this); + this.slaveState = SLAVE_STATE.REDIS_REPL_WAIT_SEQ_FSYNC; + + if (null == pingFuture || pingFuture.isDone()) { + waitForRdb(); /* NOTE(review): waitForRdb appears to also arm the rdbDumpMaxWaitMilli timeout - confirm a parked slave is not timed out while waiting for its turn */ + } else { + getLogger().info("[waitForSeqFsync][already start wait]{}", this); + } + } + private void waitForRdb() { getLogger().info("[waitForRdb][begin ping]{}", this); pingFuture = scheduled.scheduleAtFixedRate(new Runnable() { @@ -166,6 +189,16 @@ protected void doRun() throws IOException { }, rdbDumpMaxWaitMilli, TimeUnit.MILLISECONDS); } + @Override +
public void markColdStart() { + this.coldStart = true; + } + + @Override + public boolean isColdStart() { + return coldStart; + } + @Override public SLAVE_STATE getSlaveState() { return this.slaveState; diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServerTest.java index fcbfbbf1f..3854d5104 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServerTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServerTest.java @@ -1,11 +1,13 @@ package com.ctrip.xpipe.redis.keeper.impl; +import com.ctrip.xpipe.endpoint.DefaultEndPoint; import com.ctrip.xpipe.redis.core.entity.KeeperMeta; import com.ctrip.xpipe.redis.core.meta.KeeperState; import com.ctrip.xpipe.redis.core.server.FakeRedisServer; import com.ctrip.xpipe.redis.keeper.*; import com.ctrip.xpipe.redis.keeper.config.TestKeeperConfig; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Assert; import org.junit.Before; @@ -17,8 +19,9 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import static com.ctrip.xpipe.redis.keeper.SLAVE_STATE.*; import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.*; /** * @author wenchao.meng @@ -268,4 +271,45 @@ public void fixDeadSlave() throws Exception { RedisSlave slave = client.becomeSlave(); assertFalse(redisKeeperServer.allClients().contains(slave)); } + + private RedisSlave mockRedisSlave(RedisKeeperServer redisKeeperServer) { + ChannelFuture future = Mockito.mock(ChannelFuture.class); + Channel channel = Mockito.mock(Channel.class); + when(channel.closeFuture()).thenReturn(future); + RedisClient client = redisKeeperServer.clientConnected(channel);
+ RedisSlave slave = client.becomeSlave(); + return slave; + } + + @Test + public void testReqFsyncSeq() throws Exception { + ((TestKeeperConfig)keeperConfig).setMaxLoadingSlaves(1); /* only one slave may load rdb at a time */ + RedisKeeperServer redisKeeperServer = createRedisKeeperServer(); + redisKeeperServer.initialize(); + redisKeeperServer.setRedisKeeperServerState(new RedisKeeperServerStateActive( + redisKeeperServer, new DefaultEndPoint("10.0.0.1", 6379))); + RdbDumper dumper = Mockito.mock(RdbDumper.class); + redisKeeperServer.setRdbDumper(dumper); + + RedisSlave slave1 = mockRedisSlave(redisKeeperServer); + RedisSlave slave2 = mockRedisSlave(redisKeeperServer); + + redisKeeperServer.fullSyncToSlave(slave1); + redisKeeperServer.fullSyncToSlave(slave2); + Assert.assertEquals(REDIS_REPL_WAIT_SEQ_FSYNC, slave2.getSlaveState()); /* slave1 holds the single slot, so slave2 must be parked */ + + slave1.close(); + redisKeeperServer.clientDisconnected(slave1.channel()); + ((DefaultRedisKeeperServer)redisKeeperServer).updateLoadingSlaves(); /* run the two periodic steps by hand instead of waiting for the scheduler */ + ((DefaultRedisKeeperServer)redisKeeperServer).continueFsyncSequentially(); + waitConditionUntilTimeOut(() -> { + try { + verify(dumper, times(2)).tryFullSync(any()); + return true; + } catch (Throwable e) { + return false; + } + }); + } + }