Skip to content

Commit

Permalink
[refactor](binlog) put recording dropped resources into a separate me…
Browse files Browse the repository at this point in the history
…thod (#43938)

put recording dropped resources into a separate method
  • Loading branch information
w41ter committed Nov 19, 2024
1 parent 6ac54e3 commit 923486e
Showing 1 changed file with 57 additions and 53 deletions.
110 changes: 57 additions & 53 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,34 +121,7 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {

allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);

if (binlog.getType() == TBinlogType.DROP_PARTITION) {
DropPartitionInfo info = DropPartitionInfo.fromJson(binlog.data);
if (info != null && info.getPartitionId() > 0) {
droppedPartitions.add(Pair.of(info.getPartitionId(), binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.DROP_TABLE) {
DropTableRecord record = DropTableRecord.fromJson(binlog.data);
if (record != null && record.getTableId() > 0) {
droppedTables.add(Pair.of(record.getTableId(), binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.ALTER_JOB) {
AlterJobRecord record = AlterJobRecord.fromJson(binlog.data);
if (record != null && record.isSchemaChangeJob() && record.isJobFinished()) {
for (Long indexId : record.getOriginIndexIdList()) {
if (indexId != null && indexId > 0) {
droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
}
}
}
} else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE) {
TruncateTableRecord record = TruncateTableRecord.fromJson(binlog.data);
if (record != null) {
for (long partitionId : record.getOldPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
}
}
recordDroppedResources(binlog);

if (tableIds == null) {
return;
Expand Down Expand Up @@ -202,31 +175,7 @@ public void addBinlog(TBinlog binlog, Object raw) {
return;
}

if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
long tableId = ((DropTableRecord) raw).getTableId();
if (tableId > 0) {
droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) {
for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
if (indexId != null && indexId > 0) {
droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
}
}
}
} else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw;
for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
}
recordDroppedResources(binlog, raw);

switch (binlog.getType()) {
case CREATE_TABLE:
Expand Down Expand Up @@ -670,4 +619,59 @@ public void getBinlogInfo(BaseProcResult result) {
lock.readLock().unlock();
}
}

private void recordDroppedResources(TBinlog binlog) {
recordDroppedResources(binlog, null);
}

// A method to record the dropped tables, indexes, and partitions.
private void recordDroppedResources(TBinlog binlog, Object raw) {
if (raw == null) {
switch (binlog.getType()) {
case DROP_PARTITION:
raw = DropPartitionInfo.fromJson(binlog.data);
break;
case DROP_TABLE:
raw = DropTableRecord.fromJson(binlog.data);
break;
case ALTER_JOB:
raw = AlterJobRecord.fromJson(binlog.data);
break;
case TRUNCATE_TABLE:
raw = TruncateTableRecord.fromJson(binlog.data);
break;
default:
break;
}
if (raw == null) {
return;
}
}

if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.DROP_TABLE && raw instanceof DropTableRecord) {
long tableId = ((DropTableRecord) raw).getTableId();
if (tableId > 0) {
droppedTables.add(Pair.of(tableId, binlog.getCommitSeq()));
}
} else if (binlog.getType() == TBinlogType.ALTER_JOB && raw instanceof AlterJobRecord) {
AlterJobRecord alterJobRecord = (AlterJobRecord) raw;
if (alterJobRecord.isJobFinished() && alterJobRecord.isSchemaChangeJob()) {
for (Long indexId : alterJobRecord.getOriginIndexIdList()) {
if (indexId != null && indexId > 0) {
droppedIndexes.add(Pair.of(indexId, binlog.getCommitSeq()));
}
}
}
} else if (binlog.getType() == TBinlogType.TRUNCATE_TABLE && raw instanceof TruncateTableRecord) {
TruncateTableRecord truncateTableRecord = (TruncateTableRecord) raw;
for (long partitionId : truncateTableRecord.getOldPartitionIds()) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
}
}
}

0 comments on commit 923486e

Please sign in to comment.