diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/parser/GeneralRedisOpParserTest.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/parser/GeneralRedisOpParserTest.java index a59286a87..46585f3a2 100644 --- a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/parser/GeneralRedisOpParserTest.java +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/redis/parser/GeneralRedisOpParserTest.java @@ -68,6 +68,11 @@ public void testUnknowParse() { Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType()); Assert.assertNull(redisOp.getOpGtid()); Assert.assertArrayEquals(rawOpArgs, redisOp.buildRawOpArgs()); + byte[][] rawOpArgs1 = {"credis_flushall".getBytes()}; + redisOp = parser.parse(rawOpArgs1); + Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType()); + Assert.assertNull(redisOp.getOpGtid()); + Assert.assertArrayEquals(rawOpArgs1, redisOp.buildRawOpArgs()); RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp; Assert.assertNull(redisSingleKeyOp.getKey()); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java index 81fca2507..9ac9b07e3 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/applier/xsync/DefaultCommandDispatcher.java @@ -301,8 +301,8 @@ protected boolean shouldFilter(RedisOp redisOp) { } } if (redisOp.getOpType().isSwallow()) { - logger.info("[onRedisOp] swallow redisOp: {}", redisOp); - EventMonitor.DEFAULT.logEvent("APPLIER.SWALLOW.OP", String.format("swallow redisOp %s", redisOp.toString())); + logger.info("[onRedisOp] swallow redisOp: {}", redisOp.toString()); + EventMonitor.DEFAULT.logEvent("APPLIER.SWALLOW.OP", new String(redisOp.buildRawOpArgs()[0])); } return redisOp.getOpType().isSwallow(); } @@ -333,7 +333,7 @@ private void doOnRedisOp(RedisOp redisOp, long commandOffsetToAccumulate) { } if (shouldFilter(redisOp)) { - gtid_executed.get().add(redisOp.getOpGtid()); + addLwm(redisOp); offsetRecorder.addAndGet(commandOffsetToAccumulate); return; } @@ -349,6 +349,16 @@ private void doOnRedisOp(RedisOp redisOp, long commandOffsetToAccumulate) { } } + private void addLwm(RedisOp redisOp) { + stateThread.execute(() -> { + try { + gtid_executed.get().add(redisOp.getOpGtid()); + } catch (Throwable t) { + logger.error("[onRedisOp] unlikely - error", t); + } + }); + } + @Override public void onRedisOp(RedisOp redisOp) { doOnRedisOp(redisOp, 0);