From 27f72a49ade7ba09ed2abea13049e3b2c4bdf412 Mon Sep 17 00:00:00 2001 From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:26:42 +0800 Subject: [PATCH] [improvement](statistics)External table getRowCount return -1 when row count is not available or row count is 0. (#43009) External table getRowCount return -1 when row count is not available or row count is 0. So the behavior of external table could match with internal olap table. --- .../org/apache/doris/catalog/OlapTable.java | 8 +- .../java/org/apache/doris/catalog/Table.java | 2 +- .../org/apache/doris/catalog/TableIf.java | 2 + .../doris/catalog/external/ExternalTable.java | 4 +- .../catalog/external/HMSExternalTable.java | 7 +- .../catalog/external/PaimonExternalTable.java | 4 +- .../datasource/ExternalRowCountCache.java | 6 +- .../external/iceberg/util/IcebergUtils.java | 6 +- .../statistics/StatisticsAutoCollector.java | 2 +- .../doris/statistics/util/StatisticsUtil.java | 22 +++-- .../datasource/ExternalRowCountCacheTest.java | 83 ++++++++++++++++++- 11 files changed, 116 insertions(+), 30 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 3d14b92de07371..9b95b1b20ec6ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -119,8 +119,6 @@ public enum OlapTableState { WAITING_STABLE } - public static long ROW_COUNT_BEFORE_REPORT = -1; - private volatile OlapTableState state; // index id -> index meta @@ -1298,12 +1296,12 @@ public long getRowCountForIndex(long indexId, boolean strict) { if (index == null) { LOG.warn("Index {} not exist in partition {}, table {}, {}", indexId, entry.getValue().getName(), id, name); - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } if (strict && !index.getRowCountReported()) { - return ROW_COUNT_BEFORE_REPORT; + return UNKNOWN_ROW_COUNT; } - rowCount += index.getRowCount() == -1 ? 0 : index.getRowCount(); + rowCount += index.getRowCount() == UNKNOWN_ROW_COUNT ? 0 : index.getRowCount(); } return rowCount; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 7cbef2a5d6cc73..17df068ff21cda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -614,6 +614,6 @@ public List getChunkSizes() { @Override public long fetchRowCount() { - return 0; + return UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index c1bcf5b2179f15..4586ed62900784 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -45,6 +45,8 @@ public interface TableIf { Logger LOG = LogManager.getLogger(TableIf.class); + long UNKNOWN_ROW_COUNT = -1; + void readLock(); boolean tryReadLock(long timeout, TimeUnit unit); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index 90188e18fedcbe..0a8ffce08ce5b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -290,7 +290,7 @@ public long getRowCount() { makeSureInitialized(); } catch (Exception e) { LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e); - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } // All external table should get external row count from cache. return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id); @@ -302,7 +302,7 @@ public long getRowCount() { * This is called by ExternalRowCountCache to load row count cache. */ public long fetchRowCount() { - return 0; + return UNKNOWN_ROW_COUNT; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 4632433ee496ea..7910ed5af45ce1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.HudiUtils; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreCache; @@ -308,9 +309,9 @@ private long getRowCountFromExternalSource() { break; default: LOG.warn("getRowCount for dlaType {} is not supported.", dlaType); - rowCount = -1; + rowCount = TableIf.UNKNOWN_ROW_COUNT; } - return rowCount; + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } @Override @@ -477,7 +478,7 @@ public long fetchRowCount() { // Get row count from hive metastore property. long rowCount = getRowCountFromExternalSource(); // Only hive table supports estimate row count by listing file. - if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) { + if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) { LOG.debug("Will estimate row count from file list."); rowCount = StatisticsUtil.getRowCountFromFileList(this); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java index a28e7178dfbdeb..b2bb165c47185d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java @@ -175,10 +175,10 @@ public long fetchRowCount() { for (Split split : splits) { rowCount += split.rowCount(); } - return rowCount; + return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT; } catch (Exception e) { LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e); } - return -1; + return UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java index 632cde1d5a721e..68fed7c2f36692 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalRowCountCache.java @@ -94,7 +94,7 @@ protected Optional doLoad(RowCountKey rowCountKey) { } /** - * Get cached row count for the given table. Return 0 if cached not loaded or table not exists. + * Get cached row count for the given table. Return -1 if cached not loaded or table not exists. * Cached will be loaded async. * @param catalogId * @param dbId @@ -106,12 +106,12 @@ public long getCachedRowCount(long catalogId, long dbId, long tableId) { try { CompletableFuture> f = rowCountCache.get(key); if (f.isDone()) { - return f.get().orElse(0L); + return f.get().orElse(TableIf.UNKNOWN_ROW_COUNT); } } catch (Exception e) { LOG.warn("Unexpected exception while returning row count", e); } - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java index 3f4779ef893623..90695b94cc4b1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java @@ -36,6 +36,7 @@ import org.apache.doris.analysis.Subquery; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -616,15 +617,16 @@ public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, St .getIcebergTable(catalog, dbName, tbName); Snapshot snapshot = icebergTable.currentSnapshot(); if (snapshot == null) { + LOG.info("Iceberg table {}.{}.{} is empty, return -1.", catalog.getName(), dbName, tbName); // empty table - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } Map summary = snapshot.summary(); return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES)); } catch (Exception e) { LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e); } - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 137068d4315757..b4d8f1ad0eca64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -181,7 +181,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf db, ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) { OlapTable ot = (OlapTable) table; - if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == OlapTable.ROW_COUNT_BEFORE_REPORT) { + if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) { LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName()); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index fe32755be455a1..b959b9bd506feb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -562,18 +562,19 @@ public static int getTableHealth(long totalRows, long updatedRows) { public static long getHiveRowCount(HMSExternalTable table) { Map parameters = table.getRemoteTable().getParameters(); if (parameters == null) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters contains row count, simply get and return it. if (parameters.containsKey(NUM_ROWS)) { long rows = Long.parseLong(parameters.get(NUM_ROWS)); // Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need to check TOTAL_SIZE if NUM_ROWS is 0. - if (rows != 0) { + if (rows > 0) { + LOG.info("Get row count {} for hive table {} in table parameters.", rows, table.getName()); return rows; } } if (!parameters.containsKey(TOTAL_SIZE)) { - return -1; + return TableIf.UNKNOWN_ROW_COUNT; } // Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE)); @@ -582,9 +583,13 @@ public static long getHiveRowCount(HMSExternalTable table) { estimatedRowSize += column.getDataType().getSlotSize(); } if (estimatedRowSize == 0) { - return -1; + LOG.warn("Hive table {} estimated row size is invalid {}", table.getName(), estimatedRowSize); + return TableIf.UNKNOWN_ROW_COUNT; } - return totalSize / estimatedRowSize; + long rows = totalSize / estimatedRowSize; + LOG.debug("Get row count {} for hive table {} by total size {} and row size {}", + rows, table.getName(), totalSize, estimatedRowSize); + return rows; } /** @@ -608,7 +613,7 @@ public static long getTotalSizeFromHMS(HMSExternalTable table) { */ public static long getRowCountFromFileList(HMSExternalTable table) { if (table.isView()) { - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } HiveMetaStoreCache.HivePartitionValues partitionValues = getPartitionValuesForTable(table); int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size(); @@ -635,12 +640,13 @@ public static long getRowCountFromFileList(HMSExternalTable table) { estimatedRowSize += column.getDataType().getSlotSize(); } if (estimatedRowSize == 0) { - return 0; + return TableIf.UNKNOWN_ROW_COUNT; } if (samplePartitionSize < totalPartitionSize) { totalSize = totalSize * totalPartitionSize / samplePartitionSize; } - return totalSize / estimatedRowSize; + long rows = totalSize / estimatedRowSize; + return rows > 0 ? rows : TableIf.UNKNOWN_ROW_COUNT; } public static HiveMetaStoreCache.HivePartitionValues getPartitionValuesForTable(HMSExternalTable table) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java index e8622f6b59a239..c9effa8cf81efe 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalRowCountCacheTest.java @@ -17,9 +17,14 @@ package org.apache.doris.datasource; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.datasource.ExternalRowCountCache.RowCountCacheLoader; import org.apache.doris.datasource.ExternalRowCountCache.RowCountKey; import org.apache.doris.statistics.BasicAsyncCacheLoader; +import mockit.Mock; +import mockit.MockUp; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,6 +32,8 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; public class ExternalRowCountCacheTest { @@ -66,14 +73,14 @@ public void setUp() { public void test() throws Exception { // table 1 long rowCount = cache.getCachedRowCount(1, 1, 1); - Assertions.assertEquals(0, rowCount); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, rowCount); Thread.sleep(1000); rowCount = cache.getCachedRowCount(1, 1, 1); Assertions.assertEquals(111, rowCount); // table 2 rowCount = cache.getCachedRowCount(1, 1, 2); - Assertions.assertEquals(0, rowCount); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, rowCount); Thread.sleep(1000); rowCount = cache.getCachedRowCount(1, 1, 2); Assertions.assertEquals(222, rowCount); @@ -81,7 +88,7 @@ public void test() throws Exception { // table 3 rowCount = cache.getCachedRowCount(1, 1, 3); // first get, it should be 0 because the loader is async - Assertions.assertEquals(0, rowCount); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, rowCount); // After sleep 2 sec and then get, it should be 1 Thread.sleep(2000); rowCount = cache.getCachedRowCount(1, 1, 3); @@ -97,4 +104,74 @@ public void test() throws Exception { // refresh done, value should be 2 Assertions.assertEquals(335, rowCount); } + + @Test + public void testLoadWithException() throws Exception { + ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool( + 1, Integer.MAX_VALUE, "TEST", true); + AtomicInteger counter = new AtomicInteger(0); + + new MockUp() { + @Mock + protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return null; + } + }; + ExternalRowCountCache cache = new ExternalRowCountCache(executor, 2, null); + long cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 1) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(1, counter.get()); + + new MockUp() { + @Mock + protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + return Optional.of(100L); + } + }; + cache.getCachedRowCount(1, 1, 1); + for (int i = 0; i < 60; i++) { + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) { + Assertions.assertEquals(100, cachedRowCount); + break; + } + Thread.sleep(1000); + } + cachedRowCount = cache.getCachedRowCount(1, 1, 1); + Assertions.assertEquals(100, cachedRowCount); + Assertions.assertEquals(2, counter.get()); + + new MockUp() { + @Mock + protected Optional doLoad(ExternalRowCountCache.RowCountKey rowCountKey) { + counter.incrementAndGet(); + try { + Thread.sleep(1000000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return Optional.of(100L); + } + }; + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + Thread.sleep(1000); + cachedRowCount = cache.getCachedRowCount(2, 2, 2); + Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount); + for (int i = 0; i < 60; i++) { + if (counter.get() == 3) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(3, counter.get()); + } }