Skip to content

Commit

Permalink
add list parse for rdb &pass gtid.lwm & add unknow command parse (#769)
Browse files Browse the repository at this point in the history
* pass gtid.lwm & add unknow command parse

* add unknown op lwm

---------

Co-authored-by: hailu <[email protected]>
  • Loading branch information
tigerLuHai and hailu authored Jan 25, 2024
1 parent b903aab commit aa41c04
Show file tree
Hide file tree
Showing 9 changed files with 387 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public enum RedisOpType {
MSETNX(true, -3),

// ctrip
GTID_LWM(false, 3, true),
GTID_LWM(false, 3, false),
CTRIP_MERGE_START(false, -1, true),
CTRIP_MERGE_END(false, -2, true),

Expand All @@ -98,7 +98,7 @@ public enum RedisOpType {
EXEC(false, 1),
SCRIPT(false, -2),
MOVE(false, 3),
UNKNOWN(false, -1);
UNKNOWN(false, -1, true);

// Support multi key or not
private boolean supportMultiKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ public enum RedisOpNoneKeyEnum {
PING(RedisOpType.PING),
MULT(RedisOpType.MULTI),
EXEC(RedisOpType.EXEC),
SCRIPT(RedisOpType.SCRIPT);
SCRIPT(RedisOpType.SCRIPT),

UNKNOW(RedisOpType.UNKNOWN);

private RedisOpType redisOpType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public interface RdbParseContext {
enum RdbType {

STRING(RdbConstant.REDIS_RDB_TYPE_STRING, false, RdbStringParser::new),
// LIST(RdbConstant.REDIS_RDB_TYPE_LIST),
LIST(RdbConstant.REDIS_RDB_TYPE_LIST,false,RdbListParser::new),
SET(RdbConstant.REDIS_RDB_TYPE_SET, false, RdbSetParser::new),
// ZSET(RdbConstant.REDIS_RDB_TYPE_ZSET),
HASH(RdbConstant.REDIS_RDB_TYPE_HASH, false, RdbHashParser::new),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package com.ctrip.xpipe.redis.core.redis.rdb.parser;

import com.ctrip.xpipe.redis.core.redis.exception.RdbParseEmptyKeyException;
import com.ctrip.xpipe.redis.core.redis.operation.RedisOpType;
import com.ctrip.xpipe.redis.core.redis.operation.op.RedisOpSingleKey;
import com.ctrip.xpipe.redis.core.redis.rdb.RdbLength;
import com.ctrip.xpipe.redis.core.redis.rdb.RdbParseContext;
import com.ctrip.xpipe.redis.core.redis.rdb.RdbParser;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author hailu
* @date 2024/1/17 19:06
*/
public class RdbListParser extends AbstractRdbParser<Integer> implements RdbParser<Integer> {

private RdbParseContext context;

private RdbParser<byte[]> rdbStringParser;

private RdbLength len;

private int readCnt;

private STATE state = STATE.READ_INIT;

private static final Logger logger = LoggerFactory.getLogger(RdbListParser.class);

enum STATE {
READ_INIT,
READ_LEN,
READ_VALUE,
READ_END
}

public RdbListParser(RdbParseContext parseContext) {
this.context = parseContext;
this.rdbStringParser = (RdbParser<byte[]>) context.getOrCreateParser(RdbParseContext.RdbType.STRING);
}

@Override
public Integer read(ByteBuf byteBuf) {

while (!isFinish() && byteBuf.readableBytes() > 0) {

switch (state) {

case READ_INIT:
len = null;
readCnt = 0;
state = STATE.READ_LEN;
break;

case READ_LEN:
len = parseRdbLength(byteBuf);
if (null != len) {
if (len.getLenValue() > 0) {
state = STATE.READ_VALUE;
} else {
throw new RdbParseEmptyKeyException("set key " + context.getKey());
}
}
break;

case READ_VALUE:
byte[] value = rdbStringParser.read(byteBuf);
if (null != value) {
rdbStringParser.reset();
propagateCmdIfNeed(value);

readCnt++;
if (readCnt >= len.getLenValue()) {
state = STATE.READ_END;
} else {
state = STATE.READ_VALUE;
}
}
break;

case READ_END:
default:

}

if (isFinish()) {
propagateExpireAtIfNeed(context.getKey(), context.getExpireMilli());
}
}

if (isFinish()) return len.getLenValue();
else return null;
}

private void propagateCmdIfNeed(byte[] value) {
if (null == value || null == context.getKey()) {
return;
}

notifyRedisOp(new RedisOpSingleKey(
RedisOpType.RPUSH,
new byte[][] {RedisOpType.RPUSH.name().getBytes(), context.getKey().get(), value},
context.getKey(), value));
}

@Override
public boolean isFinish() {
return STATE.READ_END.equals(state);
}

@Override
public void reset() {
super.reset();
if (rdbStringParser != null) {
rdbStringParser.reset();
}
this.state = STATE.READ_INIT;
}

@Override
protected Logger getLogger() {
return logger;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ public void testCtripGtidLwmParse() {
RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp;
Assert.assertArrayEquals("24d9e2513182d156cbd999df5ebedf24e7634140".getBytes(), redisSingleKeyOp.getKey().get());
Assert.assertArrayEquals("1494763841".getBytes(), redisSingleKeyOp.getValue());
Assert.assertFalse(redisOp.getOpType().isSwallow());
}
@Test
public void testUnknowParse() {
byte[][] rawOpArgs = {"unknow".getBytes(), "unknow_key".getBytes(), "unknow_value".getBytes()};
RedisOp redisOp = parser.parse(rawOpArgs);
Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType());
Assert.assertNull(redisOp.getOpGtid());
Assert.assertArrayEquals(rawOpArgs, redisOp.buildRawOpArgs());
byte[][] rawOpArgs1 = {"credis_flushall".getBytes()};
redisOp = parser.parse(rawOpArgs1);
Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType());
Assert.assertNull(redisOp.getOpGtid());
Assert.assertArrayEquals(rawOpArgs1, redisOp.buildRawOpArgs());

RedisSingleKeyOp redisSingleKeyOp = (RedisSingleKeyOp) redisOp;
Assert.assertNull(redisSingleKeyOp.getKey());
Assert.assertNull(redisSingleKeyOp.getValue());
Assert.assertTrue(redisOp.getOpType().isSwallow());
}

Expand Down Expand Up @@ -138,10 +156,11 @@ public void testPingParse() {
Assert.assertFalse(redisOp.getOpType().isSwallow());
}

@Test(expected = UnsupportedOperationException.class)
@Test
public void testNoneExistsCmdParse() {
RedisOp redisOp = parser.parse(Arrays.asList("EMPTY", "0").toArray());
Assert.assertEquals(RedisOpType.UNKNOWN, redisOp.getOpType());
Assert.assertTrue(redisOp.getOpType().isSwallow());
}

@Test(expected = IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,20 @@ public void testParseSet() {
Assert.assertEquals("SADD set 13927438904093012", redisOps.get(3).toString());
Assert.assertEquals("SADD set v1", redisOps.get(4).toString());
}
@Test
public void testParseList() {
ByteBuf byteBuf = Unpooled.wrappedBuffer(listRdbBytes);
while (!parser.isFinish()) {
parser.read(byteBuf);
}

Assert.assertEquals("SELECT 0", redisOps.get(0).toString());
Assert.assertEquals("RPUSH test_0_7743 aaa0_0_530", redisOps.get(1).toString());
Assert.assertEquals("RPUSH test_0_7743 aaa0_0_454", redisOps.get(2).toString());
Assert.assertEquals("RPUSH test_0_7743 aaa1_1_39", redisOps.get(3).toString());
Assert.assertEquals("RPUSH test_0_7743 aaa1_1_244", redisOps.get(4).toString());

}

@Test
public void testParseZiplistZSet() {
Expand Down
Loading

0 comments on commit aa41c04

Please sign in to comment.