Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add min file size and level limit for insertion compaction #14345

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,9 @@ public class IoTDBConfig {
*/
private volatile int minCrossCompactionUnseqFileLevel = 1;

/** The min size of candidate unseq file in one insertion compaction task */
private volatile long minInsertionCompactionUnseqFileSizeInByte = 100 * 1024 * 1024;

/** The interval of compaction task schedulation in each virtual database. The unit is ms. */
private long compactionScheduleIntervalInMs = 60_000L;

Expand Down Expand Up @@ -3113,6 +3116,15 @@ public void setMinCrossCompactionUnseqFileLevel(int minCrossCompactionUnseqFileL
this.minCrossCompactionUnseqFileLevel = minCrossCompactionUnseqFileLevel;
}

public long getMinInsertionCompactionUnseqFileSizeInByte() {
return minInsertionCompactionUnseqFileSizeInByte;
}

public void setMinInsertionCompactionUnseqFileSizeInByte(
long minInsertionCompactionUnseqFileSizeInByte) {
this.minInsertionCompactionUnseqFileSizeInByte = minInsertionCompactionUnseqFileSizeInByte;
}

public int getSubCompactionTaskNum() {
return subCompactionTaskNum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,14 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
Integer.toString(conf.getMinCrossCompactionUnseqFileLevel())))
.map(String::trim)
.orElse(Integer.toString(conf.getMinCrossCompactionUnseqFileLevel()))));
conf.setMinInsertionCompactionUnseqFileSizeInByte(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty(
"min_insertion_compaction_unseq_file_size_in_byte",
Long.toString(conf.getMinInsertionCompactionUnseqFileSizeInByte())))
.map(String::trim)
.orElse(Long.toString(conf.getMinInsertionCompactionUnseqFileSizeInByte()))));

conf.setCompactionWriteThroughputMbPerSec(
Integer.parseInt(
Expand Down Expand Up @@ -1879,6 +1887,24 @@ private boolean loadCompactionTaskHotModifiedProps(Properties properties) throws
configModified |=
minCrossCompactionCandidateFileNum != conf.getMinCrossCompactionUnseqFileLevel();

// update min_insertion_compaction_unseq_file_size_in_byte
long minInsertionCompactionUnseqFileSizeInByte =
conf.getMinInsertionCompactionUnseqFileSizeInByte();
conf.setMinInsertionCompactionUnseqFileSizeInByte(
Long.parseLong(
Optional.ofNullable(
properties.getProperty(
"min_insertion_compaction_unseq_file_size_in_byte",
ConfigurationFileUtils.getConfigurationDefaultValue(
"min_insertion_compaction_unseq_file_size_in_byte")))
.map(String::trim)
.orElse(
ConfigurationFileUtils.getConfigurationDefaultValue(
"min_insertion_compaction_unseq_file_size_in_byte"))));
configModified |=
minInsertionCompactionUnseqFileSizeInByte
!= conf.getMinInsertionCompactionUnseqFileSizeInByte();

// update inner_compaction_task_selection_disk_redundancy
double innerCompactionTaskSelectionDiskRedundancy =
conf.getInnerCompactionTaskSelectionDiskRedundancy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,23 +433,26 @@ private InsertionCrossCompactionTaskResource executeInsertionCrossSpaceCompactio
if (unseqFiles.isEmpty()) {
return result;
}
if (seqFiles.isEmpty()) {
result.toInsertUnSeqFile = unseqFiles.get(0).resource;
// ensure the target position is the head of seq space
result.targetFileTimestamp =
Math.min(System.currentTimeMillis(), getTimestampInFileName(unseqFiles.get(0)));
} else {
for (int i = 0; i < unseqFiles.size(); i++) {
TsFileResourceCandidate unseqFile = unseqFiles.get(i);
// skip unseq file which is overlapped with files in seq space or overlapped with previous
// unseq files
if (!isValidInsertionCompactionCandidate(unseqFiles, i)) {
continue;
}
result = selectCurrentUnSeqFile(unseqFile);
if (result.isValid()) {
break;
}
for (int i = 0; i < unseqFiles.size(); i++) {
TsFileResourceCandidate unseqFile = unseqFiles.get(i);
if (!unseqFileLargeEnough(unseqFile)) {
continue;
}
// skip unseq file which is overlapped with files in seq space or overlapped with previous
// unseq files
if (!isValidInsertionCompactionCandidate(unseqFiles, i)) {
continue;
}
if (seqFiles.isEmpty()) {
result.toInsertUnSeqFile = unseqFiles.get(0).resource;
// ensure the target position is the head of seq space
result.targetFileTimestamp =
Math.min(System.currentTimeMillis(), getTimestampInFileName(unseqFiles.get(0)));
break;
}
result = selectCurrentUnSeqFile(unseqFile);
if (result.isValid()) {
break;
}
}
// select the first unseq file to exclude other CrossSpaceCompactionTask
Expand All @@ -459,6 +462,13 @@ private InsertionCrossCompactionTaskResource executeInsertionCrossSpaceCompactio
return result;
}

private boolean unseqFileLargeEnough(TsFileResourceCandidate unseqFile) {
return unseqFile.resource.getTsFileID().getInnerCompactionCount()
>= config.getMinCrossCompactionUnseqFileLevel()
|| unseqFile.resource.getTsFileSize()
>= config.getMinInsertionCompactionUnseqFileSizeInByte();
}

private InsertionCrossCompactionTaskResource selectCurrentUnSeqFile(
TsFileResourceCandidate unseqFile) throws IOException {
int previousSeqFileIndex = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
Expand Down Expand Up @@ -63,14 +65,22 @@
import static org.apache.iotdb.db.storageengine.dataregion.compaction.utils.TsFileGeneratorUtils.writeNonAlignedChunk;

public class InsertionCrossSpaceCompactionRecoverTest extends AbstractCompactionTest {

private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private long minUnseqFileSizeForInsertionCompactionTask;

@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException, InterruptedException {
minUnseqFileSizeForInsertionCompactionTask =
config.getMinInsertionCompactionUnseqFileSizeInByte();
config.setMinInsertionCompactionUnseqFileSizeInByte(0);
super.setUp();
}

@After
public void tearDown() throws IOException, StorageEngineException {
config.setMinInsertionCompactionUnseqFileSizeInByte(minUnseqFileSizeForInsertionCompactionTask);
super.tearDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;

import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.exception.StorageEngineException;
Expand Down Expand Up @@ -59,16 +60,24 @@

public class InsertionCrossSpaceCompactionSelectorTest extends AbstractCompactionTest {

private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private boolean enableCrossCompaction;
private long minInsertionFileSize;

@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException, InterruptedException {
IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
enableCrossCompaction = config.isEnableCrossSpaceCompaction();
minInsertionFileSize = config.getMinInsertionCompactionUnseqFileSizeInByte();
config.setEnableCrossSpaceCompaction(true);
config.setMinInsertionCompactionUnseqFileSizeInByte(0);
super.setUp();
}

@After
public void tearDown() throws IOException, StorageEngineException {
IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(false);
config.setEnableCrossSpaceCompaction(enableCrossCompaction);
config.setMinInsertionCompactionUnseqFileSizeInByte(minInsertionFileSize);
super.tearDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.cross;

import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.service.metrics.FileMetrics;
Expand Down Expand Up @@ -63,23 +64,26 @@ public class InsertionCrossSpaceCompactionTest extends AbstractCompactionTest {
new FixedPriorityBlockingQueue<>(50, new DefaultCompactionTaskComparatorImpl());
private final CompactionWorker worker = new CompactionWorker(0, candidateCompactionTaskQueue);

private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private boolean enableInsertionCrossSpaceCompaction;
private long minUnseqFileSizeForInsertionCompactionTask;

@Before
public void setUp()
throws IOException, WriteProcessException, MetadataException, InterruptedException {
super.setUp();
enableInsertionCrossSpaceCompaction =
IoTDBDescriptor.getInstance().getConfig().isEnableCrossSpaceCompaction();
IoTDBDescriptor.getInstance().getConfig().setEnableCrossSpaceCompaction(true);
enableInsertionCrossSpaceCompaction = config.isEnableCrossSpaceCompaction();
minUnseqFileSizeForInsertionCompactionTask =
config.getMinInsertionCompactionUnseqFileSizeInByte();
config.setEnableCrossSpaceCompaction(true);
config.setMinInsertionCompactionUnseqFileSizeInByte(0);
TsFileResourceManager.getInstance().clear();
}

@After
public void tearDown() throws IOException, StorageEngineException {
IoTDBDescriptor.getInstance()
.getConfig()
.setEnableCrossSpaceCompaction(enableInsertionCrossSpaceCompaction);
config.setEnableCrossSpaceCompaction(enableInsertionCrossSpaceCompaction);
config.setMinInsertionCompactionUnseqFileSizeInByte(minUnseqFileSizeForInsertionCompactionTask);
super.tearDown();
TsFileResourceManager.getInstance().clear();
}
Expand Down Expand Up @@ -310,6 +314,27 @@ public void test5() throws IOException, InterruptedException {
TsFileResourceManager.getInstance().getPriorityQueueSize());
}

@Test
public void test6() throws IOException, InterruptedException {
TsFileResource unseqResource1 =
generateSingleNonAlignedSeriesFileWithDevices(
"2-2-0-0.tsfile", new String[] {"d1"}, new TimeRange[] {new TimeRange(1, 4)}, false);
unseqResource1.setStatusForTest(TsFileResourceStatus.NORMAL);
config.setMinInsertionCompactionUnseqFileSizeInByte(unseqResource1.getTsFileSize() + 1);

RewriteCrossSpaceCompactionSelector selector =
new RewriteCrossSpaceCompactionSelector(
COMPACTION_TEST_SG, "0", 0, tsFileManager, new CompactionScheduleContext());
unseqResources.add(unseqResource1);
tsFileManager.addAll(seqResources, true);
tsFileManager.addAll(unseqResources, false);
List<CrossCompactionTaskResource> tasks =
selector.selectInsertionCrossSpaceTask(
tsFileManager.getOrCreateSequenceListByTimePartition(0),
tsFileManager.getOrCreateUnsequenceListByTimePartition(0));
Assert.assertEquals(0, tasks.size());
}

@Test
public void testInsertionCompactionSchedule() throws IOException, InterruptedException {
TsFileResource seqResource1 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.junit.Test;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -99,6 +100,15 @@ public void tearDown() throws IOException, StorageEngineException {
super.tearDown();
}

@Test
public void test11() throws IOException {
TsFileResource resource =
new TsFileResource(
new File(
"~/Downloads/data_2/datanode/data/sequence/root.d1/1/0/1733195872759-1-0-0.tsfile"));
resource.deserialize();
}

jt2594838 marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void testRepairUnsortedDataBetweenPageWithNonAlignedSeries() throws IOException {
TsFileResource resource = createEmptyFileAndResource(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1301,6 +1301,11 @@ max_cross_compaction_candidate_file_size=5368709120
# Datatype: int
min_cross_compaction_unseq_file_level=1

# The min size of candidate unseq file in one insertion compaction task
# effectiveMode: hot_reload
# Datatype: long
min_insertion_compaction_unseq_file_size_in_byte=104857600

# How many threads will be set up to perform compaction, 10 by default.
# Set to 1 when less than or equal to 0.
# effectiveMode: hot_reload
Expand Down
Loading