Skip to content

Commit

Permalink
[improvement](statistics)External table getRowCount return -1 when ro…
Browse files Browse the repository at this point in the history
…w count is not available or row count is 0. (#43009)

External table getRowCount return -1 when row count is not available or
row count is 0. So the behavior of external table could match with
internal olap table.
  • Loading branch information
Jibing-Li committed Nov 1, 2024
1 parent 2f33dd5 commit a3bd58b
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ public enum OlapTableState {
WAITING_STABLE
}

public static long ROW_COUNT_BEFORE_REPORT = -1;

private volatile OlapTableState state;

// index id -> index meta
Expand Down Expand Up @@ -1298,12 +1296,12 @@ public long getRowCountForIndex(long indexId, boolean strict) {
if (index == null) {
LOG.warn("Index {} not exist in partition {}, table {}, {}",
indexId, entry.getValue().getName(), id, name);
return ROW_COUNT_BEFORE_REPORT;
return UNKNOWN_ROW_COUNT;
}
if (strict && !index.getRowCountReported()) {
return ROW_COUNT_BEFORE_REPORT;
return UNKNOWN_ROW_COUNT;
}
rowCount += index.getRowCount() == -1 ? 0 : index.getRowCount();
rowCount += index.getRowCount() == UNKNOWN_ROW_COUNT ? 0 : index.getRowCount();
}
return rowCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,6 @@ public List<Long> getChunkSizes() {

@Override
public long fetchRowCount() {
return 0;
return UNKNOWN_ROW_COUNT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
public interface TableIf {
Logger LOG = LogManager.getLogger(TableIf.class);

long UNKNOWN_ROW_COUNT = -1;

void readLock();

boolean tryReadLock(long timeout, TimeUnit unit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public long getRowCount() {
makeSureInitialized();
} catch (Exception e) {
LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e);
return 0;
return TableIf.UNKNOWN_ROW_COUNT;
}
// All external table should get external row count from cache.
return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
Expand All @@ -302,7 +302,7 @@ public long getRowCount() {
* This is called by ExternalRowCountCache to load row count cache.
*/
public long fetchRowCount() {
return 0;
return UNKNOWN_ROW_COUNT;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.catalog.HudiUtils;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
Expand Down Expand Up @@ -308,9 +309,9 @@ private long getRowCountFromExternalSource() {
break;
default:
LOG.warn("getRowCount for dlaType {} is not supported.", dlaType);
rowCount = -1;
rowCount = TableIf.UNKNOWN_ROW_COUNT;
}
return rowCount;
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
}

@Override
Expand Down Expand Up @@ -477,7 +478,7 @@ public long fetchRowCount() {
// Get row count from hive metastore property.
long rowCount = getRowCountFromExternalSource();
// Only hive table supports estimate row count by listing file.
if (rowCount == -1 && dlaType.equals(DLAType.HIVE)) {
if (rowCount == UNKNOWN_ROW_COUNT && dlaType.equals(DLAType.HIVE)) {
LOG.debug("Will estimate row count from file list.");
rowCount = StatisticsUtil.getRowCountFromFileList(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ public long fetchRowCount() {
for (Split split : splits) {
rowCount += split.rowCount();
}
return rowCount;
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
} catch (Exception e) {
LOG.warn("Fail to collect row count for db {} table {}", dbName, name, e);
}
return -1;
return UNKNOWN_ROW_COUNT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected Optional<Long> doLoad(RowCountKey rowCountKey) {
}

/**
* Get cached row count for the given table. Return 0 if cached not loaded or table not exists.
* Get cached row count for the given table. Return -1 if cached not loaded or table not exists.
* Cached will be loaded async.
* @param catalogId
* @param dbId
Expand All @@ -106,12 +106,12 @@ public long getCachedRowCount(long catalogId, long dbId, long tableId) {
try {
CompletableFuture<Optional<Long>> f = rowCountCache.get(key);
if (f.isDone()) {
return f.get().orElse(0L);
return f.get().orElse(TableIf.UNKNOWN_ROW_COUNT);
}
} catch (Exception e) {
LOG.warn("Unexpected exception while returning row count", e);
}
return 0;
return TableIf.UNKNOWN_ROW_COUNT;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.analysis.Subquery;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
Expand Down Expand Up @@ -616,15 +617,18 @@ public static long getIcebergRowCount(ExternalCatalog catalog, String dbName, St
.getIcebergTable(catalog, dbName, tbName);
Snapshot snapshot = icebergTable.currentSnapshot();
if (snapshot == null) {
LOG.info("Iceberg table {}.{}.{} is empty, return -1.", catalog.getName(), dbName, tbName);
// empty table
return 0;
return TableIf.UNKNOWN_ROW_COUNT;
}
Map<String, String> summary = snapshot.summary();
return Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
long rows = Long.parseLong(summary.get(TOTAL_RECORDS)) - Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
LOG.info("Iceberg table {}.{}.{} row count in summary is {}", catalog.getName(), dbName, tbName, rows);
return rows;
} catch (Exception e) {
LOG.warn("Fail to collect row count for db {} table {}", dbName, tbName, e);
}
return -1;
return TableIf.UNKNOWN_ROW_COUNT;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) {
OlapTable ot = (OlapTable) table;
if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == OlapTable.ROW_COUNT_BEFORE_REPORT) {
if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) {
LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName());
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,18 +562,19 @@ public static int getTableHealth(long totalRows, long updatedRows) {
public static long getHiveRowCount(HMSExternalTable table) {
Map<String, String> parameters = table.getRemoteTable().getParameters();
if (parameters == null) {
return -1;
return TableIf.UNKNOWN_ROW_COUNT;
}
// Table parameters contains row count, simply get and return it.
if (parameters.containsKey(NUM_ROWS)) {
long rows = Long.parseLong(parameters.get(NUM_ROWS));
// Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need to check TOTAL_SIZE if NUM_ROWS is 0.
if (rows != 0) {
if (rows > 0) {
LOG.info("Get row count {} for hive table {} in table parameters.", rows, table.getName());
return rows;
}
}
if (!parameters.containsKey(TOTAL_SIZE)) {
return -1;
return TableIf.UNKNOWN_ROW_COUNT;
}
// Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize
long totalSize = Long.parseLong(parameters.get(TOTAL_SIZE));
Expand All @@ -582,9 +583,13 @@ public static long getHiveRowCount(HMSExternalTable table) {
estimatedRowSize += column.getDataType().getSlotSize();
}
if (estimatedRowSize == 0) {
return -1;
LOG.warn("Hive table {} estimated row size is invalid {}", table.getName(), estimatedRowSize);
return TableIf.UNKNOWN_ROW_COUNT;
}
return totalSize / estimatedRowSize;
long rows = totalSize / estimatedRowSize;
LOG.debug("Get row count {} for hive table {} by total size {} and row size {}",
rows, table.getName(), totalSize, estimatedRowSize);
return rows;
}

/**
Expand All @@ -608,7 +613,7 @@ public static long getTotalSizeFromHMS(HMSExternalTable table) {
*/
public static long getRowCountFromFileList(HMSExternalTable table) {
if (table.isView()) {
return 0;
return TableIf.UNKNOWN_ROW_COUNT;
}
HiveMetaStoreCache.HivePartitionValues partitionValues = getPartitionValuesForTable(table);
int totalPartitionSize = partitionValues == null ? 1 : partitionValues.getIdToPartitionItem().size();
Expand All @@ -635,12 +640,13 @@ public static long getRowCountFromFileList(HMSExternalTable table) {
estimatedRowSize += column.getDataType().getSlotSize();
}
if (estimatedRowSize == 0) {
return 0;
return TableIf.UNKNOWN_ROW_COUNT;
}
if (samplePartitionSize < totalPartitionSize) {
totalSize = totalSize * totalPartitionSize / samplePartitionSize;
}
return totalSize / estimatedRowSize;
long rows = totalSize / estimatedRowSize;
return rows > 0 ? rows : TableIf.UNKNOWN_ROW_COUNT;
}

public static HiveMetaStoreCache.HivePartitionValues getPartitionValuesForTable(HMSExternalTable table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,23 @@

package org.apache.doris.datasource;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.datasource.ExternalRowCountCache.RowCountCacheLoader;
import org.apache.doris.datasource.ExternalRowCountCache.RowCountKey;
import org.apache.doris.statistics.BasicAsyncCacheLoader;

import mockit.Mock;
import mockit.MockUp;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ExternalRowCountCacheTest {
Expand Down Expand Up @@ -66,22 +73,22 @@ public void setUp() {
public void test() throws Exception {
// table 1
long rowCount = cache.getCachedRowCount(1, 1, 1);
Assertions.assertEquals(0, rowCount);
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, rowCount);
Thread.sleep(1000);
rowCount = cache.getCachedRowCount(1, 1, 1);
Assertions.assertEquals(111, rowCount);

// table 2
rowCount = cache.getCachedRowCount(1, 1, 2);
Assertions.assertEquals(0, rowCount);
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, rowCount);
Thread.sleep(1000);
rowCount = cache.getCachedRowCount(1, 1, 2);
Assertions.assertEquals(222, rowCount);

// table 3
rowCount = cache.getCachedRowCount(1, 1, 3);
// first get, it should be 0 because the loader is async
Assertions.assertEquals(0, rowCount);
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, rowCount);
// After sleep 2 sec and then get, it should be 1
Thread.sleep(2000);
rowCount = cache.getCachedRowCount(1, 1, 3);
Expand All @@ -97,4 +104,74 @@ public void test() throws Exception {
// refresh done, value should be 2
Assertions.assertEquals(335, rowCount);
}

@Test
public void testLoadWithException() throws Exception {
ThreadPoolExecutor executor = ThreadPoolManager.newDaemonFixedThreadPool(
1, Integer.MAX_VALUE, "TEST", true);
AtomicInteger counter = new AtomicInteger(0);

new MockUp<RowCountCacheLoader>() {
@Mock
protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) {
counter.incrementAndGet();
return null;
}
};
ExternalRowCountCache cache = new ExternalRowCountCache(executor, 2, null);
long cachedRowCount = cache.getCachedRowCount(1, 1, 1);
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
for (int i = 0; i < 60; i++) {
if (counter.get() == 1) {
break;
}
Thread.sleep(1000);
}
Assertions.assertEquals(1, counter.get());

new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
@Mock
protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) {
counter.incrementAndGet();
return Optional.of(100L);
}
};
cache.getCachedRowCount(1, 1, 1);
for (int i = 0; i < 60; i++) {
cachedRowCount = cache.getCachedRowCount(1, 1, 1);
if (cachedRowCount != TableIf.UNKNOWN_ROW_COUNT) {
Assertions.assertEquals(100, cachedRowCount);
break;
}
Thread.sleep(1000);
}
cachedRowCount = cache.getCachedRowCount(1, 1, 1);
Assertions.assertEquals(100, cachedRowCount);
Assertions.assertEquals(2, counter.get());

new MockUp<ExternalRowCountCache.RowCountCacheLoader>() {
@Mock
protected Optional<Long> doLoad(ExternalRowCountCache.RowCountKey rowCountKey) {
counter.incrementAndGet();
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Optional.of(100L);
}
};
cachedRowCount = cache.getCachedRowCount(2, 2, 2);
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
Thread.sleep(1000);
cachedRowCount = cache.getCachedRowCount(2, 2, 2);
Assertions.assertEquals(TableIf.UNKNOWN_ROW_COUNT, cachedRowCount);
for (int i = 0; i < 60; i++) {
if (counter.get() == 3) {
break;
}
Thread.sleep(1000);
}
Assertions.assertEquals(3, counter.get());
}
}

0 comments on commit a3bd58b

Please sign in to comment.