Skip to content

Commit

Permalink
keeper support fsync with rordb (#772)
Browse files Browse the repository at this point in the history
Co-authored-by: lishanglin <[email protected]>
  • Loading branch information
LanternLee and lishanglin authored Mar 7, 2024
1 parent aa41c04 commit fe23abd
Show file tree
Hide file tree
Showing 93 changed files with 1,641 additions and 678 deletions.
76 changes: 76 additions & 0 deletions core/src/main/java/com/ctrip/xpipe/cache/TimeBoundCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.ctrip.xpipe.cache;

import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* @author lishanglin
* date 2021/4/17
*/
public class TimeBoundCache<T> {

private T data;

private long lastRefreshAt;

private long expiredAt;

private LongSupplier timeoutMillSupplier;

private Supplier<T> dataSupplier;

public TimeBoundCache(LongSupplier timeoutMillSupplier, Supplier<T> dataSupplier) {
this.data = null;
this.expiredAt = 0L;
this.lastRefreshAt = 0L;
this.timeoutMillSupplier = timeoutMillSupplier;
this.dataSupplier = dataSupplier;
}

// allow data timeout
public T getCurrentData() {
return this.data;
}

public T getData() {
return getData(false);
}

public T getData(boolean disableCache) {
long current = System.currentTimeMillis();
if (current < lastRefreshAt) {
// system time roll back
resetExpireAt();
}

if (!disableCache && null != data && expiredAt > current) {
return data;
}

synchronized (this) {
if (!disableCache && null != data && expiredAt > current) return data;
data = dataSupplier.get();
refreshExpireAt();
return data;
}
}

public synchronized void refresh() {
this.data = dataSupplier.get();
refreshExpireAt();
}

private void resetExpireAt() {
this.lastRefreshAt = 0L;
this.expiredAt = 0L;
}

private void refreshExpireAt() {
long timeout = timeoutMillSupplier.getAsLong();
this.lastRefreshAt = System.currentTimeMillis();
this.expiredAt = lastRefreshAt + timeout;
// expiredAt exceeds max long
if (this.expiredAt < timeout) this.expiredAt = Long.MAX_VALUE;
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.ctrip.xpipe.payload;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.ctrip.xpipe.redis.console.cache.impl;

import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.cache.DcCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.model.DcTbl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.ctrip.xpipe.config.AbstractConfigBean;
import com.ctrip.xpipe.redis.checker.alert.AlertDbConfig;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.config.ConsoleDbConfig;
import com.ctrip.xpipe.redis.console.model.ConfigModel;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.ctrip.xpipe.redis.console.controller.api.migrate;

import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.controller.AbstractConsoleController;
import com.ctrip.xpipe.redis.console.controller.api.migrate.meta.MigrationProgress;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.ctrip.xpipe.redis.console.resources;

import com.ctrip.xpipe.redis.checker.PersistenceCache;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.utils.VisibleForTesting;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.ctrip.xpipe.api.monitor.Task;
import com.ctrip.xpipe.api.monitor.TransactionMonitor;
import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.cluster.ConsoleLeaderAware;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.exception.DataNotFoundException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import com.ctrip.xpipe.api.migration.OuterClientService;
import com.ctrip.xpipe.lifecycle.AbstractLifecycle;
import com.ctrip.xpipe.redis.checker.OuterClientCache;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.ctrip.xpipe.redis.console.sentinel.impl;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.model.SentinelGroupModel;
import com.ctrip.xpipe.redis.console.sentinel.SentinelBalanceService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
public enum CAPA {

EOF,
PSYNC2;
PSYNC2,
RORDB;

public static CAPA of(String capaString) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.redis.core.store.RdbStore;

import java.io.IOException;
import java.util.Map;

/**
* @author wenchao.meng
Expand All @@ -28,9 +29,7 @@ public interface PsyncObserver {
*/
void beginWriteRdb(EofType eofType, String replId, long masterRdbOffset) throws IOException;

void readRdbGtidSet(RdbStore rdbStore, String gtidSet);

void readAuxEnd(RdbStore rdbStore);
void readAuxEnd(RdbStore rdbStore, Map<String, String> auxMap);

void endWriteRdb();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public static enum REDIS_CONFIG_TYPE{
DISKLESS_SYNC_DELAY("repl-diskless-sync-delay"),
SLAVE_READONLY("slave-read-only"),

SLAVE_REPL_ALL("slave-repl-all")//extend for xredis
SLAVE_REPL_ALL("slave-repl-all"), //extend for xredis
RORDB_SYNC("swap-repl-rordb-sync") // extend for ror
;

private String configName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import static com.ctrip.xpipe.redis.core.redis.rdb.RdbConstant.REDIS_RDB_AUX_KEY_GTID;

/**
* @author marsqing
*
Expand Down Expand Up @@ -98,7 +97,7 @@ protected RdbBulkStringParser createRdbReader() {
@Override
protected void beginReadRdb(EofType eofType) {
try {
rdbStore = currentReplicationStore.beginRdb(replId, masterRdbOffset, eofType);
rdbStore = currentReplicationStore.prepareRdb(replId, masterRdbOffset, eofType);
inOutPayloadReplicationStore.setRdbStore(rdbStore);
super.beginReadRdb(eofType);
} catch (IOException e) {
Expand Down Expand Up @@ -129,38 +128,21 @@ public void onRedisOp(RedisOp redisOp) {

@Override
public void onAux(String key, String value) {
//this part should be in AbstractPsync
if (REDIS_RDB_AUX_KEY_GTID.equalsIgnoreCase(key)) {
readRdbGtidSet(value);
}
}

@Override
public void onAuxFinish() {
public void onAuxFinish(Map<String, String> auxMap) {
getLogger().info("[onAuxFinish] aux is finish");

for (PsyncObserver observer : observers) {
try {
observer.readAuxEnd(rdbStore);
observer.readAuxEnd(rdbStore, auxMap);
} catch (Throwable th) {
getLogger().error("[onAuxFinish]" + this, th);
}
}
}

protected void readRdbGtidSet(String gtidSet) {

getLogger().info("[readRdbGtidSet]{}, gtidset:{}", this, gtidSet);

for (PsyncObserver observer : observers) {
try {
observer.readRdbGtidSet(rdbStore, gtidSet);
} catch (Throwable th) {
getLogger().error("[readRdbGtidSet]" + this, th);
}
}
}

@Override
public void onFinish(RdbParser<?> parser) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected T format(Object payload) {

@Override
public ByteBuf getRequest() {
return new RequestStringParser(CONFIG, " get " + getConfigName()).format();
return new RequestStringParser(CONFIG, "get", getConfigName()).format();
}

protected abstract String getConfigName();
Expand Down Expand Up @@ -65,21 +65,26 @@ protected String getConfigName() {
}
}

public static class ConfigGetDisklessSync extends ConfigGetCommand<Boolean>{
public static abstract class ConfigGetBool extends ConfigGetCommand<Boolean> {

public ConfigGetDisklessSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled) {
public ConfigGetBool(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled) {
super(clientPool, scheduled);
}

public ConfigGetDisklessSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled,
int commandTimeoutMilli) {
public ConfigGetBool(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled,
int commandTimeoutMilli) {
super(clientPool, scheduled, commandTimeoutMilli);
}

protected Boolean defaultValue() {
return null;
}

@Override
protected Boolean doFormat(Object[] payload) {

if(payload.length < 2){
if (null != defaultValue()) return defaultValue();
throw new IllegalStateException(getName() + " result length not right:" + payload.length);
}
String result = payloadToString(payload[1]);
Expand All @@ -89,9 +94,45 @@ protected Boolean doFormat(Object[] payload) {
if(result.equalsIgnoreCase("no")){
return false;
}
if (null != defaultValue()) return defaultValue();
throw new IllegalStateException("expected yes or no, but:" + result);
}

}

public static class ConfigGetRordbSync extends ConfigGetBool {

public ConfigGetRordbSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled) {
super(clientPool, scheduled);
}

public ConfigGetRordbSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled,
int commandTimeoutMilli) {
super(clientPool, scheduled, commandTimeoutMilli);
}

@Override
protected Boolean defaultValue() {
return false;
}

@Override
protected String getConfigName() {
return REDIS_CONFIG_TYPE.RORDB_SYNC.getConfigName();
}

}

public static class ConfigGetDisklessSync extends ConfigGetBool {

public ConfigGetDisklessSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled) {
super(clientPool, scheduled);
}

public ConfigGetDisklessSync(SimpleObjectPool<NettyClient> clientPool, ScheduledExecutorService scheduled,
int commandTimeoutMilli) {
super(clientPool, scheduled, commandTimeoutMilli);
}

@Override
protected String getConfigName() {
Expand Down
Loading

0 comments on commit fe23abd

Please sign in to comment.