Skip to content

Commit

Permalink
add unknown op lwm
Browse files Browse the repository at this point in the history
  • Loading branch information
hailu committed Jan 22, 2024
1 parent 750abd6 commit 46688f2
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public interface RdbParseContext {
enum RdbType {

STRING(RdbConstant.REDIS_RDB_TYPE_STRING, false, RdbStringParser::new),
// LIST(RdbConstant.REDIS_RDB_TYPE_LIST),
LIST(RdbConstant.REDIS_RDB_TYPE_LIST,false,RdbListParser::new),
SET(RdbConstant.REDIS_RDB_TYPE_SET, false, RdbSetParser::new),
// ZSET(RdbConstant.REDIS_RDB_TYPE_ZSET),
HASH(RdbConstant.REDIS_RDB_TYPE_HASH, false, RdbHashParser::new),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package com.ctrip.xpipe.redis.core.redis.rdb.parser;

import com.ctrip.xpipe.redis.core.redis.exception.RdbParseEmptyKeyException;
import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType;
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpSingleKey;
import com.ctrip.xpipe.redis.core.redis.rdb.RdbLength;
import com.ctrip.xpipe.redis.core.redis.rdb.RdbParseContext;
import com.ctrip.xpipe.redis.core.redis.rdb.RdbParser;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author hailu
* @date 2024/1/17 19:06
*/
public class RdbListParser extends AbstractRdbParser<Integer> implements RdbParser<Integer> {

private RdbParseContext context;

private RdbParser<byte[]> rdbStringParser;

private RdbLength len;

private int readCnt;

private STATE state = STATE.READ_INIT;

private static final Logger logger = LoggerFactory.getLogger(RdbListParser.class);

enum STATE {
READ_INIT,
READ_LEN,
READ_VALUE,
READ_END
}

public RdbListParser(RdbParseContext parseContext) {
this.context = parseContext;
this.rdbStringParser = (RdbParser<byte[]>) context.getOrCreateParser(RdbParseContext.RdbType.STRING);
}

@Override
public Integer read(ByteBuf byteBuf) {

while (!isFinish() && byteBuf.readableBytes() > 0) {

switch (state) {

case READ_INIT:
len = null;
readCnt = 0;
state = STATE.READ_LEN;
break;

case READ_LEN:
len = parseRdbLength(byteBuf);
if (null != len) {
if (len.getLenValue() > 0) {
state = STATE.READ_VALUE;
} else {
throw new RdbParseEmptyKeyException("set key " + context.getKey());
}
}
break;

case READ_VALUE:
byte[] value = rdbStringParser.read(byteBuf);
if (null != value) {
rdbStringParser.reset();
propagateCmdIfNeed(value);

readCnt++;
if (readCnt >= len.getLenValue()) {
state = STATE.READ_END;
} else {
state = STATE.READ_VALUE;
}
}
break;

case READ_END:
default:

}

if (isFinish()) {
propagateExpireAtIfNeed(context.getKey(), context.getExpireMilli());
}
}

if (isFinish()) return len.getLenValue();
else return null;
}

private void propagateCmdIfNeed(byte[] value) {
if (null == value || null == context.getKey()) {
return;
}

notifyRedisOp(new RedisOpSingleKey(
RedisOpType.RPUSH,
new byte[][] {RedisOpType.RPUSH.name().getBytes(), context.getKey().get(), value},
context.getKey(), value));
}

@Override
public boolean isFinish() {
return STATE.READ_END.equals(state);
}

@Override
public void reset() {
super.reset();
if (rdbStringParser != null) {
rdbStringParser.reset();
}
this.state = STATE.READ_INIT;
}

@Override
protected Logger getLogger() {
return logger;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,23 @@ public void testParseSet() {
Assert.assertEquals("SADD set 13927438904093012", redisOps.get(3).toString());
Assert.assertEquals("SADD set v1", redisOps.get(4).toString());
}
@Test
public void testParseList() {
ByteBuf byteBuf = Unpooled.wrappedBuffer(listRdbBytes);
while (!parser.isFinish()) {
parser.read(byteBuf);
}

Assert.assertEquals("SELECT 0", redisOps.get(0).toString());
Assert.assertEquals("RPUSH hailu_0_7743 aaa0_0_530", redisOps.get(1).toString());
Assert.assertEquals("RPUSH hailu_0_7743 aaa0_0_454", redisOps.get(2).toString());
Assert.assertEquals("RPUSH hailu_0_7743 aaa1_1_39", redisOps.get(3).toString());
Assert.assertEquals("RPUSH hailu_0_7743 aaa1_1_244", redisOps.get(4).toString());
Assert.assertEquals("RPUSH hailu_0_7743 aaa2_2_497", redisOps.get(5).toString());
Assert.assertEquals("RPUSH hailu_0_7743 aaa2_2_972", redisOps.get(6).toString());
Assert.assertEquals("RPUSH hailu_0_7743 aaa3_3_187", redisOps.get(7).toString());
Assert.assertEquals("RPUSH hailu_0_7743 aaa3_3_148", redisOps.get(8).toString());
}

@Test
public void testParseZiplistZSet() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,33 @@ public class RdbDataBytes {
0x33, 0x39, 0x32, 0x37, 0x34, 0x33, 0x38, 0x39, 0x30, 0x34, 0x30, 0x39, 0x33, 0x30, 0x31, 0x32, 0x02, 0x76,
0x31, (byte)0xff, (byte)0xb9, 0x52, 0x00, 0x16, (byte)0xaf, (byte)0xc6, 0x2d, (byte)0xae, 0x0a};

public static final byte[] listRdbBytes = new byte[]{0x52, 0x45, 0x44, 0x49, 0x53, 0x30, 0x30, 0x30, 0x39, (byte) 0xfa, 0x09, 0x72, 0x65, 0x64, 0x69, 0x73, 0x2d, 0x76, 0x65, 0x72, 0x05,
0x36, 0x2e, 0x32, 0x2e, 0x36, (byte)0xfa, 0x0a, 0x72, 0x65, 0x64, 0x69, 0x73, 0x2d, 0x62, 0x69, 0x74, 0x73, (byte)0xc0, 0x40, (byte)0xfa, 0x05, 0x63,
0x74, 0x69, 0x6d, 0x65, (byte)0xc2, (byte)0x9b, (byte)0xc2, (byte)0xa7, 0x65, (byte)0xfa, 0x08, 0x75, 0x73, 0x65, 0x64, 0x2d, 0x6d, 0x65, 0x6d, (byte)0xc2, (byte)0x88, 0x3b,
(byte)0x87, 0x06, (byte)0xfa, 0x0e, 0x72, 0x65, 0x70, 0x6c, 0x2d, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2d, 0x64, 0x62, (byte)0xc0, 0x00, (byte)0xfa, 0x07,
0x72, 0x65, 0x70, 0x6c, 0x2d, 0x69, 0x64, 0x28, 0x34, 0x37, 0x31, 0x34, 0x61, 0x66, 0x61, 0x66, 0x65, 0x39, 0x65, 0x33, 0x63, 0x61,
0x37, 0x38, 0x65, 0x37, 0x63, 0x61, 0x64, 0x61, 0x66, 0x64, 0x65, 0x35, 0x33, 0x62, 0x62, 0x30, 0x66, 0x30, 0x66, 0x31, 0x31, 0x62,
0x37, 0x63, 0x33, 0x39, (byte)0xfa, 0x0b, 0x72, 0x65, 0x70, 0x6c, 0x2d, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x0b, 0x33, 0x32, 0x36, 0x30,
0x36, 0x30, 0x36, 0x32, 0x37, 0x37, 0x35, (byte)0xfa, 0x04, 0x67, 0x74, 0x69, 0x64, 0x40, (byte)0xe9, 0x38, 0x33, 0x31, 0x65, 0x34, 0x62, 0x35,
0x66, 0x39, 0x38, 0x32, 0x38, 0x30, 0x34, 0x31, 0x32, 0x37, 0x31, 0x37, 0x65, 0x63, 0x66, 0x32, 0x39, 0x37, 0x35, 0x39, 0x32, 0x34,
0x33, 0x30, 0x35, 0x30, 0x38, 0x37, 0x32, 0x36, 0x34, 0x36, 0x32, 0x3a, 0x30, 0x2c, 0x33, 0x66, 0x33, 0x33, 0x66, 0x35, 0x65, 0x64,
0x61, 0x38, 0x62, 0x37, 0x63, 0x63, 0x66, 0x65, 0x65, 0x36, 0x66, 0x37, 0x33, 0x32, 0x33, 0x62, 0x61, 0x35, 0x64, 0x34, 0x37, 0x64,
0x39, 0x38, 0x34, 0x62, 0x37, 0x38, 0x35, 0x66, 0x65, 0x39, 0x3a, 0x31, 0x2d, 0x31, 0x33, 0x31, 0x36, 0x32, 0x34, 0x33, 0x36, 0x2c,
0x62, 0x39, 0x37, 0x39, 0x66, 0x61, 0x36, 0x35, 0x66, 0x31, 0x30, 0x64, 0x36, 0x33, 0x62, 0x34, 0x30, 0x37, 0x36, 0x37, 0x32, 0x34,
0x31, 0x31, 0x63, 0x63, 0x33, 0x33, 0x31, 0x33, 0x33, 0x38, 0x30, 0x30, 0x65, 0x32, 0x33, 0x65, 0x32, 0x63, 0x3a, 0x31, 0x2d, 0x34,
0x30, 0x2c, 0x65, 0x35, 0x32, 0x38, 0x62, 0x65, 0x35, 0x30, 0x61, 0x32, 0x64, 0x37, 0x32, 0x61, 0x65, 0x32, 0x33, 0x37, 0x63, 0x64,
0x39, 0x36, 0x37, 0x32, 0x38, 0x36, 0x65, 0x38, 0x31, 0x39, 0x62, 0x38, 0x33, 0x39, 0x36, 0x32, 0x62, 0x36, 0x38, 0x32, 0x3a, 0x31,
0x2d, 0x36, 0x34, 0x2c, 0x37, 0x35, 0x39, 0x33, 0x64, 0x64, 0x63, 0x63, 0x35, 0x31, 0x65, 0x38, 0x36, 0x36, 0x64, 0x61, 0x61, 0x64,
0x33, 0x33, 0x37, 0x34, 0x35, 0x39, 0x31, 0x63, 0x35, 0x31, 0x35, 0x64, 0x39, 0x39, 0x62, 0x31, 0x64, 0x62, 0x36, 0x31, 0x36, 0x38,
0x3a, 0x31, 0x2d, 0x32, 0x37, 0x34, (byte)0xfa, 0x0c, 0x61, 0x6f, 0x66, 0x2d, 0x70, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x6c, 0x65, (byte)0xc0, 0x00,
(byte)0xfe, 0x00, (byte)0xfb, 0x01, 0x00, (byte)0xf9, 0x06, 0x0e, 0x0c, 0x68, 0x61, 0x69, 0x6c, 0x75, 0x5f, 0x30, 0x5f, 0x37, 0x37, 0x34, 0x33, 0x01,
0x40, 0x6a, 0x6a, 0x00, 0x00, 0x00, 0x5d, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x0a, 0x61, 0x61, 0x61, 0x30, 0x5f, 0x30, 0x5f, 0x35,
0x33, 0x30, 0x0c, 0x0a, 0x61, 0x61, 0x61, 0x30, 0x5f, 0x30, 0x5f, 0x34, 0x35, 0x34, 0x0c, 0x09, 0x61, 0x61, 0x61, 0x31, 0x5f, 0x31,
0x5f, 0x33, 0x39, 0x0b, 0x0a, 0x61, 0x61, 0x61, 0x31, 0x5f, 0x31, 0x5f, 0x32, 0x34, 0x34, 0x0c, 0x0a, 0x61, 0x61, 0x61, 0x32, 0x5f,
0x32, 0x5f, 0x34, 0x39, 0x37, 0x0c, 0x0a, 0x61, 0x61, 0x61, 0x32, 0x5f, 0x32, 0x5f, 0x39, 0x37, 0x32, 0x0c, 0x0a, 0x61, 0x61, 0x61,
0x33, 0x5f, 0x33, 0x5f, 0x31, 0x38, 0x37, 0x0c, 0x0a, 0x61, 0x61, 0x61, 0x33, 0x5f, 0x33, 0x5f, 0x31, 0x34, 0x38, (byte)0xff, (byte)0xff, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};

public static final byte[] ziplistZSetRdbBytes = new byte[] {0x52, 0x45, 0x44, 0x49, 0x53, 0x30, 0x30, 0x30, 0x39,
(byte)0xfa, 0x09, 0x72, 0x65, 0x64, 0x69, 0x73, 0x2d, 0x76, 0x65, 0x72, 0x05, 0x36, 0x2e, 0x32, 0x2e, 0x36,
(byte)0xfa, 0x0a, 0x72, 0x65, 0x64, 0x69, 0x73, 0x2d, 0x62, 0x69, 0x74, 0x73, (byte)0xc0, 0x40, (byte)0xfa,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ protected boolean shouldFilter(RedisOp redisOp) {
String channel;
if (length == 3) {
channel = new String(redisOp.buildRawOpArgs()[1]);
} else if(length >= 5) {
} else if (length >= 5) {
channel = new String(redisOp.buildRawOpArgs()[4]);
} else {
logger.warn("publish command {} length={} unexpected, filtered", redisOp, length);
Expand All @@ -301,7 +301,8 @@ protected boolean shouldFilter(RedisOp redisOp) {
}
}
if (redisOp.getOpType().isSwallow()) {
logger.debug("[onRedisOp] filter unknown redisOp: {}", redisOp);
logger.info("[onRedisOp] swallow redisOp: {}", redisOp);
EventMonitor.DEFAULT.logEvent("KEEPER.FILTER.OP", String.format("swallow redisOp %s", redisOp.toString()));
}
return redisOp.getOpType().isSwallow();
}
Expand Down Expand Up @@ -332,6 +333,9 @@ private void doOnRedisOp(RedisOp redisOp, long commandOffsetToAccumulate) {
}

if (shouldFilter(redisOp)) {
if (redisOp.getOpGtid() != null) {
gtid_executed.get().add(redisOp.getOpGtid());
}
offsetRecorder.addAndGet(commandOffsetToAccumulate);
return;
}
Expand Down

0 comments on commit 46688f2

Please sign in to comment.