From 81e8182a44ec656f7b3d891ae97e5fe68fbf7d33 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Thu, 2 Jan 2025 22:04:35 +0800 Subject: [PATCH] [fix](cloud-mow) FE should release delete bitmap lock when calculating delete bitmap failed (#45673) When calculating the delete bitmap fails, the delete bitmap lock is still held by the last failed txn; FE should release the lock so that other txns can commit. --- .../CloudGlobalTransactionMgr.java | 221 +++++++++--------- .../DeleteBitmapUpdateLockContext.java | 82 +++++++ ...cloud_mow_stream_load_with_commit_fail.out | 7 + ...ud_mow_stream_load_with_commit_fail.groovy | 142 +++++++++++ 4 files changed, 346 insertions(+), 106 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index b9425245f421b2..ffe32348bd0915 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -346,7 +346,29 @@ public void commitTransaction(long dbId, List tableList, public void commitTransaction(long dbId, List
tableList, long transactionId, List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { - commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); + List mowTableList = getMowTableList(tableList, tabletCommitInfos); + try { + LOG.info("try to commit transaction, transactionId: {}", transactionId); + Map> backendToPartitionInfos = null; + if (!mowTableList.isEmpty()) { + DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext(); + getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext); + if (lockContext.getBackendToPartitionTablets().isEmpty()) { + throw new UserException( + "The partition info is empty, table may be dropped, txnid=" + transactionId); + } + backendToPartitionInfos = getCalcDeleteBitmapInfo(lockContext, null); + } + commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false, + mowTableList, backendToPartitionInfos); + } catch (Exception e) { + if (!mowTableList.isEmpty()) { + LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId, + e.getMessage()); + removeDeleteBitmapUpdateLock(mowTableList, transactionId); + } + throw e; + } } /** @@ -464,17 +486,15 @@ private Set getBaseTabletsFromTables(List
tableList, List tableList, long transactionId, - List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC) + private void commitTransactionWithoutLock(long dbId, List
tableList, long transactionId, + List tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC, + List mowTableList, Map> backendToPartitionInfos) throws UserException { - - LOG.info("try to commit transaction, transactionId: {}", transactionId); if (Config.disable_load_job) { throw new TransactionCommitFailedException( "disable_load_job is set to true, all load jobs are not allowed"); } - List mowTableList = getMowTableList(tableList, tabletCommitInfos); if (!mowTableList.isEmpty()) { // may be this txn has been calculated by previously task but commit rpc is timeout, // and be will send another commit request to fe, so need to check txn status first @@ -493,7 +513,8 @@ private void commitTransaction(long dbId, List
tableList, long transactio transactionState.getTransactionStatus().toString()); } } - calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos, null); + sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos, + Config.calculate_delete_bitmap_task_timeout_seconds); } CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder(); @@ -535,6 +556,10 @@ private void commitTransaction(long dbId, List
tableList, long transactio private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, long transactionId, boolean is2PC, TxnCommitAttachment txnCommitAttachment) throws UserException { + if (DebugPointUtil.isEnable("FE.mow.commit.exception")) { + LOG.info("debug point FE.mow.commit.exception, throw e"); + throw new UserException("debug point FE.mow.commit.exception"); + } boolean txnOperated = false; TransactionState txnState = null; TxnStateChangeCallback cb = null; @@ -653,43 +678,6 @@ private List getMowTableList(List
tableList, List tableList, long transactionId, - List tabletCommitInfos, List subTransactionStates) - throws UserException { - Map>> backendToPartitionTablets = Maps.newHashMap(); - Map partitions = Maps.newHashMap(); - Map> tableToPartitions = Maps.newHashMap(); - Map> tableToTabletList = Maps.newHashMap(); - Map tabletToTabletMeta = Maps.newHashMap(); - getPartitionInfo(tableList, tabletCommitInfos, tableToPartitions, partitions, backendToPartitionTablets, - tableToTabletList, tabletToTabletMeta); - if (backendToPartitionTablets.isEmpty()) { - throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId); - } - - Map> partitionToSubTxnIds = getPartitionSubTxnIds(subTransactionStates, tableToTabletList, - tabletToTabletMeta); - Map baseCompactionCnts = Maps.newHashMap(); - Map cumulativeCompactionCnts = Maps.newHashMap(); - Map cumulativePoints = Maps.newHashMap(); - getDeleteBitmapUpdateLock(tableToPartitions, transactionId, tableToTabletList, tabletToTabletMeta, - baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints); - Map partitionVersions = getPartitionVersions(partitions); - - Map> backendToPartitionInfos = getCalcDeleteBitmapInfo( - backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts, - cumulativePoints, partitionToSubTxnIds); - try { - sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos, - subTransactionStates == null ? 
Config.calculate_delete_bitmap_task_timeout_seconds - : Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load); - } catch (UserException e) { - LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + transactionId + ",exception=" + e.getMessage()); - removeDeleteBitmapUpdateLock(tableToPartitions, transactionId); - throw e; - } - } - private Map> getPartitionSubTxnIds(List subTransactionStates, Map> tableToTabletList, Map tabletToTabletMeta) { if (subTransactionStates == null) { @@ -715,11 +703,7 @@ private Map> getPartitionSubTxnIds(List su private void getPartitionInfo(List tableList, List tabletCommitInfos, - Map> tableToParttions, - Map partitions, - Map>> backendToPartitionTablets, - Map> tableToTabletList, - Map tabletToTabletMeta) { + DeleteBitmapUpdateLockContext lockContext) { Map tableMap = Maps.newHashMap(); for (OlapTable olapTable : tableList) { tableMap.put(olapTable.getId(), olapTable); @@ -731,7 +715,7 @@ private void getPartitionInfo(List tableList, List tabletMetaList = tabletInvertedIndex.getTabletMetaList(tabletIds); for (int i = 0; i < tabletMetaList.size(); i++) { long tabletId = tabletIds.get(i); - if (tabletToTabletMeta.containsKey(tabletId)) { + if (lockContext.getTabletToTabletMeta().containsKey(tabletId)) { continue; } TabletMeta tabletMeta = tabletMetaList.get(i); @@ -740,9 +724,10 @@ private void getPartitionInfo(List tableList, continue; } - tabletToTabletMeta.put(tabletId, tabletMeta); + lockContext.getTabletToTabletMeta().put(tabletId, tabletMeta); - List tableTabletIds = tableToTabletList.computeIfAbsent(tableId, k -> Lists.newArrayList()); + List tableTabletIds = lockContext.getTableToTabletList() + .computeIfAbsent(tableId, k -> Lists.newArrayList()); if (!tableTabletIds.contains(tabletId)) { tableTabletIds.add(tabletId); } @@ -750,20 +735,20 @@ private void getPartitionInfo(List tableList, long partitionId = tabletMeta.getPartitionId(); long backendId = tabletCommitInfos.get(i).getBackendId(); - if 
(!tableToParttions.containsKey(tableId)) { - tableToParttions.put(tableId, Sets.newHashSet()); + if (!lockContext.getTableToPartitions().containsKey(tableId)) { + lockContext.getTableToPartitions().put(tableId, Sets.newHashSet()); } - tableToParttions.get(tableId).add(partitionId); + lockContext.getTableToPartitions().get(tableId).add(partitionId); - if (!backendToPartitionTablets.containsKey(backendId)) { - backendToPartitionTablets.put(backendId, Maps.newHashMap()); + if (!lockContext.getBackendToPartitionTablets().containsKey(backendId)) { + lockContext.getBackendToPartitionTablets().put(backendId, Maps.newHashMap()); } - Map> partitionToTablets = backendToPartitionTablets.get(backendId); + Map> partitionToTablets = lockContext.getBackendToPartitionTablets().get(backendId); if (!partitionToTablets.containsKey(partitionId)) { partitionToTablets.put(partitionId, Sets.newHashSet()); } partitionToTablets.get(partitionId).add(tabletId); - partitions.putIfAbsent(partitionId, tableMap.get(tableId).getPartition(partitionId)); + lockContext.getPartitions().putIfAbsent(partitionId, tableMap.get(tableId).getPartition(partitionId)); } } @@ -778,11 +763,10 @@ private Map getPartitionVersions(Map partitionMap) } private Map> getCalcDeleteBitmapInfo( - Map>> backendToPartitionTablets, Map partitionVersions, - Map baseCompactionCnts, Map cumulativeCompactionCnts, - Map cumulativePoints, Map> partitionToSubTxnIds) { + DeleteBitmapUpdateLockContext lockContext, Map> partitionToSubTxnIds) { Map> backendToPartitionInfos = Maps.newHashMap(); - for (Map.Entry>> entry : backendToPartitionTablets.entrySet()) { + Map partitionVersions = getPartitionVersions(lockContext.getPartitions()); + for (Map.Entry>> entry : lockContext.getBackendToPartitionTablets().entrySet()) { List partitionInfos = Lists.newArrayList(); for (Map.Entry> partitionToTablets : entry.getValue().entrySet()) { Long partitionId = partitionToTablets.getKey(); @@ -790,15 +774,16 @@ private Map> getCalcDeleteBitmapInfo( 
TCalcDeleteBitmapPartitionInfo partitionInfo = new TCalcDeleteBitmapPartitionInfo(partitionId, partitionVersions.get(partitionId), Lists.newArrayList(tabletList)); - if (!baseCompactionCnts.isEmpty() && !cumulativeCompactionCnts.isEmpty() - && !cumulativePoints.isEmpty()) { + if (!lockContext.getBaseCompactionCnts().isEmpty() + && !lockContext.getCumulativeCompactionCnts().isEmpty() + && !lockContext.getCumulativePoints().isEmpty()) { List reqBaseCompactionCnts = Lists.newArrayList(); List reqCumulativeCompactionCnts = Lists.newArrayList(); List reqCumulativePoints = Lists.newArrayList(); for (long tabletId : tabletList) { - reqBaseCompactionCnts.add(baseCompactionCnts.get(tabletId)); - reqCumulativeCompactionCnts.add(cumulativeCompactionCnts.get(tabletId)); - reqCumulativePoints.add(cumulativePoints.get(tabletId)); + reqBaseCompactionCnts.add(lockContext.getBaseCompactionCnts().get(tabletId)); + reqCumulativeCompactionCnts.add(lockContext.getCumulativeCompactionCnts().get(tabletId)); + reqCumulativePoints.add(lockContext.getCumulativePoints().get(tabletId)); } partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts); partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts); @@ -818,10 +803,9 @@ private Map> getCalcDeleteBitmapInfo( return backendToPartitionInfos; } - private void getDeleteBitmapUpdateLock(Map> tableToParttions, long transactionId, - Map> tableToTabletList, Map tabletToTabletMeta, - Map baseCompactionCnts, Map cumulativeCompactionCnts, - Map cumulativePoints) throws UserException { + private void getDeleteBitmapUpdateLock(long transactionId, List mowTableList, + List tabletCommitInfos, DeleteBitmapUpdateLockContext lockContext) + throws UserException { if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep")) { DebugPoint debugPoint = DebugPointUtil.getDebugPoint( "CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep"); @@ -854,17 +838,15 @@ private void getDeleteBitmapUpdateLock(Map> 
tableToParttions, lo } StopWatch stopWatch = new StopWatch(); stopWatch.start(); + getPartitionInfo(mowTableList, tabletCommitInfos, lockContext); int totalRetryTime = 0; - for (Map.Entry> entry : tableToParttions.entrySet()) { + for (Map.Entry> entry : lockContext.getTableToPartitions().entrySet()) { GetDeleteBitmapUpdateLockRequest.Builder builder = GetDeleteBitmapUpdateLockRequest.newBuilder(); - builder.setTableId(entry.getKey()) - .setLockId(transactionId) - .setInitiator(-1) - .setExpiration(Config.delete_bitmap_lock_expiration_seconds) - .setRequireCompactionStats(true); - List tabletList = tableToTabletList.get(entry.getKey()); + builder.setTableId(entry.getKey()).setLockId(transactionId).setInitiator(-1) + .setExpiration(Config.delete_bitmap_lock_expiration_seconds).setRequireCompactionStats(true); + List tabletList = lockContext.getTableToTabletList().get(entry.getKey()); for (Long tabletId : tabletList) { - TabletMeta tabletMeta = tabletToTabletMeta.get(tabletId); + TabletMeta tabletMeta = lockContext.getTabletToTabletMeta().get(tabletId); TabletIndexPB.Builder tabletIndexBuilder = TabletIndexPB.newBuilder(); tabletIndexBuilder.setDbId(tabletMeta.getDbId()); tabletIndexBuilder.setTableId(tabletMeta.getTableId()); @@ -881,16 +863,16 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo try { response = MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request); if (LOG.isDebugEnabled()) { - LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}", - transactionId, request, response); + LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}", transactionId, + request, response); } if (response.getStatus().getCode() != MetaServiceCode.LOCK_CONFLICT && response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { break; } } catch (Exception e) { - LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}", - transactionId, retryTime, e); + LOG.warn("ignore get 
delete bitmap lock exception, transactionId={}, retryTime={}", transactionId, + retryTime, e); } // sleep random millis [20, 300] ms, avoid txn conflict int randomMillis = 20 + (int) (Math.random() * (300 - 20)); @@ -906,8 +888,8 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo Preconditions.checkNotNull(response); Preconditions.checkNotNull(response.getStatus()); if (response.getStatus().getCode() != MetaServiceCode.OK) { - LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}", - transactionId, retryTime, response); + LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}", transactionId, + retryTime, response); if (response.getStatus().getCode() == MetaServiceCode.LOCK_CONFLICT || response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT) { // DELETE_BITMAP_LOCK_ERR will be retried on be @@ -928,30 +910,28 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size()) { throw new UserException("The size of returned compaction cnts can't match the size of tabletList, " + "tabletList.size()=" + tabletList.size() + ", respBaseCompactionCnts.size()=" + size1 - + ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" - + size3); + + ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + size3); } for (int i = 0; i < tabletList.size(); i++) { long tabletId = tabletList.get(i); - baseCompactionCnts.put(tabletId, respBaseCompactionCnts.get(i)); - cumulativeCompactionCnts.put(tabletId, respCumulativeCompactionCnts.get(i)); - cumulativePoints.put(tabletId, respCumulativePoints.get(i)); + lockContext.getBaseCompactionCnts().put(tabletId, respBaseCompactionCnts.get(i)); + lockContext.getCumulativeCompactionCnts().put(tabletId, respCumulativeCompactionCnts.get(i)); + lockContext.getCumulativePoints().put(tabletId, 
respCumulativePoints.get(i)); } totalRetryTime += retryTime; } stopWatch.stop(); if (totalRetryTime > 0 || stopWatch.getTime() > 20) { - LOG.info( - "get delete bitmap lock successfully. txns: {}. totalRetryTime: {}. " - + "partitionSize: {}. time cost: {} ms.", - transactionId, totalRetryTime, tableToParttions.size(), stopWatch.getTime()); + LOG.info("get delete bitmap lock successfully. txns: {}. totalRetryTime: {}. " + + "partitionSize: {}. time cost: {} ms.", transactionId, totalRetryTime, + lockContext.getTableToPartitions().size(), stopWatch.getTime()); } } - private void removeDeleteBitmapUpdateLock(Map> tableToParttions, long transactionId) { - for (Map.Entry> entry : tableToParttions.entrySet()) { + private void removeDeleteBitmapUpdateLock(List tableList, long transactionId) { + for (OlapTable table : tableList) { RemoveDeleteBitmapUpdateLockRequest.Builder builder = RemoveDeleteBitmapUpdateLockRequest.newBuilder(); - builder.setTableId(entry.getKey()) + builder.setTableId(table.getId()) .setLockId(transactionId) .setInitiator(-1); final RemoveDeleteBitmapUpdateLockRequest request = builder.build(); @@ -978,6 +958,10 @@ private void removeDeleteBitmapUpdateLock(Map> tableToParttions, private void sendCalcDeleteBitmaptask(long dbId, long transactionId, Map> backendToPartitionInfos, long calculateDeleteBitmapTaskTimeoutSeconds) throws UserException { + if (backendToPartitionInfos == null) { + throw new UserException("failed to send calculate delete bitmap task to be,transactionId=" + transactionId + + ",but backendToPartitionInfos is null"); + } if (backendToPartitionInfos.isEmpty()) { return; } @@ -1100,8 +1084,34 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, .collect(Collectors.toList()); List
tableList = ((Database) db).getTablesOnIdOrderOrThrowException(tableIdList); beforeCommitTransaction(tableList, transactionId, timeoutMillis); + List tabletCommitInfos = subTransactionStates.stream().map( + SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream) + .map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId())).collect(Collectors.toList()); + List mowTableList = getMowTableList(tableList, tabletCommitInfos); try { - commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates); + Map> backendToPartitionInfos = null; + if (!mowTableList.isEmpty()) { + DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext(); + getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext); + if (lockContext.getBackendToPartitionTablets().isEmpty()) { + throw new UserException( + "The partition info is empty, table may be dropped, txnid=" + transactionId); + } + Map> partitionToSubTxnIds = getPartitionSubTxnIds(subTransactionStates, + lockContext.getTableToTabletList(), + lockContext.getTabletToTabletMeta()); + backendToPartitionInfos = getCalcDeleteBitmapInfo( + lockContext, partitionToSubTxnIds); + } + commitTransactionWithSubTxns(db.getId(), tableList, transactionId, subTransactionStates, mowTableList, + backendToPartitionInfos); + } catch (Exception e) { + if (!mowTableList.isEmpty()) { + LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId, + e.getMessage()); + removeDeleteBitmapUpdateLock(mowTableList, transactionId); + } + throw e; } finally { afterCommitTransaction(tableList); } @@ -1109,13 +1119,11 @@ public boolean commitAndPublishTransaction(DatabaseIf db, long transactionId, } private void commitTransactionWithSubTxns(long dbId, List
tableList, long transactionId, - List subTransactionStates) throws UserException { - List tabletCommitInfos = subTransactionStates.stream().map( - SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream) - .map(c -> new TabletCommitInfo(c.getTabletId(), c.getBackendId())).collect(Collectors.toList()); - List mowTableList = getMowTableList(tableList, tabletCommitInfos); + List subTransactionStates, List mowTableList, + Map> backendToPartitionInfos) throws UserException { if (!mowTableList.isEmpty()) { - calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos, subTransactionStates); + sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos, + Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load); } cleanSubTransactions(transactionId); @@ -1196,7 +1204,8 @@ public boolean commitAndPublishTransaction(DatabaseIf db, List
tableList, @Override public void commitTransaction2PC(Database db, List
tableList, long transactionId, long timeoutMillis) throws UserException { - commitTransaction(db.getId(), tableList, transactionId, null, null, true); + List mowTableList = getMowTableList(tableList, null); + commitTransactionWithoutLock(db.getId(), tableList, transactionId, null, null, true, mowTableList, null); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java new file mode 100644 index 00000000000000..fcc84b9ca18227 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.transaction; + +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.TabletMeta; + +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DeleteBitmapUpdateLockContext { + private Map baseCompactionCnts; + private Map cumulativeCompactionCnts; + private Map cumulativePoints; + private Map> tableToPartitions; + private Map partitions; + private Map>> backendToPartitionTablets; + private Map> tableToTabletList; + private Map tabletToTabletMeta; + + public DeleteBitmapUpdateLockContext() { + baseCompactionCnts = Maps.newHashMap(); + cumulativeCompactionCnts = Maps.newHashMap(); + cumulativePoints = Maps.newHashMap(); + tableToPartitions = Maps.newHashMap(); + partitions = Maps.newHashMap(); + backendToPartitionTablets = Maps.newHashMap(); + tableToTabletList = Maps.newHashMap(); + tabletToTabletMeta = Maps.newHashMap(); + } + + public Map> getTableToTabletList() { + return tableToTabletList; + } + + public Map getBaseCompactionCnts() { + return baseCompactionCnts; + } + + public Map getCumulativeCompactionCnts() { + return cumulativeCompactionCnts; + } + + public Map getCumulativePoints() { + return cumulativePoints; + } + + public Map>> getBackendToPartitionTablets() { + return backendToPartitionTablets; + } + + public Map getPartitions() { + return partitions; + } + + public Map> getTableToPartitions() { + return tableToPartitions; + } + + public Map getTabletToTabletMeta() { + return tabletToTabletMeta; + } + +} diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out new file mode 100644 index 00000000000000..b8b3ea3eccac14 --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- + +-- !sql -- +5 e 90 +6 f 100 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy new file mode 100644 index 00000000000000..fa71c3644f2027 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") { + GetDebugPoint().clearDebugPointsForAllFEs() + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def customFeConfig = [ + calculate_delete_bitmap_task_timeout_seconds: 2, + 
meta_service_rpc_retry_times : 5 + ] + + // store the original value + get_be_param("mow_stream_load_commit_retry_times") + // disable retry to make this problem more clear + set_be_param("mow_stream_load_commit_retry_times", "1") + + + def tableName = "tbl_basic" + setFeConfigTemporary(customFeConfig) { + try { + // create table + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + // this streamLoad will fail on fe commit phase + GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null) + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("FE.mow.commit.exception")) + } + } + qt_sql """ select * from ${tableName} order by id""" + + // this streamLoad will success because of removing exception injection + GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception') + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql """ select * from ${tableName} order by id""" + } finally { + reset_be_param("mow_stream_load_commit_retry_times") + GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception') + 
sql "DROP TABLE IF EXISTS ${tableName};" + GetDebugPoint().clearDebugPointsForAllFEs() + } + + } +}