diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java index e079e6b2ade922..5b1ca430409b91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -45,10 +46,6 @@ public class AnalysisJob { protected List buf; - protected int totalTaskCount; - - protected int queryFinishedTaskCount; - protected StmtExecutor stmtExecutor; protected boolean killed; @@ -63,10 +60,9 @@ public AnalysisJob(AnalysisInfo jobInfo, Collection for (BaseAnalysisTask task : queryingTask) { task.job = this; } - this.queryingTask = new HashSet<>(queryingTask); - this.queryFinished = new HashSet<>(); + this.queryingTask = Collections.synchronizedSet(new HashSet<>(queryingTask)); + this.queryFinished = Collections.synchronizedSet(new HashSet<>()); this.buf = new ArrayList<>(); - totalTaskCount = queryingTask.size(); start = System.currentTimeMillis(); this.jobInfo = jobInfo; this.analysisManager = Env.getCurrentEnv().getAnalysisManager(); @@ -86,12 +82,14 @@ public synchronized void rowCountDone(BaseAnalysisTask task) { } protected void markOneTaskDone() { - queryFinishedTaskCount += 1; - if (queryFinishedTaskCount == totalTaskCount) { - writeBuf(); - updateTaskState(AnalysisState.FINISHED, "Cost time in sec: " - + (System.currentTimeMillis() - start) / 1000); - deregisterJob(); + if (queryingTask.isEmpty()) { + try { + writeBuf(); + updateTaskState(AnalysisState.FINISHED, "Cost time in sec: " + + (System.currentTimeMillis() - start) / 1000); + } finally { + deregisterJob(); + } } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) { writeBuf(); } @@ -175,9 +173,12 @@ protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exce } public void taskFailed(BaseAnalysisTask task, String reason) { - updateTaskState(AnalysisState.FAILED, reason); - cancel(); - deregisterJob(); + try { + updateTaskState(AnalysisState.FAILED, reason); + cancel(); + } finally { + deregisterJob(); + } } public void cancel() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 91f54208f6acd0..b94b28e27702f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -1073,4 +1073,8 @@ public void constructJob(AnalysisInfo jobInfo, Collection analysisTasks = new HashMap<>(); AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index d4dedd17123807..bca05d8299c020 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -44,7 +44,9 @@ public void initTest(@Mocked AnalysisInfo jobInfo, @Mocked OlapAnalysisTask task } @Test - public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, @Mocked OlapAnalysisTask olapAnalysisTask) { + public void testAppendBufTest1(@Mocked AnalysisInfo analysisInfo, + @Mocked OlapAnalysisTask olapAnalysisTask, + @Mocked OlapAnalysisTask olapAnalysisTask2) { AtomicInteger writeBufInvokeTimes = new AtomicInteger(); new MockUp() { @Mock @@ -63,9 +65,9 @@ public void deregisterJob() { AnalysisJob job = new AnalysisJob(analysisInfo, Arrays.asList(olapAnalysisTask)); job.queryingTask = new HashSet<>(); job.queryingTask.add(olapAnalysisTask); + job.queryingTask.add(olapAnalysisTask2); job.queryFinished = new HashSet<>(); job.buf = new ArrayList<>(); - job.totalTaskCount = 20; // not all task finished nor cached limit exceed, shouldn't write job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData())); @@ -97,7 +99,6 @@ public void deregisterJob() { job.queryingTask.add(olapAnalysisTask); job.queryFinished = new HashSet<>(); job.buf = new ArrayList<>(); - job.totalTaskCount = 1; job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData())); // all task finished, should write and deregister this job @@ -132,7 +133,6 @@ public void deregisterJob() { for (int i = 0; i < StatisticsUtil.getInsertMergeCount(); i++) { job.buf.add(colStatsData); } - job.totalTaskCount = 100; job.appendBuf(olapAnalysisTask, Arrays.asList(new ColStatsData())); // cache limit exceed, should write them