From b8a337468c3063854e0df43bfb01e93e214fc3c2 Mon Sep 17 00:00:00 2001 From: yujun Date: Wed, 17 Jan 2024 09:02:37 +0800 Subject: [PATCH] [feature](merge-cloud) Add drop cloud table (#30032) Co-authored-by: Luwei <814383175@qq.com> Co-authored-by: deardeng <565620795@qq.com> Co-authored-by: Gavin Chou Co-authored-by: Lightman <31928846+Lchangliang@users.noreply.github.com> Co-authored-by: zhengyu Co-authored-by: Lei Zhang <1091517373@qq.com> Co-authored-by: AlexYue Co-authored-by: Xiaocc <598887962@qq.com> Co-authored-by: panDing19 <56944854+panDing19@users.noreply.github.com> Co-authored-by: plat1ko Co-authored-by: zhangdong <493738387@qq.com> Co-authored-by: walter --- .../java/org/apache/doris/common/Config.java | 3 + .../java/org/apache/doris/catalog/Env.java | 29 +--- .../apache/doris/catalog/TempPartitions.java | 8 +- .../apache/doris/cloud/catalog/CloudEnv.java | 5 + .../datasource/CloudInternalCatalog.java | 163 +++++++++++++++++- .../doris/datasource/InternalCatalog.java | 58 ++++++- 6 files changed, 228 insertions(+), 38 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 2a8a2ed53adc8d..baf46177bc1035 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2463,6 +2463,9 @@ public static boolean isNotCloudMode() { @ConfField(mutable = true) public static int sts_duration = 3600; + @ConfField(mutable = true) + public static int drop_rpc_retry_num = 200; + @ConfField public static int cloud_meta_service_rpc_failed_retry_times = 200; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index b72544365aa38a..222eb195b3480c 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -251,7 +251,6 @@ import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.CompactionTask; -import org.apache.doris.task.DropReplicaTask; import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.task.PriorityMasterTaskExecutor; import org.apache.doris.thrift.BackendService; @@ -5684,33 +5683,11 @@ public void onEraseOlapTable(OlapTable olapTable, boolean isReplay) { } } - if (!isReplay && !Env.isCheckpointThread()) { - // drop all replicas - AgentBatchTask batchTask = new AgentBatchTask(); - for (Partition partition : olapTable.getAllPartitions()) { - List allIndices = partition.getMaterializedIndices(IndexExtState.ALL); - for (MaterializedIndex materializedIndex : allIndices) { - long indexId = materializedIndex.getId(); - int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - for (Tablet tablet : materializedIndex.getTablets()) { - long tabletId = tablet.getId(); - List replicas = tablet.getReplicas(); - for (Replica replica : replicas) { - long backendId = replica.getBackendId(); - long replicaId = replica.getId(); - DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, - replicaId, schemaHash, true); - batchTask.addTask(dropTask); - } // end for replicas - } // end for tablets - } // end for indices - } // end for partitions - AgentTaskExecutor.submit(batchTask); - } - // TODO: does checkpoint need update colocate index ? // colocation Env.getCurrentColocateIndex().removeTable(olapTable.getId()); + + getInternalCatalog().eraseTableDropBackendReplicas(olapTable, isReplay); } public void onErasePartition(Partition partition) { @@ -5721,6 +5698,8 @@ public void onErasePartition(Partition partition) { invertedIndex.deleteTablet(tablet.getId()); } } + + getInternalCatalog().erasePartitionDropBackendReplicas(Lists.newArrayList(partition)); } public void cleanTrash(AdminCleanTrashStmt stmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java index 9cd2d61bf91bd1..aebcae2e51d5ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java @@ -17,7 +17,6 @@ package org.apache.doris.catalog; -import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonPostProcessable; @@ -78,12 +77,7 @@ public void dropPartition(String partitionName, boolean needDropTablet) { idToPartition.remove(partition.getId()); nameToPartition.remove(partitionName); if (needDropTablet) { - TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { - for (Tablet tablet : index.getTablets()) { - invertedIndex.deleteTablet(tablet.getId()); - } - } + Env.getCurrentEnv().onErasePartition(partition); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java index 8d49783f3ca2c0..85162fec825246 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java @@ -19,8 +19,13 @@ import org.apache.doris.catalog.Env; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + public class CloudEnv extends Env { + private static final Logger LOG = LogManager.getLogger(CloudEnv.class); + public CloudEnv(boolean isCheckpointCatalog) { super(isCheckpointCatalog); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 13f04fd36df825..f2e3f10c2800fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -27,9 +27,11 @@ import org.apache.doris.catalog.Index; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.MaterializedIndex.IndexState; import org.apache.doris.catalog.MaterializedIndexMeta; import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Replica.ReplicaState; @@ -54,6 +56,7 @@ import org.apache.doris.thrift.TTabletType; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import doris.segment_v2.SegmentV2; import org.apache.logging.log4j.LogManager; @@ -498,12 +501,164 @@ private void sendCreateTabletsRpc(Cloud.CreateTabletsRequest.Builder requestBuil // END CREATE TABLE + // BEGIN DROP TABLE + + @Override + public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean isReplay) { + if (!Env.getCurrentEnv().isMaster()) { + return; + } + + List indexs = Lists.newArrayList(); + for (Partition partition : olapTable.getAllPartitions()) { + List allIndices = partition.getMaterializedIndices(IndexExtState.ALL); + for (MaterializedIndex materializedIndex : allIndices) { + long indexId = materializedIndex.getId(); + indexs.add(indexId); + } + } + + int tryCnt = 0; + while (true) { + if (tryCnt++ > Config.drop_rpc_retry_num) { + LOG.warn("failed to drop index {} of table {}, try cnt {} reaches maximum retry count", + indexs, olapTable.getId(), tryCnt); + break; + } + + try { + if (indexs.isEmpty()) { + break; + } + dropMaterializedIndex(olapTable.getId(), indexs); + } catch (Exception e) { + LOG.warn("failed to drop index {} of table {}, try cnt {}, execption {}", + indexs, olapTable.getId(), tryCnt, e); + try { + Thread.sleep(3000); + } catch (InterruptedException ie) { + LOG.warn("Thread sleep is interrupted"); + } + continue; + } + break; + } + } + + @Override + public void erasePartitionDropBackendReplicas(List partitions) { + if (!Env.getCurrentEnv().isMaster() || partitions.isEmpty()) { + return; + } + + long tableId = -1; + List partitionIds = Lists.newArrayList(); + List indexIds = Lists.newArrayList(); + for (Partition partition : partitions) { + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { + indexIds.add(index.getId()); + if (tableId == -1) { + tableId = ((CloudReplica) index.getTablets().get(0).getReplicas().get(0)).getTableId(); + } + } + partitionIds.add(partition.getId()); + } + + CloudPartition partition0 = (CloudPartition) partitions.get(0); + + int tryCnt = 0; + while (true) { + if (tryCnt++ > Config.drop_rpc_retry_num) { + LOG.warn("failed to drop partition {} of table {}, try cnt {} reaches maximum retry count", + partitionIds, tableId, tryCnt); + break; + } + try { + dropCloudPartition(partition0.getDbId(), tableId, partitionIds, indexIds); + } catch (Exception e) { + LOG.warn("failed to drop partition {} of table {}, try cnt {}, execption {}", + partitionIds, tableId, tryCnt, e); + try { + Thread.sleep(3000); + } catch (InterruptedException ie) { + LOG.warn("Thread sleep is interrupted"); + } + continue; + } + break; + } + } + + private void dropCloudPartition(long dbId, long tableId, List partitionIds, List indexIds) + throws DdlException { + Cloud.PartitionRequest.Builder partitionRequestBuilder = + Cloud.PartitionRequest.newBuilder(); + partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); + partitionRequestBuilder.setTableId(tableId); + partitionRequestBuilder.addAllPartitionIds(partitionIds); + partitionRequestBuilder.addAllIndexIds(indexIds); + if (dbId > 0) { + partitionRequestBuilder.setDbId(dbId); + } + final Cloud.PartitionRequest partitionRequest = partitionRequestBuilder.build(); + + Cloud.PartitionResponse response = null; + int tryTimes = 0; + while (tryTimes++ < Config.meta_service_rpc_retry_times) { + try { + response = MetaServiceProxy.getInstance().dropPartition(partitionRequest); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + } catch (RpcException e) { + LOG.warn("tryTimes:{}, dropPartition RpcException", tryTimes, e); + if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) { + throw new DdlException(e.getMessage()); + } + } + sleepSeveralMs(); + } + + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("dropPartition response: {} ", response); + throw new DdlException(response.getStatus().getMsg()); + } + } + + private void dropMaterializedIndex(Long tableId, List indexIds) throws DdlException { + Cloud.IndexRequest.Builder indexRequestBuilder = Cloud.IndexRequest.newBuilder(); + indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); + indexRequestBuilder.addAllIndexIds(indexIds); + indexRequestBuilder.setTableId(tableId); + final Cloud.IndexRequest indexRequest = indexRequestBuilder.build(); + + Cloud.IndexResponse response = null; + int tryTimes = 0; + while (tryTimes++ < Config.meta_service_rpc_retry_times) { + try { + response = MetaServiceProxy.getInstance().dropIndex(indexRequest); + if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) { + break; + } + } catch (RpcException e) { + LOG.warn("tryTimes:{}, dropIndex RpcException", tryTimes, e); + if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) { + throw new DdlException(e.getMessage()); + } + } + sleepSeveralMs(); + } + + if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { + LOG.warn("dropIndex response: {} ", response); + throw new DdlException(response.getStatus().getMsg()); + } + } + + // END DROP TABLE + @Override protected void checkAvailableCapacity(Database db) throws DdlException { - // check cluster capacity - Env.getCurrentSystemInfo().checkAvailableCapacity(); - // check db quota - db.checkQuota(); } private void sleepSeveralMs() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 4671daba0cb10f..cf270a2c091684 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -156,6 +156,7 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.CreateReplicaTask; +import org.apache.doris.task.DropReplicaTask; import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStorageFormat; @@ -965,6 +966,38 @@ public void replayRecoverTable(RecoverInfo info) throws MetaNotFoundException, D } } + public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean isReplay) { + if (isReplay || Env.isCheckpointThread()) { + return; + } + + // drop all replicas + AgentBatchTask batchTask = new AgentBatchTask(); + for (Partition partition : olapTable.getAllPartitions()) { + List allIndices = partition.getMaterializedIndices(IndexExtState.ALL); + for (MaterializedIndex materializedIndex : allIndices) { + long indexId = materializedIndex.getId(); + int schemaHash = olapTable.getSchemaHashByIndexId(indexId); + for (Tablet tablet : materializedIndex.getTablets()) { + long tabletId = tablet.getId(); + List replicas = tablet.getReplicas(); + for (Replica replica : replicas) { + long backendId = replica.getBackendId(); + long replicaId = replica.getId(); + DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId, + replicaId, schemaHash, true); + batchTask.addTask(dropTask); + } // end for replicas + } // end for tablets + } // end for indices + } // end for partitions + AgentTaskExecutor.submit(batchTask); + } + + public void erasePartitionDropBackendReplicas(List partitions) { + // no need send be delete task, when be report its tablets, fe will send delete task then. + } + private void unprotectAddReplica(OlapTable olapTable, ReplicaPersistInfo info) { LOG.debug("replay add a replica {}", info); Partition partition = olapTable.getPartition(info.getPartitionId()); @@ -3085,6 +3118,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti // Things may be changed outside the table lock. olapTable = (OlapTable) db.getTableOrDdlException(copiedTbl.getId()); olapTable.writeLockOrDdlException(); + List oldPartitions = Lists.newArrayList(); try { olapTable.checkNormalStateForAlter(); // check partitions @@ -3141,7 +3175,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } // replace - truncateTableInternal(olapTable, newPartitions, truncateEntireTable); + oldPartitions = truncateTableInternal(olapTable, newPartitions, truncateEntireTable); // write edit log TruncateTableInfo info = @@ -3152,6 +3186,9 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } finally { olapTable.writeUnlock(); } + + erasePartitionDropBackendReplicas(oldPartitions); + if (truncateEntireTable) { // Drop the whole table stats after truncate the entire table Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable); @@ -3162,11 +3199,14 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames()); } - private void truncateTableInternal(OlapTable olapTable, List newPartitions, boolean isEntireTable) { + private List truncateTableInternal(OlapTable olapTable, List newPartitions, + boolean isEntireTable) { // use new partitions to replace the old ones. + List oldPartitions = Lists.newArrayList(); Set oldTabletIds = Sets.newHashSet(); for (Partition newPartition : newPartitions) { Partition oldPartition = olapTable.replacePartition(newPartition); + oldPartitions.add(oldPartition); // save old tablets to be removed for (MaterializedIndex index : oldPartition.getMaterializedIndices(IndexExtState.ALL)) { index.getTablets().forEach(t -> { @@ -3176,6 +3216,12 @@ private void truncateTableInternal(OlapTable olapTable, List newParti } if (isEntireTable) { + Set oldPartitionsIds = oldPartitions.stream().map(Partition::getId).collect(Collectors.toSet()); + for (Partition partition : olapTable.getTempPartitions()) { + if (!oldPartitionsIds.contains(partition.getId())) { + oldPartitions.add(partition); + } + } // drop all temp partitions olapTable.dropAllTempPartitions(); } @@ -3184,9 +3230,12 @@ private void truncateTableInternal(OlapTable olapTable, List newParti for (Long tabletId : oldTabletIds) { Env.getCurrentInvertedIndex().deleteTablet(tabletId); } + + return oldPartitions; } public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException { + List oldPartitions = Lists.newArrayList(); Database db = (Database) getDbOrMetaException(info.getDbId()); OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTblId(), TableType.OLAP); olapTable.writeLock(); @@ -3196,6 +3245,7 @@ public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundExcep // add tablet to inverted index TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Partition partition : info.getPartitions()) { + oldPartitions.add(partition); long partitionId = partition.getId(); TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId) .getStorageMedium(); @@ -3216,6 +3266,10 @@ public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundExcep } finally { olapTable.writeUnlock(); } + + if (!Env.isCheckpointThread()) { + erasePartitionDropBackendReplicas(oldPartitions); + } } public void replayAlterExternalTableSchema(String dbName, String tableName, List newSchema)