Skip to content

Commit

Permalink
Fill derived column with the default null value on transform function…
Browse files Browse the repository at this point in the history
… errors
  • Loading branch information
yashmayya committed Jul 11, 2024
1 parent d9969ba commit c7a116d
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ public class GroovyFunctionEvaluator implements FunctionEvaluator {
private final int _numArguments;
private final Binding _binding;
private final Script _script;
private final String _expression;

public GroovyFunctionEvaluator(String closure) {
_expression = closure;
Matcher matcher = GROOVY_FUNCTION_PATTERN.matcher(closure);
Preconditions.checkState(matcher.matches(), "Invalid transform expression: %s", closure);
String arguments = matcher.group(ARGUMENTS_GROUP_NAME);
Expand Down Expand Up @@ -110,4 +112,9 @@ public Object evaluate(Object[] values) {
}
return _script.run();
}

@Override
public String toString() {
return _expression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ public class InbuiltFunctionEvaluator implements FunctionEvaluator {
// Root of the execution tree
private final ExecutableNode _rootNode;
private final List<String> _arguments;
private final String _functionExpression;

public InbuiltFunctionEvaluator(String functionExpression) {
_functionExpression = functionExpression;
_arguments = new ArrayList<>();
_rootNode = planExecution(RequestContextUtils.getExpression(functionExpression));
}
Expand Down Expand Up @@ -118,6 +120,11 @@ public Object evaluate(Object[] values) {
return _rootNode.execute(values);
}

@Override
public String toString() {
return _functionExpression;
}

private interface ExecutableNode {

Object execute(GenericRow row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -393,7 +394,7 @@ protected boolean createColumnV1Indices(String column)
}

try {
createDerivedColumnV1Indices(column, functionEvaluator, argumentsMetadata);
createDerivedColumnV1Indices(column, functionEvaluator, argumentsMetadata, errorOnFailure);
return true;
} catch (Exception e) {
LOGGER.error("Caught exception while creating derived column: {} with transform function: {}", column,
Expand Down Expand Up @@ -556,7 +557,7 @@ private boolean isNullable(FieldSpec fieldSpec) {
* - Support forward index disabled derived column
*/
private void createDerivedColumnV1Indices(String column, FunctionEvaluator functionEvaluator,
List<ColumnMetadata> argumentsMetadata)
List<ColumnMetadata> argumentsMetadata, boolean errorOnFailure)
throws Exception {
// Initialize value readers for all arguments
int numArguments = argumentsMetadata.size();
Expand All @@ -570,6 +571,12 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct
if (isNullable(fieldSpec)) {
nullValueVectorCreator = new NullValueVectorCreator(_indexDir, fieldSpec.getName());
}

// Just log the first function evaluation error
int functionEvaluateErrorCount = 0;
Exception functionEvalError = null;
Object[] inputValuesWithError = null;

try {
// Calculate the values for the derived column
Object[] inputValues = new Object[numArguments];
Expand All @@ -580,7 +587,23 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct
for (int j = 0; j < numArguments; j++) {
inputValues[j] = valueReaders.get(j).getValue(i);
}
Object outputValue = functionEvaluator.evaluate(inputValues);

Object outputValue = null;
try {
outputValue = functionEvaluator.evaluate(inputValues);
} catch (Exception e) {
if (!errorOnFailure) {
LOGGER.debug("Encountered an exception while evaluating function {} for derived column {} with "
+ "arguments: {}", functionEvaluator, column, Arrays.toString(inputValues), e);
functionEvaluateErrorCount++;
if (functionEvalError == null) {
functionEvalError = e;
inputValuesWithError = Arrays.copyOf(inputValues, inputValues.length);
}
} else {
throw e;
}
}

if (outputValue == null) {
outputValue = fieldSpec.getDefaultNullValue();
Expand All @@ -597,6 +620,12 @@ private void createDerivedColumnV1Indices(String column, FunctionEvaluator funct
outputValues[i] = outputValue;
}

if (functionEvaluateErrorCount > 0) {
LOGGER.warn("Caught {} exceptions while evaluating derived column: {} with function: {}. The first input value "
+ "tuple that led to an error is: {}", functionEvaluateErrorCount, column, functionEvaluator,
Arrays.toString(inputValuesWithError), functionEvalError);
}

if (nullValueVectorCreator != null) {
nullValueVectorCreator.seal();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public class SegmentPreProcessorTest {
private static final String NEW_INT_SV_DIMENSION_COLUMN_NAME = "newIntSVDimension";
private static final String NEW_STRING_MV_DIMENSION_COLUMN_NAME = "newStringMVDimension";
private static final String NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME = "newRawStringSVDimension";
private static final String NEW_NULLABLE_STRING_SV_DIMENSION_COLUMN_NAME = "newNullableStringSVDimension";
private static final String NEW_NULL_RETURN_STRING_SV_DIMENSION_COLUMN_NAME = "newNullReturnStringSVDimension";
private static final String NEW_WRONG_ARG_DATE_TRUNC_DERIVED_COLUMN_NAME = "newWrongArgDateTruncDerivedColumn";
private static final String NEW_HLL_BYTE_METRIC_COLUMN_NAME = "newHLLByteMetric";
private static final String NEW_TDIGEST_BYTE_METRIC_COLUMN_NAME = "newTDigestByteMetric";

Expand Down Expand Up @@ -1107,13 +1108,17 @@ public void testV1UpdateDefaultColumns()
ImmutableList.of(
new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)"),
new TransformConfig(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME, "reverse(column3)"),
// Ensure that null values for derived columns are handled appropriately during segment reload
new TransformConfig(NEW_NULLABLE_STRING_SV_DIMENSION_COLUMN_NAME,
"json_path_string(column21, 'non-existent-path', null)")
// Ensure that null values returned by transform functions for derived columns are handled appropriately
// during segment reload
new TransformConfig(NEW_NULL_RETURN_STRING_SV_DIMENSION_COLUMN_NAME,
"json_path_string(column21, 'non-existent-path', null)"),
// Ensure that any transform function failures result in a null value if error on failure is false
new TransformConfig(NEW_WRONG_ARG_DATE_TRUNC_DERIVED_COLUMN_NAME, "dateTrunc('abcd', column1)")
));
_tableConfig.setIngestionConfig(ingestionConfig);
_indexLoadingConfig.addInvertedIndexColumns(NEW_COLUMN_INVERTED_INDEX);
_indexLoadingConfig.addNoDictionaryColumns(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME);
_indexLoadingConfig.setErrorOnColumnBuildFailure(false);
checkUpdateDefaultColumns();

// Try to use the third schema and update default value again.
Expand Down Expand Up @@ -1161,13 +1166,17 @@ public void testV3UpdateDefaultColumns()
ImmutableList.of(
new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)"),
new TransformConfig(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME, "reverse(column3)"),
// Ensure that null values for derived columns are handled appropriately during segment reload
new TransformConfig(NEW_NULLABLE_STRING_SV_DIMENSION_COLUMN_NAME,
"json_path_string(column21, 'non-existent-path', null)")
// Ensure that null values returned by transform functions for derived columns are handled appropriately
// during segment reload
new TransformConfig(NEW_NULL_RETURN_STRING_SV_DIMENSION_COLUMN_NAME,
"json_path_string(column21, 'non-existent-path', null)"),
// Ensure that any transform function failures result in a null value if error on failure is false
new TransformConfig(NEW_WRONG_ARG_DATE_TRUNC_DERIVED_COLUMN_NAME, "dateTrunc('abcd', column1)")
));
_tableConfig.setIngestionConfig(ingestionConfig);
_indexLoadingConfig.addInvertedIndexColumns(NEW_COLUMN_INVERTED_INDEX);
_indexLoadingConfig.addNoDictionaryColumns(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME);
_indexLoadingConfig.setErrorOnColumnBuildFailure(false);
checkUpdateDefaultColumns();

// Try to use the third schema and update default value again.
Expand Down Expand Up @@ -1283,9 +1292,19 @@ private void checkUpdateDefaultColumns()
assertEquals(columnMetadata.getBitsPerElement(), originalColumnMetadata.getBitsPerElement());
assertEquals(columnMetadata.getTotalNumberOfEntries(), originalColumnMetadata.getTotalNumberOfEntries());

columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_NULLABLE_STRING_SV_DIMENSION_COLUMN_NAME);
columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_NULL_RETURN_STRING_SV_DIMENSION_COLUMN_NAME);
// All the values should be the default null value
assertEquals(columnMetadata.getCardinality(), 1);
assertTrue(columnMetadata.isAutoGenerated());
assertEquals(columnMetadata.getMinValue(), "nil");
assertEquals(columnMetadata.getMaxValue(), "nil");

columnMetadata = segmentMetadata.getColumnMetadataFor(NEW_WRONG_ARG_DATE_TRUNC_DERIVED_COLUMN_NAME);
// All the values should be the default null value
assertEquals(columnMetadata.getCardinality(), 1);
assertTrue(columnMetadata.isAutoGenerated());
assertEquals(columnMetadata.getMinValue(), Long.MIN_VALUE);
assertEquals(columnMetadata.getMaxValue(), Long.MIN_VALUE);

// Check dictionary and forward index exist.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
Expand All @@ -1309,10 +1328,16 @@ private void checkUpdateDefaultColumns()
// Dictionary shouldn't be created for raw derived column
assertFalse(reader.hasIndexFor(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME, StandardIndexes.dictionary()));
assertTrue(reader.hasIndexFor(NEW_RAW_STRING_SV_DIMENSION_COLUMN_NAME, StandardIndexes.forward()));

// Null vector index should be created for derived column with null values
assertTrue(reader.hasIndexFor(NEW_NULLABLE_STRING_SV_DIMENSION_COLUMN_NAME, StandardIndexes.nullValueVector()));
assertTrue(reader.hasIndexFor(NEW_NULLABLE_STRING_SV_DIMENSION_COLUMN_NAME, StandardIndexes.forward()));
assertTrue(reader.hasIndexFor(NEW_NULLABLE_STRING_SV_DIMENSION_COLUMN_NAME, StandardIndexes.dictionary()));
assertTrue(reader.hasIndexFor(NEW_NULL_RETURN_STRING_SV_DIMENSION_COLUMN_NAME,
StandardIndexes.nullValueVector()));
assertTrue(reader.hasIndexFor(NEW_NULL_RETURN_STRING_SV_DIMENSION_COLUMN_NAME, StandardIndexes.forward()));
assertTrue(reader.hasIndexFor(NEW_NULL_RETURN_STRING_SV_DIMENSION_COLUMN_NAME, StandardIndexes.dictionary()));
assertTrue(reader.hasIndexFor(NEW_WRONG_ARG_DATE_TRUNC_DERIVED_COLUMN_NAME,
StandardIndexes.nullValueVector()));
assertTrue(reader.hasIndexFor(NEW_WRONG_ARG_DATE_TRUNC_DERIVED_COLUMN_NAME, StandardIndexes.forward()));
assertTrue(reader.hasIndexFor(NEW_WRONG_ARG_DATE_TRUNC_DERIVED_COLUMN_NAME, StandardIndexes.dictionary()));

assertTrue(reader.hasIndexFor(NEW_INT_METRIC_COLUMN_NAME, StandardIndexes.nullValueVector()));
assertTrue(reader.hasIndexFor(NEW_LONG_METRIC_COLUMN_NAME, StandardIndexes.nullValueVector()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@
},
{
"dataType": "STRING",
"name": "newNullableStringSVDimension",
"defaultNullValue": "null"
"name": "newNullReturnStringSVDimension",
"defaultNullValue": "nil"
},
{
"dataType": "LONG",
"name": "newWrongArgDateTruncDerivedColumn"
}
]
}

0 comments on commit c7a116d

Please sign in to comment.