Skip to content

Commit

Permalink
[feature](merge-cloud) Add drop cloud table (apache#30032)
Browse files Browse the repository at this point in the history
Co-authored-by: Luwei <[email protected]>
Co-authored-by: deardeng <[email protected]>
Co-authored-by: Gavin Chou <[email protected]>
Co-authored-by: Lightman <[email protected]>
Co-authored-by: zhengyu <[email protected]>
Co-authored-by: Lei Zhang <[email protected]>
Co-authored-by: AlexYue <[email protected]>
Co-authored-by: Xiaocc <[email protected]>
Co-authored-by: panDing19 <[email protected]>
Co-authored-by: plat1ko <[email protected]>
Co-authored-by: zhangdong <[email protected]>
Co-authored-by: walter <[email protected]>
  • Loading branch information
13 people authored Jan 17, 2024
1 parent 33702aa commit b8a3374
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2463,6 +2463,9 @@ public static boolean isNotCloudMode() {
@ConfField(mutable = true)
public static int sts_duration = 3600;

// Retry limit for the cloud drop-table / drop-partition loops: once their attempt counter
// exceeds this value they log a warning and give up.
@ConfField(mutable = true)
public static int drop_rpc_retry_num = 200;

@ConfField
public static int cloud_meta_service_rpc_failed_retry_times = 200;

Expand Down
29 changes: 4 additions & 25 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.CompactionTask;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PriorityMasterTaskExecutor;
import org.apache.doris.thrift.BackendService;
Expand Down Expand Up @@ -5684,33 +5683,11 @@ public void onEraseOlapTable(OlapTable olapTable, boolean isReplay) {
}
}

if (!isReplay && !Env.isCheckpointThread()) {
// drop all replicas
AgentBatchTask batchTask = new AgentBatchTask();
for (Partition partition : olapTable.getAllPartitions()) {
List<MaterializedIndex> allIndices = partition.getMaterializedIndices(IndexExtState.ALL);
for (MaterializedIndex materializedIndex : allIndices) {
long indexId = materializedIndex.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : materializedIndex.getTablets()) {
long tabletId = tablet.getId();
List<Replica> replicas = tablet.getReplicas();
for (Replica replica : replicas) {
long backendId = replica.getBackendId();
long replicaId = replica.getId();
DropReplicaTask dropTask = new DropReplicaTask(backendId, tabletId,
replicaId, schemaHash, true);
batchTask.addTask(dropTask);
} // end for replicas
} // end for tablets
} // end for indices
} // end for partitions
AgentTaskExecutor.submit(batchTask);
}

// TODO: does checkpoint need update colocate index ?
// colocation
Env.getCurrentColocateIndex().removeTable(olapTable.getId());

getInternalCatalog().eraseTableDropBackendReplicas(olapTable, isReplay);
}

public void onErasePartition(Partition partition) {
Expand All @@ -5721,6 +5698,8 @@ public void onErasePartition(Partition partition) {
invertedIndex.deleteTablet(tablet.getId());
}
}

getInternalCatalog().erasePartitionDropBackendReplicas(Lists.newArrayList(partition));
}

public void cleanTrash(AdminCleanTrashStmt stmt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.catalog;

import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
Expand Down Expand Up @@ -78,12 +77,7 @@ public void dropPartition(String partitionName, boolean needDropTablet) {
idToPartition.remove(partition.getId());
nameToPartition.remove(partitionName);
if (needDropTablet) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
Env.getCurrentEnv().onErasePartition(partition);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

import org.apache.doris.catalog.Env;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class CloudEnv extends Env {

private static final Logger LOG = LogManager.getLogger(CloudEnv.class);

/**
 * Creates a CloudEnv; the flag is passed unchanged to the {@link Env} constructor.
 *
 * @param isCheckpointCatalog forwarded to {@code Env(boolean)}
 */
public CloudEnv(boolean isCheckpointCatalog) {
super(isCheckpointCatalog);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
Expand All @@ -54,6 +56,7 @@
import org.apache.doris.thrift.TTabletType;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import doris.segment_v2.SegmentV2;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -498,12 +501,164 @@ private void sendCreateTabletsRpc(Cloud.CreateTabletsRequest.Builder requestBuil

// END CREATE TABLE

// BEGIN DROP TABLE

/**
 * Drops every materialized index of an erased table through the meta service.
 *
 * <p>Only the master FE does any work. The index ids of all partitions
 * ({@code IndexExtState.ALL}) are collected and passed to {@code dropMaterializedIndex}.
 * A failed attempt is retried after a 3 second pause until the attempt counter exceeds
 * {@code Config.drop_rpc_retry_num}; the final failure is only logged, never thrown.
 * If the thread is interrupted while pausing, the interrupt flag is restored and
 * retrying stops.
 *
 * @param olapTable the table being erased
 * @param isReplay  not used by this implementation
 */
@Override
public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean isReplay) {
    if (!Env.getCurrentEnv().isMaster()) {
        return;
    }

    List<Long> indexs = Lists.newArrayList();
    for (Partition partition : olapTable.getAllPartitions()) {
        for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
            indexs.add(materializedIndex.getId());
        }
    }
    // The list never changes once built, so decide "nothing to drop" before entering the retry loop.
    if (indexs.isEmpty()) {
        return;
    }

    int tryCnt = 0;
    while (true) {
        if (tryCnt++ > Config.drop_rpc_retry_num) {
            LOG.warn("failed to drop index {} of table {}, try cnt {} reaches maximum retry count",
                    indexs, olapTable.getId(), tryCnt);
            break;
        }

        try {
            dropMaterializedIndex(olapTable.getId(), indexs);
            break;
        } catch (Exception e) {
            LOG.warn("failed to drop index {} of table {}, try cnt {}, execption {}",
                    indexs, olapTable.getId(), tryCnt, e);
        }

        try {
            Thread.sleep(3000);
        } catch (InterruptedException ie) {
            LOG.warn("Thread sleep is interrupted");
            // Restore the flag for the caller and stop: with the flag set, every further
            // sleep would throw immediately and the loop would spin without any back-off.
            Thread.currentThread().interrupt();
            break;
        }
    }
}

/**
 * Drops the given partitions (and their materialized indexes) through the meta service.
 *
 * <p>Only the master FE does any work, and an empty list is a no-op. The table id is read
 * from the first replica that can be found, which is cast to {@code CloudReplica}; the db id
 * comes from the first partition, which is cast to {@code CloudPartition}. A failed attempt
 * is retried after a 3 second pause until the attempt counter exceeds
 * {@code Config.drop_rpc_retry_num}; the final failure is only logged, never thrown.
 * If the thread is interrupted while pausing, the interrupt flag is restored and
 * retrying stops.
 *
 * @param partitions partitions to drop; expected to belong to a single table, since one
 *                   table id is used for the whole request
 */
@Override
public void erasePartitionDropBackendReplicas(List<Partition> partitions) {
    if (!Env.getCurrentEnv().isMaster() || partitions.isEmpty()) {
        return;
    }

    long tableId = -1;
    List<Long> partitionIds = Lists.newArrayList();
    List<Long> indexIds = Lists.newArrayList();
    for (Partition partition : partitions) {
        for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
            indexIds.add(index.getId());
            // Guard both lookups: an index without tablets (or a tablet without replicas)
            // must not abort the whole drop with an IndexOutOfBoundsException.
            if (tableId == -1
                    && !index.getTablets().isEmpty()
                    && !index.getTablets().get(0).getReplicas().isEmpty()) {
                tableId = ((CloudReplica) index.getTablets().get(0).getReplicas().get(0)).getTableId();
            }
        }
        partitionIds.add(partition.getId());
    }

    if (tableId == -1) {
        // No replica carried a table id, so a request could only be sent with the -1 placeholder.
        LOG.warn("failed to drop partition {}, can not find table id from its replicas", partitionIds);
        return;
    }

    CloudPartition partition0 = (CloudPartition) partitions.get(0);

    int tryCnt = 0;
    while (true) {
        if (tryCnt++ > Config.drop_rpc_retry_num) {
            LOG.warn("failed to drop partition {} of table {}, try cnt {} reaches maximum retry count",
                    partitionIds, tableId, tryCnt);
            break;
        }

        try {
            dropCloudPartition(partition0.getDbId(), tableId, partitionIds, indexIds);
            break;
        } catch (Exception e) {
            LOG.warn("failed to drop partition {} of table {}, try cnt {}, execption {}",
                    partitionIds, tableId, tryCnt, e);
        }

        try {
            Thread.sleep(3000);
        } catch (InterruptedException ie) {
            LOG.warn("Thread sleep is interrupted");
            // Restore the flag for the caller and stop: with the flag set, every further
            // sleep would throw immediately and the loop would spin without any back-off.
            Thread.currentThread().interrupt();
            break;
        }
    }
}

/**
 * Sends one {@code dropPartition} request to the meta service, retrying on
 * {@code KV_TXN_CONFLICT} responses and on {@link RpcException}.
 *
 * @param dbId         database id; only set on the request when positive
 * @param tableId      id of the table owning the partitions
 * @param partitionIds ids of the partitions to drop
 * @param indexIds     ids of the materialized indexes of those partitions
 * @throws DdlException if the RPC keeps failing, no response was obtained, or the final
 *                      response status is not {@code OK}
 */
private void dropCloudPartition(long dbId, long tableId, List<Long> partitionIds, List<Long> indexIds)
        throws DdlException {
    Cloud.PartitionRequest.Builder partitionRequestBuilder =
            Cloud.PartitionRequest.newBuilder();
    partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
    partitionRequestBuilder.setTableId(tableId);
    partitionRequestBuilder.addAllPartitionIds(partitionIds);
    partitionRequestBuilder.addAllIndexIds(indexIds);
    if (dbId > 0) {
        partitionRequestBuilder.setDbId(dbId);
    }
    final Cloud.PartitionRequest partitionRequest = partitionRequestBuilder.build();

    Cloud.PartitionResponse response = null;
    int tryTimes = 0;
    while (tryTimes++ < Config.meta_service_rpc_retry_times) {
        try {
            response = MetaServiceProxy.getInstance().dropPartition(partitionRequest);
            // Only a KV transaction conflict is worth retrying; any other status is final.
            if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
                break;
            }
        } catch (RpcException e) {
            LOG.warn("tryTimes:{}, dropPartition RpcException", tryTimes, e);
            // NOTE(review): this gives up one iteration before the loop bound is reached;
            // confirm whether that is intentional.
            if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
                throw new DdlException(e.getMessage());
            }
        }
        sleepSeveralMs();
    }

    // The loop body never runs when meta_service_rpc_retry_times is not positive, which
    // leaves response unset; report that as a DdlException instead of hitting an NPE below.
    if (response == null) {
        throw new DdlException("dropPartition got no response from meta service, "
                + "meta_service_rpc_retry_times=" + Config.meta_service_rpc_retry_times);
    }

    if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
        LOG.warn("dropPartition response: {} ", response);
        throw new DdlException(response.getStatus().getMsg());
    }
}

/**
 * Sends one {@code dropIndex} request to the meta service, retrying on
 * {@code KV_TXN_CONFLICT} responses and on {@link RpcException}.
 *
 * @param tableId  id of the table owning the indexes
 * @param indexIds ids of the materialized indexes to drop
 * @throws DdlException if the RPC keeps failing, no response was obtained, or the final
 *                      response status is not {@code OK}
 */
private void dropMaterializedIndex(Long tableId, List<Long> indexIds) throws DdlException {
    Cloud.IndexRequest.Builder indexRequestBuilder = Cloud.IndexRequest.newBuilder();
    indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
    indexRequestBuilder.addAllIndexIds(indexIds);
    indexRequestBuilder.setTableId(tableId);
    final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();

    Cloud.IndexResponse response = null;
    int tryTimes = 0;
    while (tryTimes++ < Config.meta_service_rpc_retry_times) {
        try {
            response = MetaServiceProxy.getInstance().dropIndex(indexRequest);
            // Only a KV transaction conflict is worth retrying; any other status is final.
            if (response.getStatus().getCode() != Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
                break;
            }
        } catch (RpcException e) {
            LOG.warn("tryTimes:{}, dropIndex RpcException", tryTimes, e);
            // NOTE(review): this gives up one iteration before the loop bound is reached;
            // confirm whether that is intentional.
            if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
                throw new DdlException(e.getMessage());
            }
        }
        sleepSeveralMs();
    }

    // The loop body never runs when meta_service_rpc_retry_times is not positive, which
    // leaves response unset; report that as a DdlException instead of hitting an NPE below.
    if (response == null) {
        throw new DdlException("dropIndex got no response from meta service, "
                + "meta_service_rpc_retry_times=" + Config.meta_service_rpc_retry_times);
    }

    if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
        LOG.warn("dropIndex response: {} ", response);
        throw new DdlException(response.getStatus().getMsg());
    }
}

// END DROP TABLE

/**
 * Runs the capacity checks for {@code db}: first the system-wide check obtained from
 * {@code Env.getCurrentSystemInfo()}, then the quota check of the database itself.
 *
 * @param db database whose quota is checked
 * @throws DdlException declared by this method; presumably raised by whichever check
 *                      fails first (confirm against the two callees)
 */
@Override
protected void checkAvailableCapacity(Database db) throws DdlException {
// check cluster capacity
Env.getCurrentSystemInfo().checkAvailableCapacity();
// check db quota
db.checkQuota();
}

private void sleepSeveralMs() {
Expand Down
Loading

0 comments on commit b8a3374

Please sign in to comment.