Skip to content

Commit

Permalink
Extend the merge rollup task capabilities (#14355)
Browse files Browse the repository at this point in the history
  • Loading branch information
davecromberge authored Dec 3, 2024
1 parent e1de8d3 commit ed1de92
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ public static class MergeRollupTask extends MergeTask {

// Custom segment group manager class name
public static final String SEGMENT_GROUP_MANAGER_CLASS_NAME_KEY = "segment.group.manager.class.name";

public static final String ERASE_DIMENSION_VALUES_KEY = "eraseDimensionValues";
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfi
segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment()) : segmentNumRowProvider;
}

private static List<RecordReaderFileConfig> convertRecordReadersToRecordReaderFileConfig(
public static List<RecordReaderFileConfig> convertRecordReadersToRecordReaderFileConfig(
List<RecordReader> recordReaders) {
Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is provided");
List<RecordReaderFileConfig> recordReaderFileConfigs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks.mergerollup;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* The {@code DimensionValueTransformer} class will transform certain dimension values by substituting the
* existing value for that dimension with the 'defaultNullValue' from its 'fieldSpec'.
*/
public class DimensionValueTransformer implements RecordTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(DimensionValueTransformer.class);

private final Map<String, Object> _defaultNullValues = new HashMap<>();
private final Set<String> _dimensionsToErase;

public DimensionValueTransformer(Schema schema, Set<String> dimensionsToErase) {
_dimensionsToErase = dimensionsToErase;
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
String fieldName = fieldSpec.getName();
Object defaultNullValue = fieldSpec.getDefaultNullValue();
if (fieldSpec.isSingleValueField()) {
_defaultNullValues.put(fieldName, defaultNullValue);
} else {
_defaultNullValues.put(fieldName, new Object[]{defaultNullValue});
}
}

for (String key : dimensionsToErase) {
if (!_defaultNullValues.containsKey(key)) {
LOGGER.warn("Dimension name: {} does not exist in schema and will be ignored.", key);
}
}
}

@Override
public boolean isNoOp() {
return _dimensionsToErase.isEmpty();
}

@Override
public GenericRow transform(GenericRow record) {
for (String dimensionName : _dimensionsToErase) {
Object defaultNullValue = _defaultNullValues.get(dimensionName);
record.putDefaultNullValue(dimensionName, defaultNullValue);
}
return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.core.common.MinionConstants;
Expand All @@ -33,6 +34,7 @@
import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -66,6 +68,12 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
TableConfig tableConfig = getTableConfig(tableNameWithType);
Schema schema = getSchema(tableNameWithType);

Set<String> dimensionsToErase = MergeRollupTaskUtils.getDimensionsToErase(configs);
List<RecordTransformer> customRecordTransformers = new ArrayList<>();
if (!dimensionsToErase.isEmpty()) {
customRecordTransformers.add(new DimensionValueTransformer(schema, dimensionsToErase));
}

SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);

Expand Down Expand Up @@ -104,7 +112,9 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
List<File> outputSegmentDirs;
try {
_eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments");
outputSegmentDirs = new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process();
outputSegmentDirs = new SegmentProcessorFramework(segmentProcessorConfig, workingDir,
SegmentProcessorFramework.convertRecordReadersToRecordReaderFileConfig(recordReaders),
customRecordTransformers, null).process();
} finally {
for (RecordReader recordReader : recordReaders) {
recordReader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
*/
package org.apache.pinot.plugin.minion.tasks.mergerollup;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.common.MinionConstants.MergeTask;


Expand All @@ -35,7 +40,8 @@ private MergeRollupTaskUtils() {
MergeTask.MERGE_TYPE_KEY,
MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY,
MergeTask.MAX_NUM_PARALLEL_BUCKETS
MergeTask.MAX_NUM_PARALLEL_BUCKETS,
MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY,
};
//@formatter:on

Expand All @@ -56,4 +62,17 @@ public static Map<String, Map<String, String>> getLevelToConfigMap(Map<String, S
}
return levelToConfigMap;
}

/**
* Extracts an array of dimensions to reduce/erase from the task config.
* <p>The config for the dimensions to erase should be a comma-separated string value.
*/
public static Set<String> getDimensionsToErase(Map<String, String> taskConfig) {
if (taskConfig == null || taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY) == null) {
return new HashSet<>();
}
return Arrays.stream(taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY).split(","))
.map(String::trim)
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.minion.tasks.mergerollup;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;

import static org.testng.Assert.*;


public class DimensionValueTransformerTest {
private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension("svInt", DataType.INT)
.addSingleValueDimension("svLong", DataType.LONG).addSingleValueDimension("svFloat", DataType.FLOAT)
.addSingleValueDimension("svDouble", DataType.DOUBLE).addSingleValueDimension("svBoolean", DataType.BOOLEAN)
.addSingleValueDimension("svTimestamp", DataType.TIMESTAMP).addSingleValueDimension("svBytes", DataType.BYTES)
.addMultiValueDimension("mvInt", DataType.INT).addSingleValueDimension("svJson", DataType.JSON)
.addMultiValueDimension("mvLong", DataType.LONG).addMultiValueDimension("mvFloat", DataType.FLOAT)
.addMultiValueDimension("mvDouble", DataType.DOUBLE)
.addSingleValueDimension("svStringWithNullCharacters", DataType.STRING)
.addSingleValueDimension("svStringWithLengthLimit", DataType.STRING)
.addMultiValueDimension("mvString1", DataType.STRING).build();

private static final TableConfig TABLE_CONFIG =
new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();

static {
SCHEMA.getFieldSpecFor("svStringWithLengthLimit").setMaxLength(2);
SCHEMA.addField(new DimensionFieldSpec("$virtual", DataType.STRING, true, Object.class));
}

// Transform multiple times should return the same result
private static final int NUM_ROUNDS = 5;

private static GenericRow getRecord() {
GenericRow record = new GenericRow();
record.putValue("svInt", (byte) 123);
record.putValue("svLong", (char) 123);
record.putValue("svFloat", Collections.singletonList((short) 123));
record.putValue("svDouble", new String[]{"123"});
record.putValue("svBoolean", "true");
record.putValue("svTimestamp", "2020-02-02 22:22:22.222");
record.putValue("svBytes", "7b7b"/*new byte[]{123, 123}*/);
record.putValue("svJson", "{\"first\": \"daffy\", \"last\": \"duck\"}");
record.putValue("mvInt", new Object[]{123L});
record.putValue("mvLong", Collections.singletonList(123f));
record.putValue("mvFloat", new Double[]{123d});
record.putValue("mvDouble", Collections.singletonMap("key", 123));
record.putValue("mvString1", new Object[]{"123", 123, 123L, 123f, 123.0});
record.putValue("svFloatNegativeZero", -0.00f);
return record;
}

@Test
public void testDimensionValueTransformer() {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY,
"svInt, svLong, svFloat, svDouble, svBoolean, svTimestamp, svJson, svBytes, mvInt, mvLong, mvFloat, mvDouble,"
+ " mvString1, $virtual");
Set<String> dimensionsToErase = MergeRollupTaskUtils.getDimensionsToErase(taskConfig);
RecordTransformer transformer = new DimensionValueTransformer(SCHEMA, dimensionsToErase);

GenericRow record = getRecord();
for (int i = 0; i < NUM_ROUNDS; i++) {
record = transformer.transform(record);
assertNotNull(record);
assertEquals(record.getValue("svInt"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT);
assertEquals(record.getValue("svLong"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG);
assertEquals(record.getValue("svFloat"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT);
assertEquals(record.getValue("svDouble"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE);
assertEquals(record.getValue("svBoolean"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BOOLEAN);
assertEquals(record.getValue("svTimestamp"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_TIMESTAMP);
assertEquals(record.getValue("svBytes"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES);
assertEquals(record.getValue("svJson"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_JSON);
assertEquals(record.getValue("mvInt"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT});
assertEquals(record.getValue("mvLong"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG});
assertEquals(record.getValue("mvFloat"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT});
assertEquals(record.getValue("mvDouble"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE});
assertEquals(record.getValue("mvString1"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING});
}

// Test empty record
record = new GenericRow();
for (int i = 0; i < NUM_ROUNDS; i++) {
record = transformer.transform(record);
assertNotNull(record);
assertEquals(record.getValue("svInt"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT);
assertEquals(record.getValue("svLong"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG);
assertEquals(record.getValue("svFloat"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT);
assertEquals(record.getValue("svDouble"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE);
assertEquals(record.getValue("svBoolean"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BOOLEAN);
assertEquals(record.getValue("svTimestamp"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_TIMESTAMP);
assertEquals(record.getValue("svBytes"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES);
assertEquals(record.getValue("svJson"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_JSON);
assertEquals(record.getValue("mvInt"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT});
assertEquals(record.getValue("mvLong"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG});
assertEquals(record.getValue("mvFloat"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT});
assertEquals(record.getValue("mvDouble"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE});
assertEquals(record.getValue("mvString1"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
Expand Down Expand Up @@ -123,6 +124,31 @@ public void testConvert()
Assert.assertEquals(segmentMetadata.getTotalDocs(), NUM_SEGMENTS * NUM_ROWS);
}

@Test
public void testDimensionErasure()
throws Exception {
MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor(new MinionConf());
mergeRollupTaskExecutor.setMinionEventObserver(new MinionProgressObserver());
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily");
configs.put(MinionConstants.MergeTask.MERGE_TYPE_KEY, "rollup");
configs.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, D1);

PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
List<SegmentConversionResult> conversionResults =
mergeRollupTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);

Assert.assertEquals(conversionResults.size(), 1);
Assert.assertEquals(conversionResults.get(0).getSegmentName(), MERGED_SEGMENT_NAME);
File mergedSegment = conversionResults.get(0).getFile();
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(mergedSegment);
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(D1);
Assert.assertEquals(segmentMetadata.getTotalDocs(), 1);
Assert.assertEquals(columnMetadata.getMinValue(), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT);
Assert.assertEquals(columnMetadata.getMaxValue(), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT);
}

@AfterClass
public void tearDown()
throws Exception {
Expand Down
Loading

0 comments on commit ed1de92

Please sign in to comment.