Skip to content

Commit

Permalink
[feature](binlog) Wrap rename table/column binlog in BarrierLog (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Nov 4, 2024
1 parent d1e63c5 commit 3ea64ad
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
Expand Down Expand Up @@ -355,6 +357,24 @@ public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long prevCommit
}
}

public void addTableRename(TableInfo info, long commitSeq) {
long dbId = info.getDbId();
long tableId = info.getTableId();
TBinlogType type = TBinlogType.RENAME_TABLE;
String data = info.toJson();
BarrierLog log = new BarrierLog(dbId, tableId, type, data);
addBarrierLog(log, commitSeq);
}

public void addColumnRename(TableRenameColumnInfo info, long commitSeq) {
long dbId = info.getDbId();
long tableId = info.getTableId();
TBinlogType type = TBinlogType.RENAME_COLUMN;
String data = info.toJson();
BarrierLog log = new BarrierLog(dbId, tableId, type, data);
addBarrierLog(log, commitSeq);
}

// get the dropped partitions of the db.
public List<Long> getDroppedPartitions(long dbId) {
lock.readLock().lock();
Expand Down
10 changes: 8 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
case OperationType.OP_RENAME_TABLE: {
TableInfo info = (TableInfo) journal.getData();
env.replayRenameTable(info);
env.getBinlogManager().addTableRename(info, logId);
break;
}
case OperationType.OP_MODIFY_VIEW_DEF: {
Expand All @@ -317,6 +318,7 @@ public static void loadJournal(Env env, Long logId, JournalEntity journal) {
case OperationType.OP_RENAME_COLUMN: {
TableRenameColumnInfo info = (TableRenameColumnInfo) journal.getData();
env.replayRenameColumn(info);
Env.getCurrentEnv().getBinlogManager().addColumnRename(info, logId);
break;
}
case OperationType.OP_BACKUP_JOB: {
Expand Down Expand Up @@ -1521,7 +1523,9 @@ public void logDatabaseRename(DatabaseInfo databaseInfo) {
}

public void logTableRename(TableInfo tableInfo) {
logEdit(OperationType.OP_RENAME_TABLE, tableInfo);
long logId = logEdit(OperationType.OP_RENAME_TABLE, tableInfo);
LOG.info("log table rename, logId : {}, infos: {}", logId, tableInfo);
Env.getCurrentEnv().getBinlogManager().addTableRename(tableInfo, logId);
}

public void logModifyViewDef(AlterViewInfo alterViewInfo) {
Expand All @@ -1537,7 +1541,9 @@ public void logPartitionRename(TableInfo tableInfo) {
}

public void logColumnRename(TableRenameColumnInfo info) {
logEdit(OperationType.OP_RENAME_COLUMN, info);
long logId = logEdit(OperationType.OP_RENAME_COLUMN, info);
LOG.info("log column rename, logId : {}, infos: {}", logId, info);
Env.getCurrentEnv().getBinlogManager().addColumnRename(info, logId);
}

public void logAddBroker(BrokerMgr.ModifyBrokerInfo info) {
Expand Down
18 changes: 18 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/persist/TableInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,30 @@

import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TableInfo implements Writable {

@SerializedName("db")
private long dbId;
@SerializedName("tb")
private long tableId;
@SerializedName("ind")
private long indexId;
@SerializedName("p")
private long partitionId;

@SerializedName("nT")
private String newTableName;
@SerializedName("nR")
private String newRollupName;
@SerializedName("nP")
private String newPartitionName;

public TableInfo() {
Expand Down Expand Up @@ -124,4 +134,12 @@ public static TableInfo read(DataInput in) throws IOException {
info.readFields(in);
return info;
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

public static TableInfo fromJson(String json) {
return GsonUtils.GSON.fromJson(json, TableInfo.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public Map<Long, Integer> getIndexIdToSchemaVersion() {
return indexIdToSchemaVersion;
}

public String toJson() {
return GsonUtils.GSON.toJson(this);
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,8 @@ enum TBinlogType {
MODIFY_PARTITIONS = 11,
REPLACE_PARTITIONS = 12,
TRUNCATE_TABLE = 13,
RENAME_TABLE = 14,
RENAME_COLUMN = 15,
}

struct TBinlog {
Expand Down

0 comments on commit 3ea64ad

Please sign in to comment.