diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 26e0bd79edc9..3c4b9b322f01 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -134,6 +134,8 @@ public static class MergeRollupTask extends MergeTask { // Custom segment group manager class name public static final String SEGMENT_GROUP_MANAGER_CLASS_NAME_KEY = "segment.group.manager.class.name"; + + public static final String ERASE_DIMENSION_VALUES_KEY = "eraseDimensionValues"; } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java index 4c9081a1ee07..4923d5be94b3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java @@ -124,7 +124,7 @@ protected SegmentProcessorFramework(SegmentProcessorConfig segmentProcessorConfi segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment()) : segmentNumRowProvider; } - private static List convertRecordReadersToRecordReaderFileConfig( + public static List convertRecordReadersToRecordReaderFileConfig( List recordReaders) { Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is provided"); List recordReaderFileConfigs = new ArrayList<>(); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformer.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformer.java new file mode 100644 index 000000000000..4ec70ff43d47 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformer.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.mergerollup; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The {@code DimensionValueTransformer} class will transform certain dimension values by substituting the + * existing value for that dimension with the 'defaultNullValue' from its 'fieldSpec'. + */ +public class DimensionValueTransformer implements RecordTransformer { + private static final Logger LOGGER = LoggerFactory.getLogger(DimensionValueTransformer.class); + + private final Map _defaultNullValues = new HashMap<>(); + private final Set _dimensionsToErase; + + public DimensionValueTransformer(Schema schema, Set dimensionsToErase) { + _dimensionsToErase = dimensionsToErase; + for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) { + String fieldName = fieldSpec.getName(); + Object defaultNullValue = fieldSpec.getDefaultNullValue(); + if (fieldSpec.isSingleValueField()) { + _defaultNullValues.put(fieldName, defaultNullValue); + } else { + _defaultNullValues.put(fieldName, new Object[]{defaultNullValue}); + } + } + + for (String key : dimensionsToErase) { + if (!_defaultNullValues.containsKey(key)) { + LOGGER.warn("Dimension name: {} does not exist in schema and will be ignored.", key); + } + } + } + + @Override + public boolean isNoOp() { + return _dimensionsToErase.isEmpty(); + } + + @Override + public GenericRow transform(GenericRow record) { + for (String dimensionName : _dimensionsToErase) { + Object defaultNullValue = _defaultNullValues.get(dimensionName); + record.putDefaultNullValue(dimensionName, defaultNullValue); + } + return record; + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java index 049859c1a996..ce56192f1f31 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; import org.apache.pinot.core.common.MinionConstants; @@ -33,6 +34,7 @@ import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor; import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils; import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -66,6 +68,12 @@ protected List convert(PinotTaskConfig pinotTaskConfig, TableConfig tableConfig = getTableConfig(tableNameWithType); Schema schema = getSchema(tableNameWithType); + Set dimensionsToErase = MergeRollupTaskUtils.getDimensionsToErase(configs); + List customRecordTransformers = new ArrayList<>(); + if (!dimensionsToErase.isEmpty()) { + customRecordTransformers.add(new DimensionValueTransformer(schema, dimensionsToErase)); + } + SegmentProcessorConfig.Builder segmentProcessorConfigBuilder = new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema); @@ -104,7 +112,9 @@ protected List convert(PinotTaskConfig pinotTaskConfig, List outputSegmentDirs; try { _eventObserver.notifyProgress(_pinotTaskConfig, "Generating segments"); - outputSegmentDirs = new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process(); + outputSegmentDirs = new SegmentProcessorFramework(segmentProcessorConfig, workingDir, + SegmentProcessorFramework.convertRecordReadersToRecordReaderFileConfig(recordReaders), + customRecordTransformers, null).process(); } finally { for (RecordReader recordReader : recordReaders) { recordReader.close(); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java index 463b2e53bbe3..c3c7720e1b87 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java @@ -18,8 +18,13 @@ */ package org.apache.pinot.plugin.minion.tasks.mergerollup; +import java.util.Arrays; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.MinionConstants.MergeTask; @@ -35,7 +40,8 @@ private MergeRollupTaskUtils() { MergeTask.MERGE_TYPE_KEY, MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY, MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY, - MergeTask.MAX_NUM_PARALLEL_BUCKETS + MergeTask.MAX_NUM_PARALLEL_BUCKETS, + MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, }; //@formatter:on @@ -56,4 +62,17 @@ public static Map> getLevelToConfigMap(MapThe config for the dimensions to erase should be a comma-separated string value. + */ + public static Set getDimensionsToErase(Map taskConfig) { + if (taskConfig == null || taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY) == null) { + return new HashSet<>(); + } + return Arrays.stream(taskConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY).split(",")) + .map(String::trim) + .collect(Collectors.toSet()); + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformerTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformerTest.java new file mode 100644 index 000000000000..43b72c271f16 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/DimensionValueTransformerTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks.mergerollup; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.segment.local.recordtransformer.RecordTransformer; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class DimensionValueTransformerTest { + private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension("svInt", DataType.INT) + .addSingleValueDimension("svLong", DataType.LONG).addSingleValueDimension("svFloat", DataType.FLOAT) + .addSingleValueDimension("svDouble", DataType.DOUBLE).addSingleValueDimension("svBoolean", DataType.BOOLEAN) + .addSingleValueDimension("svTimestamp", DataType.TIMESTAMP).addSingleValueDimension("svBytes", DataType.BYTES) + .addMultiValueDimension("mvInt", DataType.INT).addSingleValueDimension("svJson", DataType.JSON) + .addMultiValueDimension("mvLong", DataType.LONG).addMultiValueDimension("mvFloat", DataType.FLOAT) + .addMultiValueDimension("mvDouble", DataType.DOUBLE) + .addSingleValueDimension("svStringWithNullCharacters", DataType.STRING) + .addSingleValueDimension("svStringWithLengthLimit", DataType.STRING) + .addMultiValueDimension("mvString1", DataType.STRING).build(); + + private static final TableConfig TABLE_CONFIG = + new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build(); + + static { + SCHEMA.getFieldSpecFor("svStringWithLengthLimit").setMaxLength(2); + SCHEMA.addField(new DimensionFieldSpec("$virtual", DataType.STRING, true, Object.class)); + } + + // Transform multiple times should return the same result + private static final int NUM_ROUNDS = 5; + + private static GenericRow getRecord() { + GenericRow record = new GenericRow(); + record.putValue("svInt", (byte) 123); + record.putValue("svLong", (char) 123); + record.putValue("svFloat", Collections.singletonList((short) 123)); + record.putValue("svDouble", new String[]{"123"}); + record.putValue("svBoolean", "true"); + record.putValue("svTimestamp", "2020-02-02 22:22:22.222"); + record.putValue("svBytes", "7b7b"/*new byte[]{123, 123}*/); + record.putValue("svJson", "{\"first\": \"daffy\", \"last\": \"duck\"}"); + record.putValue("mvInt", new Object[]{123L}); + record.putValue("mvLong", Collections.singletonList(123f)); + record.putValue("mvFloat", new Double[]{123d}); + record.putValue("mvDouble", Collections.singletonMap("key", 123)); + record.putValue("mvString1", new Object[]{"123", 123, 123L, 123f, 123.0}); + record.putValue("svFloatNegativeZero", -0.00f); + return record; + } + + @Test + public void testDimensionValueTransformer() { + Map taskConfig = new HashMap<>(); + taskConfig.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, + "svInt, svLong, svFloat, svDouble, svBoolean, svTimestamp, svJson, svBytes, mvInt, mvLong, mvFloat, mvDouble," + + " mvString1, $virtual"); + Set dimensionsToErase = MergeRollupTaskUtils.getDimensionsToErase(taskConfig); + RecordTransformer transformer = new DimensionValueTransformer(SCHEMA, dimensionsToErase); + + GenericRow record = getRecord(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svInt"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT); + assertEquals(record.getValue("svLong"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG); + assertEquals(record.getValue("svFloat"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT); + assertEquals(record.getValue("svDouble"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE); + assertEquals(record.getValue("svBoolean"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BOOLEAN); + assertEquals(record.getValue("svTimestamp"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_TIMESTAMP); + assertEquals(record.getValue("svBytes"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES); + assertEquals(record.getValue("svJson"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_JSON); + assertEquals(record.getValue("mvInt"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT}); + assertEquals(record.getValue("mvLong"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG}); + assertEquals(record.getValue("mvFloat"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT}); + assertEquals(record.getValue("mvDouble"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE}); + assertEquals(record.getValue("mvString1"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING}); + } + + // Test empty record + record = new GenericRow(); + for (int i = 0; i < NUM_ROUNDS; i++) { + record = transformer.transform(record); + assertNotNull(record); + assertEquals(record.getValue("svInt"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT); + assertEquals(record.getValue("svLong"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG); + assertEquals(record.getValue("svFloat"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT); + assertEquals(record.getValue("svDouble"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE); + assertEquals(record.getValue("svBoolean"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BOOLEAN); + assertEquals(record.getValue("svTimestamp"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_TIMESTAMP); + assertEquals(record.getValue("svBytes"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES); + assertEquals(record.getValue("svJson"), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_JSON); + assertEquals(record.getValue("mvInt"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT}); + assertEquals(record.getValue("mvLong"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_LONG}); + assertEquals(record.getValue("mvFloat"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_FLOAT}); + assertEquals(record.getValue("mvDouble"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_DOUBLE}); + assertEquals(record.getValue("mvString1"), new Object[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING}); + } + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java index 03d4244116e4..901160f69ece 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutorTest.java @@ -37,6 +37,7 @@ import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.config.table.TableConfig; @@ -123,6 +124,31 @@ public void testConvert() Assert.assertEquals(segmentMetadata.getTotalDocs(), NUM_SEGMENTS * NUM_ROWS); } + @Test + public void testDimensionErasure() + throws Exception { + MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor(new MinionConf()); + mergeRollupTaskExecutor.setMinionEventObserver(new MinionProgressObserver()); + Map configs = new HashMap<>(); + configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE"); + configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily"); + configs.put(MinionConstants.MergeTask.MERGE_TYPE_KEY, "rollup"); + configs.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, D1); + + PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs); + List conversionResults = + mergeRollupTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR); + + Assert.assertEquals(conversionResults.size(), 1); + Assert.assertEquals(conversionResults.get(0).getSegmentName(), MERGED_SEGMENT_NAME); + File mergedSegment = conversionResults.get(0).getFile(); + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(mergedSegment); + ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(D1); + Assert.assertEquals(segmentMetadata.getTotalDocs(), 1); + Assert.assertEquals(columnMetadata.getMinValue(), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT); + Assert.assertEquals(columnMetadata.getMaxValue(), FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_INT); + } + @AfterClass public void tearDown() throws Exception { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java index 611598c7e0ac..f73559728ffa 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java @@ -20,11 +20,14 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; +import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.MinionConstants.MergeTask; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; public class MergeRollupTaskUtilsTest { @@ -35,10 +38,12 @@ public void testGetLevelToConfigMap() { taskConfig.put("daily.bucketTimePeriod", "1d"); taskConfig.put("daily.bufferTimePeriod", "3d"); taskConfig.put("daily.maxNumRecordsPerSegment", "1000000"); + taskConfig.put("daily.eraseDimensionValues", "a,b"); taskConfig.put("monthly.bucketTimePeriod", "30d"); taskConfig.put("monthly.bufferTimePeriod", "10d"); taskConfig.put("monthly.roundBucketTimePeriod", "7d"); taskConfig.put("monthly.mergeType", "rollup"); + taskConfig.put("monthly.eraseDimensionValues", "a,b,c,d"); taskConfig.put("monthly.maxNumRecordsPerTask", "5000000"); taskConfig.put("monthly.maxNumParallelBuckets", "5"); @@ -47,19 +52,50 @@ public void testGetLevelToConfigMap() { Map dailyConfig = levelToConfigMap.get("daily"); assertNotNull(dailyConfig); - assertEquals(dailyConfig.size(), 3); + assertEquals(dailyConfig.size(), 4); assertEquals(dailyConfig.get(MergeTask.BUCKET_TIME_PERIOD_KEY), "1d"); assertEquals(dailyConfig.get(MergeTask.BUFFER_TIME_PERIOD_KEY), "3d"); assertEquals(dailyConfig.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY), "1000000"); + assertEquals(dailyConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY), "a,b"); Map monthlyConfig = levelToConfigMap.get("monthly"); assertNotNull(monthlyConfig); - assertEquals(monthlyConfig.size(), 6); + assertEquals(monthlyConfig.size(), 7); assertEquals(monthlyConfig.get(MergeTask.BUCKET_TIME_PERIOD_KEY), "30d"); assertEquals(monthlyConfig.get(MergeTask.BUFFER_TIME_PERIOD_KEY), "10d"); assertEquals(monthlyConfig.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY), "7d"); assertEquals(monthlyConfig.get(MergeTask.MERGE_TYPE_KEY), "rollup"); assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_RECORDS_PER_TASK_KEY), "5000000"); assertEquals(monthlyConfig.get(MergeTask.MAX_NUM_PARALLEL_BUCKETS), "5"); + assertEquals(monthlyConfig.get(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY), "a,b,c,d"); + } + + @Test + public void testEraseDimensionValuesAbsent() { + Set result1 = MergeRollupTaskUtils.getDimensionsToErase(null); + assertTrue(result1.isEmpty(), "Expected empty set when 'taskConfig' is null"); + Set result2 = MergeRollupTaskUtils.getDimensionsToErase(new HashMap<>()); + assertTrue(result2.isEmpty(), "Expected empty set when 'eraseDimensionValues' is absent"); + } + + @Test + public void testEraseSingleDimensionValue() { + Map taskConfig = new HashMap<>(); + taskConfig.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, "dimension1"); + Set result = MergeRollupTaskUtils.getDimensionsToErase(taskConfig); + assertEquals(result.size(), 1, "Expected one dimension in the result set"); + assertTrue(result.contains("dimension1"), "Expected set to contain 'dimension1'"); + } + + @Test + public void testEraseMultipleDimensionValues() { + Map taskConfig = new HashMap<>(); + taskConfig.put(MinionConstants.MergeRollupTask.ERASE_DIMENSION_VALUES_KEY, + " dimension1 , dimension2 , dimension3 "); + Set result = MergeRollupTaskUtils.getDimensionsToErase(taskConfig); + assertEquals(result.size(), 3, "Expected three dimensions in the result set with whitespace trimmed"); + assertTrue(result.contains("dimension1"), "Expected set to contain 'dimension1'"); + assertTrue(result.contains("dimension2"), "Expected set to contain 'dimension2'"); + assertTrue(result.contains("dimension3"), "Expected set to contain 'dimension3'"); } }