Skip to content

Commit

Permalink
branch-3.0: [feature](table-function)support posexplode table function
Browse files Browse the repository at this point in the history
…#43221 (#45782)

Cherry-picked from #43221

Co-authored-by: zhangstar333 <[email protected]>
  • Loading branch information
github-actions[bot] and zhangstar333 authored Dec 26, 2024
1 parent 4705087 commit 0cfdf4b
Show file tree
Hide file tree
Showing 10 changed files with 659 additions and 1 deletion.
2 changes: 2 additions & 0 deletions be/src/vec/exprs/table_function/table_function_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "vec/exprs/table_function/vexplode_map.h"
#include "vec/exprs/table_function/vexplode_numbers.h"
#include "vec/exprs/table_function/vexplode_split.h"
#include "vec/exprs/table_function/vposexplode.h"
#include "vec/utils/util.hpp"

namespace doris::vectorized {
Expand Down Expand Up @@ -61,6 +62,7 @@ const std::unordered_map<std::string, std::function<std::unique_ptr<TableFunctio
{"explode_bitmap", TableFunctionCreator<VExplodeBitmapTableFunction>()},
{"explode_map", TableFunctionCreator<VExplodeMapTableFunction> {}},
{"explode_json_object", TableFunctionCreator<VExplodeJsonObjectTableFunction> {}},
{"posexplode", TableFunctionCreator<VPosExplodeTableFunction> {}},
{"explode", TableFunctionCreator<VExplodeTableFunction> {}}};

Status TableFunctionFactory::get_fn(const TFunction& t_fn, ObjectPool* pool, TableFunction** fn) {
Expand Down
155 changes: 155 additions & 0 deletions be/src/vec/exprs/table_function/vposexplode.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/exprs/table_function/vposexplode.h"

#include <glog/logging.h>

#include <ostream>
#include <vector>

#include "common/status.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

namespace doris::vectorized {

VPosExplodeTableFunction::VPosExplodeTableFunction() {
_fn_name = "posexplode";
}

Status VPosExplodeTableFunction::process_init(Block* block, RuntimeState* state) {
CHECK(_expr_context->root()->children().size() == 1)
<< "VPosExplodeTableFunction only support 1 child but has "
<< _expr_context->root()->children().size();

int value_column_idx = -1;
RETURN_IF_ERROR(_expr_context->root()->children()[0]->execute(_expr_context.get(), block,
&value_column_idx));

_collection_column =
block->get_by_position(value_column_idx).column->convert_to_full_column_if_const();

if (!extract_column_array_info(*_collection_column, _array_detail)) {
return Status::NotSupported("column type {} not supported now, only support array",
block->get_by_position(value_column_idx).column->get_name());
}
if (is_column_nullable(*_collection_column)) {
_array_data_column =
assert_cast<const ColumnArray&>(
assert_cast<const ColumnNullable&>(*_collection_column).get_nested_column())
.get_data_ptr();
} else {
_array_data_column = assert_cast<const ColumnArray&>(*_collection_column).get_data_ptr();
}
return Status::OK();
}

void VPosExplodeTableFunction::process_row(size_t row_idx) {
DCHECK(row_idx < _collection_column->size());
TableFunction::process_row(row_idx);

if (!_array_detail.array_nullmap_data || !_array_detail.array_nullmap_data[row_idx]) {
_collection_offset = (*_array_detail.offsets_ptr)[row_idx - 1];
_cur_size = (*_array_detail.offsets_ptr)[row_idx] - _collection_offset;
}
}

void VPosExplodeTableFunction::process_close() {
_collection_column = nullptr;
_array_data_column = nullptr;
_array_detail.reset();
_collection_offset = 0;
}

void VPosExplodeTableFunction::get_same_many_values(MutableColumnPtr& column, int length) {
// now we only support array column explode to struct column
size_t pos = _collection_offset + _cur_offset;
// if current is empty array row, also append a default value
if (current_empty()) {
column->insert_many_defaults(length);
return;
}
ColumnStruct* ret = nullptr;
// this _is_nullable is whole output column's nullable
if (_is_nullable) {
ret = assert_cast<ColumnStruct*>(
assert_cast<ColumnNullable*>(column.get())->get_nested_column_ptr().get());
assert_cast<ColumnUInt8*>(
assert_cast<ColumnNullable*>(column.get())->get_null_map_column_ptr().get())
->insert_many_defaults(length);
} else if (column->is_column_struct()) {
ret = assert_cast<ColumnStruct*>(column.get());
} else {
throw Exception(ErrorCode::INTERNAL_ERROR,
"only support array column explode to struct column");
}
if (!ret || ret->tuple_size() != 2) {
throw Exception(
ErrorCode::INTERNAL_ERROR,
"only support array column explode to two column, but given: ", ret->tuple_size());
}
auto& pose_column_nullable = assert_cast<ColumnNullable&>(ret->get_column(0));
pose_column_nullable.get_null_map_column().insert_many_defaults(length);
assert_cast<ColumnInt32&>(pose_column_nullable.get_nested_column())
.insert_many_vals(_cur_offset, length);
ret->get_column(1).insert_many_from(*_array_data_column, pos, length);
}

int VPosExplodeTableFunction::get_value(MutableColumnPtr& column, int max_step) {
max_step = std::min(max_step, (int)(_cur_size - _cur_offset));
size_t pos = _collection_offset + _cur_offset;
if (current_empty()) {
column->insert_default();
max_step = 1;
} else {
ColumnStruct* struct_column = nullptr;
if (_is_nullable) {
auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
struct_column =
assert_cast<ColumnStruct*>(nullable_column->get_nested_column_ptr().get());
auto* nullmap_column =
assert_cast<ColumnUInt8*>(nullable_column->get_null_map_column_ptr().get());
// here nullmap_column insert max_step many defaults as if array[row_idx] is NULL
// will be not update value, _cur_size = 0, means current_empty;
// so here could insert directly
nullmap_column->insert_many_defaults(max_step);
} else {
struct_column = assert_cast<ColumnStruct*>(column.get());
}
if (!struct_column || struct_column->tuple_size() != 2) {
throw Exception(ErrorCode::INTERNAL_ERROR,
"only support array column explode to two column, but given: ",
struct_column->tuple_size());
}
auto& pose_column_nullable = assert_cast<ColumnNullable&>(struct_column->get_column(0));
pose_column_nullable.get_null_map_column().insert_many_defaults(max_step);
assert_cast<ColumnInt32&>(pose_column_nullable.get_nested_column())
.insert_range_of_integer(_cur_offset, _cur_offset + max_step);
struct_column->get_column(1).insert_range_from(*_array_data_column, pos, max_step);
}
forward(max_step);
return max_step;
}
} // namespace doris::vectorized
50 changes: 50 additions & 0 deletions be/src/vec/exprs/table_function/vposexplode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "common/status.h"
#include "vec/columns/column_map.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_array.h"
#include "vec/exprs/table_function/table_function.h"
#include "vec/functions/array/function_array_utils.h"

namespace doris::vectorized {

class VPosExplodeTableFunction : public TableFunction {
ENABLE_FACTORY_CREATOR(VPosExplodeTableFunction);

public:
VPosExplodeTableFunction();

~VPosExplodeTableFunction() override = default;

Status process_init(Block* block, RuntimeState* state) override;
void process_row(size_t row_idx) override;
void process_close() override;
void get_same_many_values(MutableColumnPtr& column, int length) override;
int get_value(MutableColumnPtr& column, int max_step) override;

private:
ColumnPtr _collection_column;
ColumnPtr _array_data_column;
ColumnArrayExecutionData _array_detail;
size_t _collection_offset; // start offset of array[row_idx]
};

} // namespace doris::vectorized
29 changes: 29 additions & 0 deletions be/src/vec/functions/function_fake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <memory>
#include <ostream>
#include <string>

Expand Down Expand Up @@ -83,6 +84,25 @@ struct FunctionExplodeMap {
static std::string get_error_msg() { return "Fake function do not support execute"; }
};

template <bool AlwaysNullable = false>
struct FunctionPoseExplode {
static DataTypePtr get_return_type_impl(const DataTypes& arguments) {
DCHECK(is_array(arguments[0])) << arguments[0]->get_name() << " not supported";
DataTypes fieldTypes(2);
fieldTypes[0] = make_nullable(std::make_shared<DataTypeInt32>());
fieldTypes[1] =
check_and_get_data_type<DataTypeArray>(arguments[0].get())->get_nested_type();
auto struct_type = std::make_shared<vectorized::DataTypeStruct>(fieldTypes);
if constexpr (AlwaysNullable) {
return make_nullable(struct_type);
} else {
return arguments[0]->is_nullable() ? make_nullable(struct_type) : struct_type;
}
}
static DataTypes get_variadic_argument_types() { return {}; }
static std::string get_error_msg() { return "Fake function do not support execute"; }
};

// explode json-object: expands json-object to struct with a pair of key and value in column string
struct FunctionExplodeJsonObject {
static DataTypePtr get_return_type_impl(const DataTypes& arguments) {
Expand Down Expand Up @@ -138,6 +158,12 @@ void register_table_function_expand_outer_default(SimpleFunctionFactory& factory
COMBINATOR_SUFFIX_OUTER);
};

template <typename FunctionImpl>
void register_table_function_with_impl(SimpleFunctionFactory& factory, const std::string& name,
const std::string& suffix = "") {
factory.register_function<FunctionFake<FunctionImpl>>(name + suffix);
};

void register_function_fake(SimpleFunctionFactory& factory) {
register_function<FunctionEsquery>(factory, "esquery");

Expand All @@ -158,6 +184,9 @@ void register_function_fake(SimpleFunctionFactory& factory) {
register_table_function_expand_outer_default<DataTypeFloat64, false>(
factory, "explode_json_array_double");
register_table_function_expand_outer_default<DataTypeInt64, false>(factory, "explode_bitmap");
register_table_function_with_impl<FunctionPoseExplode<false>>(factory, "posexplode");
register_table_function_with_impl<FunctionPoseExplode<true>>(factory, "posexplode",
COMBINATOR_SUFFIX_OUTER);
register_table_function_expand_outer_default<DataTypeObject, false>(factory,
"explode_variant_array");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.doris.nereids.trees.expressions.functions.generator.ExplodeSplit;
import org.apache.doris.nereids.trees.expressions.functions.generator.ExplodeSplitOuter;
import org.apache.doris.nereids.trees.expressions.functions.generator.ExplodeVariantArray;
import org.apache.doris.nereids.trees.expressions.functions.generator.PosExplode;
import org.apache.doris.nereids.trees.expressions.functions.generator.PosExplodeOuter;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -71,7 +73,9 @@ public class BuiltinTableGeneratingFunctions implements FunctionHelper {
tableGenerating(ExplodeJsonArrayStringOuter.class, "explode_json_array_string_outer"),
tableGenerating(ExplodeJsonArrayJson.class, "explode_json_array_json"),
tableGenerating(ExplodeJsonArrayJsonOuter.class, "explode_json_array_json_outer"),
tableGenerating(ExplodeVariantArray.class, "explode_variant_array")
tableGenerating(ExplodeVariantArray.class, "explode_variant_array"),
tableGenerating(PosExplode.class, "posexplode"),
tableGenerating(PosExplodeOuter.class, "posexplode_outer")
);

public static final BuiltinTableGeneratingFunctions INSTANCE = new BuiltinTableGeneratingFunctions();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.expressions.functions.generator;

import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.PropagateNullable;
import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.ArrayType;
import org.apache.doris.nereids.types.IntegerType;
import org.apache.doris.nereids.types.StructField;
import org.apache.doris.nereids.types.StructType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import java.util.List;

/**
* PosExplode(array('a','b','c')) generate two columns and three rows with:
* pose column: 0, 1, 2
* value column: 'a', 'b', 'c'
*/
public class PosExplode extends TableGeneratingFunction implements UnaryExpression, PropagateNullable {

/**
* constructor with 1 argument.
*/
public PosExplode(Expression arg) {
super("posexplode", arg);
}

/**
* withChildren.
*/
@Override
public PosExplode withChildren(List<Expression> children) {
Preconditions.checkArgument(children.size() == 1);
return new PosExplode(children.get(0));
}

@Override
public void checkLegalityBeforeTypeCoercion() {
if (!(child().getDataType() instanceof ArrayType)) {
throw new AnalysisException("only support array type for posexplode function but got "
+ child().getDataType());
}
}

@Override
public List<FunctionSignature> getSignatures() {
return ImmutableList.of(
FunctionSignature.ret(new StructType(ImmutableList.of(
new StructField("pos", IntegerType.INSTANCE, true, ""),
new StructField("col", ((ArrayType) child().getDataType()).getItemType(), true, ""))))
.args(child().getDataType()));
}

@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitPosExplode(this, context);
}
}
Loading

0 comments on commit 0cfdf4b

Please sign in to comment.