diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 942fe993c560..7deb5a2a4224 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -536,6 +536,9 @@ public class IoTDBConfig { */ private volatile int minCrossCompactionUnseqFileLevel = 1; + /** The min size of candidate unseq file in one insertion compaction task */ + private volatile long minInsertionCompactionUnseqFileSizeInByte = 100 * 1024 * 1024; + /** The interval of compaction task schedulation in each virtual database. The unit is ms. */ private long compactionScheduleIntervalInMs = 60_000L; @@ -3113,6 +3116,15 @@ public void setMinCrossCompactionUnseqFileLevel(int minCrossCompactionUnseqFileL this.minCrossCompactionUnseqFileLevel = minCrossCompactionUnseqFileLevel; } + public long getMinInsertionCompactionUnseqFileSizeInByte() { + return minInsertionCompactionUnseqFileSizeInByte; + } + + public void setMinInsertionCompactionUnseqFileSizeInByte( + long minInsertionCompactionUnseqFileSizeInByte) { + this.minInsertionCompactionUnseqFileSizeInByte = minInsertionCompactionUnseqFileSizeInByte; + } + public int getSubCompactionTaskNum() { return subCompactionTaskNum; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 5411556ca643..43ebfc71f319 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -878,6 +878,14 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO Integer.toString(conf.getMinCrossCompactionUnseqFileLevel()))) .map(String::trim) .orElse(Integer.toString(conf.getMinCrossCompactionUnseqFileLevel())))); + conf.setMinInsertionCompactionUnseqFileSizeInByte( + Integer.parseInt( + Optional.ofNullable( + properties.getProperty( + "min_insertion_compaction_unseq_file_size_in_byte", + Long.toString(conf.getMinInsertionCompactionUnseqFileSizeInByte()))) + .map(String::trim) + .orElse(Long.toString(conf.getMinInsertionCompactionUnseqFileSizeInByte())))); conf.setCompactionWriteThroughputMbPerSec( Integer.parseInt( @@ -1879,6 +1887,24 @@ private boolean loadCompactionTaskHotModifiedProps(Properties properties) throws configModified |= minCrossCompactionCandidateFileNum != conf.getMinCrossCompactionUnseqFileLevel(); + // update min_insertion_compaction_unseq_file_size_in_byte + long minInsertionCompactionUnseqFileSizeInByte = + conf.getMinInsertionCompactionUnseqFileSizeInByte(); + conf.setMinInsertionCompactionUnseqFileSizeInByte( + Long.parseLong( + Optional.ofNullable( + properties.getProperty( + "min_insertion_compaction_unseq_file_size_in_byte", + ConfigurationFileUtils.getConfigurationDefaultValue( + "min_insertion_compaction_unseq_file_size_in_byte"))) + .map(String::trim) + .orElse( + ConfigurationFileUtils.getConfigurationDefaultValue( + "min_insertion_compaction_unseq_file_size_in_byte")))); + configModified |= + minInsertionCompactionUnseqFileSizeInByte + != conf.getMinInsertionCompactionUnseqFileSizeInByte(); + // update inner_compaction_task_selection_disk_redundancy double innerCompactionTaskSelectionDiskRedundancy = conf.getInnerCompactionTaskSelectionDiskRedundancy(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java index 92cccbfb24cf..874b58bca647 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java @@ -433,23 +433,26 @@ private InsertionCrossCompactionTaskResource executeInsertionCrossSpaceCompactio if (unseqFiles.isEmpty()) { return result; } - if (seqFiles.isEmpty()) { - result.toInsertUnSeqFile = unseqFiles.get(0).resource; - // ensure the target position is the head of seq space - result.targetFileTimestamp = - Math.min(System.currentTimeMillis(), getTimestampInFileName(unseqFiles.get(0))); - } else { - for (int i = 0; i < unseqFiles.size(); i++) { - TsFileResourceCandidate unseqFile = unseqFiles.get(i); - // skip unseq file which is overlapped with files in seq space or overlapped with previous - // unseq files - if (!isValidInsertionCompactionCandidate(unseqFiles, i)) { - continue; - } - result = selectCurrentUnSeqFile(unseqFile); - if (result.isValid()) { - break; - } + for (int i = 0; i < unseqFiles.size(); i++) { + TsFileResourceCandidate unseqFile = unseqFiles.get(i); + if (!unseqFileLargeEnough(unseqFile)) { + continue; + } + // skip unseq file which is overlapped with files in seq space or overlapped with previous + // unseq files + if (!isValidInsertionCompactionCandidate(unseqFiles, i)) { + continue; + } + if (seqFiles.isEmpty()) { + result.toInsertUnSeqFile = unseqFiles.get(0).resource; + // ensure the target position is the head of seq space + result.targetFileTimestamp = + Math.min(System.currentTimeMillis(), getTimestampInFileName(unseqFiles.get(0))); + break; + } + result = selectCurrentUnSeqFile(unseqFile); + if (result.isValid()) { + break; } } // select the first unseq file to exclude other CrossSpaceCompactionTask @@ -459,6 +462,13 @@ private InsertionCrossCompactionTaskResource executeInsertionCrossSpaceCompactio return result; } + private boolean unseqFileLargeEnough(TsFileResourceCandidate unseqFile) { + return unseqFile.resource.getTsFileID().getInnerCompactionCount() + >= config.getMinCrossCompactionUnseqFileLevel() + || unseqFile.resource.getTsFileSize() + >= config.getMinInsertionCompactionUnseqFileSizeInByte(); + } + private InsertionCrossCompactionTaskResource selectCurrentUnSeqFile( TsFileResourceCandidate unseqFile) throws IOException { int previousSeqFileIndex = 0; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionRecoverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionRecoverTest.java index b89aef630bf1..775236e2f844 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionRecoverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionRecoverTest.java @@ -20,6 +20,8 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.MergeException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; @@ -63,14 +65,22 @@ import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.writeNonAlignedChunk; public class InsertionCrossSpaceCompactionRecoverTest extends AbstractCompactionTest { + + private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private long minUnseqFileSizeForInsertionCompactionTask; + @Before public void setUp() throws IOException, WriteProcessException, MetadataException, InterruptedException { + minUnseqFileSizeForInsertionCompactionTask = + config.getMinInsertionCompactionUnseqFileSizeInByte(); + config.setMinInsertionCompactionUnseqFileSizeInByte(0); super.setUp(); } @After public void tearDown() throws IOException, StorageEngineException { + config.setMinInsertionCompactionUnseqFileSizeInByte(minUnseqFileSizeForInsertionCompactionTask); super.tearDown(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java index 6a86b3955f93..b4a02f05d311 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionSelectorTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.cross; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.MergeException; import org.apache.iotdb.db.exception.StorageEngineException; @@ -59,16 +60,24 @@ public class InsertionCrossSpaceCompactionSelectorTest extends AbstractCompactionTest { + private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private boolean enableCrossCompaction; + private long minInsertionFileSize; + @Before public void setUp() throws IOException, WriteProcessException, MetadataException, InterruptedException { - IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); + enableCrossCompaction = config.isEnableCrossSpaceCompaction(); + minInsertionFileSize = config.getMinInsertionCompactionUnseqFileSizeInByte(); + config.setEnableCrossSpaceCompaction(true); + config.setMinInsertionCompactionUnseqFileSizeInByte(0); super.setUp(); } @After public void tearDown() throws IOException, StorageEngineException { - IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false); + config.setEnableCrossSpaceCompaction(enableCrossCompaction); + config.setMinInsertionCompactionUnseqFileSizeInByte(minInsertionFileSize); super.tearDown(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java index ddcb90f3ac8a..252b3876c003 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/InsertionCrossSpaceCompactionTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.cross; import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.service.metrics.FileMetrics; @@ -63,23 +64,26 @@ public class InsertionCrossSpaceCompactionTest extends AbstractCompactionTest { new FixedPriorityBlockingQueue<>(50, new DefaultCompactionTaskComparatorImpl()); private final CompactionWorker worker = new CompactionWorker(0, candidateCompactionTaskQueue); + private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private boolean enableInsertionCrossSpaceCompaction; + private long minUnseqFileSizeForInsertionCompactionTask; @Before public void setUp() throws IOException, WriteProcessException, MetadataException, InterruptedException { super.setUp(); - enableInsertionCrossSpaceCompaction = - IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction(); - IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true); + enableInsertionCrossSpaceCompaction = config.isEnableCrossSpaceCompaction(); + minUnseqFileSizeForInsertionCompactionTask = + config.getMinInsertionCompactionUnseqFileSizeInByte(); + config.setEnableCrossSpaceCompaction(true); + config.setMinInsertionCompactionUnseqFileSizeInByte(0); TsFileResourceManager.getInstance().clear(); } @After public void tearDown() throws IOException, StorageEngineException { - IoTDBDescriptor.getInstance() - .getConfig() - .setEnableCrossSpaceCompaction(enableInsertionCrossSpaceCompaction); + config.setEnableCrossSpaceCompaction(enableInsertionCrossSpaceCompaction); + config.setMinInsertionCompactionUnseqFileSizeInByte(minUnseqFileSizeForInsertionCompactionTask); super.tearDown(); TsFileResourceManager.getInstance().clear(); } @@ -310,6 +314,27 @@ public void test5() throws IOException, InterruptedException { TsFileResourceManager.getInstance().getPriorityQueueSize()); } + @Test + public void test6() throws IOException, InterruptedException { + TsFileResource unseqResource1 = + generateSingleNonAlignedSeriesFileWithDevices( + "2-2-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(1, 4)}, false); + unseqResource1.setStatusForTest(TsFileResourceStatus.NORMAL); + config.setMinInsertionCompactionUnseqFileSizeInByte(unseqResource1.getTsFileSize() + 1); + + RewriteCrossSpaceCompactionSelector selector = + new RewriteCrossSpaceCompactionSelector( + COMPACTION_TEST_SG, "0", 0, tsFileManager, new CompactionScheduleContext()); + unseqResources.add(unseqResource1); + tsFileManager.addAll(seqResources, true); + tsFileManager.addAll(unseqResources, false); + List tasks = + selector.selectInsertionCrossSpaceTask( + tsFileManager.getOrCreateSequenceListByTimePartition(0), + tsFileManager.getOrCreateUnsequenceListByTimePartition(0)); + Assert.assertEquals(0, tasks.size()); + } + @Test public void testInsertionCompactionSchedule() throws IOException, InterruptedException { TsFileResource seqResource1 = diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index cdcd5a8571a4..c1421bf2c18a 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1301,6 +1301,11 @@ max_cross_compaction_candidate_file_size=5368709120 # Datatype: int min_cross_compaction_unseq_file_level=1 +# The min size of candidate unseq file in one insertion compaction task +# effectiveMode: hot_reload +# Datatype: long +min_insertion_compaction_unseq_file_size_in_byte=104857600 + # How many threads will be set up to perform compaction, 10 by default. # Set to 1 when less than or equal to 0. # effectiveMode: hot_reload