From 556e38c328a5679872a1b56935b06865f6a8acb7 Mon Sep 17 00:00:00 2001 From: hailu Date: Fri, 20 Sep 2024 16:16:04 +0800 Subject: [PATCH 1/2] repair full gc --- .../keeper/applier/command/SequenceCommand.java | 12 +++++++----- .../applier/sequence/DefaultSequenceController.java | 10 +++++----- .../applier/sync/DefaultCommandDispatcher.java | 1 + 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/command/SequenceCommand.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/command/SequenceCommand.java index 95b1a705c..7cdaf349d 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/command/SequenceCommand.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/command/SequenceCommand.java @@ -11,7 +11,6 @@ /** * @author Slight - * * Make sure to execute this command on stateThread. * Jan 31, 2022 1:06 PM */ @@ -44,7 +43,7 @@ public SequenceCommand(Collection> pasts, Command inner, E private void executeSelf() { CommandFuture future = inner.execute(workerThreads); - future.addListener((f)->{ + future.addListener((f) -> { if (f.isSuccess()) { stateThread.execute(() -> { try { @@ -65,10 +64,13 @@ private void executeSelf() { } private void nextAfter(Collection> pasts) { - for (SequenceCommand past : pasts) { - past.future().addListener((f)->{ + final int size = pasts.size(); + List temp = Lists.newArrayList(pasts); + for (SequenceCommand past : temp) { + past.future().addListener((f) -> { if (f.isSuccess()) { - if (++complete == pasts.size()) { + pasts.remove(past); + if (++complete == size) { executeSelf(); } } else { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/sequence/DefaultSequenceController.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/sequence/DefaultSequenceController.java index 514355f13..44adf26df 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/sequence/DefaultSequenceController.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/sequence/DefaultSequenceController.java @@ -154,8 +154,8 @@ private void submitNoneKeyCommand(RedisOpCommand command, long commandOffset) /* do some stuff when finish */ - releaseMemoryThresholdWhenDone(current, command.redisOp().estimatedSize()); increaseOffsetWhenSuccess(current, commandOffset); + releaseMemoryThresholdWhenDone(current, command.redisOp().estimatedSize()); /* run self */ @@ -189,8 +189,8 @@ private void submitSingleKeyCommand(RedisOpDataCommand command, long commandO /* do some stuff when finish */ - releaseMemoryThresholdWhenDone(current, command.redisOp().estimatedSize()); increaseOffsetWhenSuccess(current, commandOffset); + releaseMemoryThresholdWhenDone(current, command.redisOp().estimatedSize()); /* run self */ @@ -212,8 +212,8 @@ private void submitMultiKeyCommand(RedisOpDataCommand command, long commandOf forgetWhenDone(current, key); } - releaseMemoryThresholdWhenDone(current, command.redisOp().estimatedSize()); increaseOffsetWhenSuccess(current, commandOffset); + releaseMemoryThresholdWhenDone(current, command.redisOp().estimatedSize()); current.execute(); } @@ -240,8 +240,8 @@ private void submitObstacle(RedisOpCommand command, long commandOffset) { /* do some stuff when finish */ - releaseMemoryThresholdWhenDone(current, command.redisOp().estimatedSize()); increaseOffsetWhenSuccess(current, commandOffset); + releaseMemoryThresholdWhenDone(current, command.redisOp().estimatedSize()); /* run self */ @@ -266,7 +266,7 @@ private void forgetWhenDone(SequenceCommand sequenceCommand, RedisKey key) { private void releaseMemoryThresholdWhenDone(SequenceCommand sequenceCommand, long memory) { - sequenceCommand.future().addListener((f)->{ + sequenceCommand.future().addListener((f) -> { concurrencyThreshold.release(); memoryThreshold.release(memory); }); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/sync/DefaultCommandDispatcher.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/sync/DefaultCommandDispatcher.java index 32ab9d2ac..3b2f0240b 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/sync/DefaultCommandDispatcher.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/sync/DefaultCommandDispatcher.java @@ -128,6 +128,7 @@ public void onRdbData(ByteBuf rdbData) { @Override public void endReadRdb(EofType eofType, GtidSet emptyGtidSet, long rdbOffset) { + logger.info("[endReadRdb] eofType={}", eofType); } @Override From 92d7653c005ae1723af4e035422efd262a2b5c5e Mon Sep 17 00:00:00 2001 From: hailu Date: Wed, 25 Sep 2024 11:01:22 +0800 Subject: [PATCH 2/2] repair full gc --- .../com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java | 2 +- .../DefaultRedisKeeperServerConnectToFakeRedisTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java index 123ef1fc1..ce4f2dd25 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java @@ -86,7 +86,7 @@ protected RedisKeeperServer startRedisKeeperServer(int replicationStoreCommandFi commandFileSize, replicationStoreCommandFileNumToKeep, replicationStoreMaxCommandsToTransferBeforeCreateRdb, minTimeMilliToGcAfterCreate); - + ((TestKeeperConfig)keeperConfig).setRdbDumpMinIntervalMilli(0); RedisKeeperServer redisKeeperServer = createRedisKeeperServer(keeperConfig); redisKeeperServer.initialize(); redisKeeperServer.start(); diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/DefaultRedisKeeperServerConnectToFakeRedisTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/DefaultRedisKeeperServerConnectToFakeRedisTest.java index 9bcec52da..b8b7045d7 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/DefaultRedisKeeperServerConnectToFakeRedisTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/DefaultRedisKeeperServerConnectToFakeRedisTest.java @@ -42,7 +42,7 @@ public void testReplicationData() throws Exception{ @Test public void testNewDumpCommandsLost() throws Exception{ - startKeeperServerAndTestReFullSync(2, allCommandsSize); + startKeeperServerAndTestReFullSync(1, allCommandsSize); } @Test