Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keeper support fsync with rordb #772

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.ctrip.xpipe.cache;

import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* @author lishanglin
* date 2021/4/17
*/
public class TimeBoundCache<T> {

private T data;

private long lastRefreshAt;

private long expiredAt;

private LongSupplier timeoutMillSupplier;

private Supplier<T> dataSupplier;

public TimeBoundCache(LongSupplier timeoutMillSupplier, Supplier<T> dataSupplier) {
this.data = null;
this.expiredAt = 0L;
this.lastRefreshAt = 0L;
this.timeoutMillSupplier = timeoutMillSupplier;
this.dataSupplier = dataSupplier;
}

Check warning on line 28 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L22-L28

Added lines #L22 - L28 were not covered by tests

// allow data timeout
public T getCurrentData() {
return this.data;

Check warning on line 32 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L32

Added line #L32 was not covered by tests
}

public T getData() {
return getData(false);

Check warning on line 36 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L36

Added line #L36 was not covered by tests
}

public T getData(boolean disableCache) {
long current = System.currentTimeMillis();

Check warning on line 40 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L40

Added line #L40 was not covered by tests
if (current < lastRefreshAt) {
// system time roll back
resetExpireAt();

Check warning on line 43 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L43

Added line #L43 was not covered by tests
}

if (!disableCache && null != data && expiredAt > current) {
return data;

Check warning on line 47 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L47

Added line #L47 was not covered by tests
}

synchronized (this) {

Check warning on line 50 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L50

Added line #L50 was not covered by tests
if (!disableCache && null != data && expiredAt > current) return data;
data = dataSupplier.get();
refreshExpireAt();
return data;

Check warning on line 54 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L52-L54

Added lines #L52 - L54 were not covered by tests
}
}

public synchronized void refresh() {
this.data = dataSupplier.get();
refreshExpireAt();
}

Check warning on line 61 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L59-L61

Added lines #L59 - L61 were not covered by tests

private void resetExpireAt() {
this.lastRefreshAt = 0L;
this.expiredAt = 0L;
}

Check warning on line 66 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L64-L66

Added lines #L64 - L66 were not covered by tests

private void refreshExpireAt() {
long timeout = timeoutMillSupplier.getAsLong();
this.lastRefreshAt = System.currentTimeMillis();
this.expiredAt = lastRefreshAt + timeout;

Check warning on line 71 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L69-L71

Added lines #L69 - L71 were not covered by tests
// expiredAt exceeds max long
if (this.expiredAt < timeout) this.expiredAt = Long.MAX_VALUE;
}

Check warning on line 74 in core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java#L74

Added line #L74 was not covered by tests

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.ctrip.xpipe.payload;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.ctrip.xpipe.redis.console.cache.impl;

import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.cache.DcCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.model.DcTbl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.ctrip.xpipe.config.AbstractConfigBean;
import com.ctrip.xpipe.redis.checker.alert.AlertDbConfig;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.config.ConsoleDbConfig;
import com.ctrip.xpipe.redis.console.model.ConfigModel;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.ctrip.xpipe.redis.console.controller.api.migrate;

import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.MigrationProgress;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.ctrip.xpipe.redis.console.resources;

import com.ctrip.xpipe.redis.checker.PersistenceCache;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.ctrip.xpipe.api.monitor.Task;
import com.ctrip.xpipe.api.monitor.TransactionMonitor;
import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.cluster.ConsoleLeaderAware;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.exception.DataNotFoundException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.ctrip.xpipe.api.migration.OuterClientService;
import com.ctrip.xpipe.lifecycle.AbstractLifecycle;
import com.ctrip.xpipe.redis.checker.OuterClientCache;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.ctrip.xpipe.redis.console.sentinel.impl;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.model.SentinelGroupModel;
import com.ctrip.xpipe.redis.console.sentinel.SentinelBalanceService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
public enum CAPA {

EOF,
PSYNC2;
PSYNC2,
RORDB;

Check warning on line 12 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/CAPA.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/CAPA.java#L11-L12

Added lines #L11 - L12 were not covered by tests

public static CAPA of(String capaString) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.redis.core.store.RdbStore;

import java.io.IOException;
import java.util.Map;

/**
* @author wenchao.meng
Expand All @@ -28,9 +29,7 @@ public interface PsyncObserver {
*/
void beginWriteRdb(EofType eofType, String replId, long masterRdbOffset) throws IOException;

void readRdbGtidSet(RdbStore rdbStore, String gtidSet);

void readAuxEnd(RdbStore rdbStore);
void readAuxEnd(RdbStore rdbStore, Map<String, String> auxMap);

void endWriteRdb();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public static enum REDIS_CONFIG_TYPE{
DISKLESS_SYNC_DELAY("repl-diskless-sync-delay"),
SLAVE_READONLY("slave-read-only"),

SLAVE_REPL_ALL("slave-repl-all")//extend for xredis
SLAVE_REPL_ALL("slave-repl-all"), //extend for xredis
RORDB_SYNC("swap-repl-rordb-sync") // extend for ror
;

private String configName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import static com.ctrip.xpipe.redis.core.redis.rdb.RdbConstant.REDIS_RDB_AUX_KEY_GTID;

/**
* @author marsqing
*
Expand Down Expand Up @@ -98,7 +97,7 @@
@Override
protected void beginReadRdb(EofType eofType) {
try {
rdbStore = currentReplicationStore.beginRdb(replId, masterRdbOffset, eofType);
rdbStore = currentReplicationStore.prepareRdb(replId, masterRdbOffset, eofType);

Check warning on line 100 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/AbstractReplicationStorePsync.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/AbstractReplicationStorePsync.java#L100

Added line #L100 was not covered by tests
inOutPayloadReplicationStore.setRdbStore(rdbStore);
super.beginReadRdb(eofType);
} catch (IOException e) {
Expand Down Expand Up @@ -129,38 +128,21 @@

@Override
public void onAux(String key, String value) {
//this part should be in AbstractPsync
if (REDIS_RDB_AUX_KEY_GTID.equalsIgnoreCase(key)) {
readRdbGtidSet(value);
}
}

@Override
public void onAuxFinish() {
public void onAuxFinish(Map<String, String> auxMap) {
getLogger().info("[onAuxFinish] aux is finish");

for (PsyncObserver observer : observers) {
try {
observer.readAuxEnd(rdbStore);
observer.readAuxEnd(rdbStore, auxMap);

Check warning on line 139 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/AbstractReplicationStorePsync.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/AbstractReplicationStorePsync.java#L139

Added line #L139 was not covered by tests
} catch (Throwable th) {
getLogger().error("[onAuxFinish]" + this, th);
}
}
}

protected void readRdbGtidSet(String gtidSet) {

getLogger().info("[readRdbGtidSet]{}, gtidset:{}", this, gtidSet);

for (PsyncObserver observer : observers) {
try {
observer.readRdbGtidSet(rdbStore, gtidSet);
} catch (Throwable th) {
getLogger().error("[readRdbGtidSet]" + this, th);
}
}
}

@Override
public void onFinish(RdbParser<?> parser) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

@Override
public ByteBuf getRequest() {
return new RequestStringParser(CONFIG, " get " + getConfigName()).format();
return new RequestStringParser(CONFIG, "get", getConfigName()).format();

Check warning on line 40 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java#L40

Added line #L40 was not covered by tests
}

protected abstract String getConfigName();
Expand Down Expand Up @@ -65,21 +65,26 @@
}
}

public static class ConfigGetDisklessSync extends ConfigGetCommand<Boolean>{
public static abstract class ConfigGetBool extends ConfigGetCommand<Boolean> {

public ConfigGetDisklessSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled) {
public ConfigGetBool(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled) {
super(clientPool, scheduled);
}

public ConfigGetDisklessSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled,
int commandTimeoutMilli) {
public ConfigGetBool(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled,
int commandTimeoutMilli) {
super(clientPool, scheduled, commandTimeoutMilli);
}

protected Boolean defaultValue() {
return null;

Check warning on line 80 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java#L80

Added line #L80 was not covered by tests
}

@Override
protected Boolean doFormat(Object[] payload) {

if(payload.length < 2){
if (null != defaultValue()) return defaultValue();
throw new IllegalStateException(getName() + " result length not right:" + payload.length);
}
String result = payloadToString(payload[1]);
Expand All @@ -89,9 +94,45 @@
if(result.equalsIgnoreCase("no")){
return false;
}
if (null != defaultValue()) return defaultValue();
throw new IllegalStateException("expected yes or no, but:" + result);
}

}

public static class ConfigGetRordbSync extends ConfigGetBool {

public ConfigGetRordbSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled) {
super(clientPool, scheduled);
}

Check warning on line 107 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java#L106-L107

Added lines #L106 - L107 were not covered by tests

public ConfigGetRordbSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled,
int commandTimeoutMilli) {
super(clientPool, scheduled, commandTimeoutMilli);
}

Check warning on line 112 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java#L111-L112

Added lines #L111 - L112 were not covered by tests

@Override
protected Boolean defaultValue() {
return false;

Check warning on line 116 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java#L116

Added line #L116 was not covered by tests
}

@Override
protected String getConfigName() {
return REDIS_CONFIG_TYPE.RORDB_SYNC.getConfigName();

Check warning on line 121 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java#L121

Added line #L121 was not covered by tests
}

}

public static class ConfigGetDisklessSync extends ConfigGetBool {

public ConfigGetDisklessSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled) {
super(clientPool, scheduled);
}

Check warning on line 130 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java#L129-L130

Added lines #L129 - L130 were not covered by tests

public ConfigGetDisklessSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled,
int commandTimeoutMilli) {
super(clientPool, scheduled, commandTimeoutMilli);
}

Check warning on line 135 in redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java

View check run for this annotation

Codecov / codecov/patch

redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/ConfigGetCommand.java#L134-L135

Added lines #L134 - L135 were not covered by tests

@Override
protected String getConfigName() {
Expand Down
Loading
Loading