From c2543d4e51bcf800ea093f143f7b8f48a479a418 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 26 Nov 2024 14:53:01 -0800 Subject: [PATCH 01/11] Add aggregation utils Signed-off-by: Nghia Truong --- src/main/cpp/CMakeLists.txt | 2 + src/main/cpp/src/AggregationUtilsJni.cpp | 38 ++ src/main/cpp/src/aggregation_utils.cu | 427 ++++++++++++++++++ src/main/cpp/src/aggregation_utils.hpp | 29 ++ .../spark/rapids/jni/AggregationUtils.java | 43 ++ 5 files changed, 539 insertions(+) create mode 100644 src/main/cpp/src/AggregationUtilsJni.cpp create mode 100644 src/main/cpp/src/aggregation_utils.cu create mode 100644 src/main/cpp/src/aggregation_utils.hpp create mode 100644 src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java diff --git a/src/main/cpp/CMakeLists.txt b/src/main/cpp/CMakeLists.txt index 6447e2707..44a5ffc11 100644 --- a/src/main/cpp/CMakeLists.txt +++ b/src/main/cpp/CMakeLists.txt @@ -187,6 +187,7 @@ set(CUDFJNI_INCLUDE_DIRS add_library( spark_rapids_jni SHARED + src/AggregationUtilsJni.cpp src/BloomFilterJni.cpp src/CaseWhenJni.cpp src/CastStringJni.cpp @@ -204,6 +205,7 @@ add_library( src/SparkResourceAdaptorJni.cpp src/SubStringIndexJni.cpp src/ZOrderJni.cpp + src/aggregation_utils.cu src/bloom_filter.cu src/case_when.cu src/cast_decimal_to_string.cu diff --git a/src/main/cpp/src/AggregationUtilsJni.cpp b/src/main/cpp/src/AggregationUtilsJni.cpp new file mode 100644 index 000000000..c1999faf1 --- /dev/null +++ b/src/main/cpp/src/AggregationUtilsJni.cpp @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "aggregation_utils.hpp" +#include "cudf_jni_apis.hpp" + +extern "C" { + +JNIEXPORT jlongArray JNICALL +Java_com_nvidia_spark_rapids_jni_JSONUtils_createNativeTestHostUDF(JNIEnv* env, jclass) +{ + try { + cudf::jni::auto_set_device(env); + auto udf_ptr = spark_rapids_jni::create_test_groupby_host_udf(); + // The first value is pointer to host_udf instance, + // and the second value is its hash code. + auto out_handles = cudf::jni::native_jlongArray(env, 2); + out_handles[1] = static_cast(udf_ptr->do_hash()); + out_handles[0] = reinterpret_cast(udf_ptr.release()); + return out_handles.get_jArray(); + } + CATCH_STD(env, 0); +} + +} // extern "C" diff --git a/src/main/cpp/src/aggregation_utils.cu b/src/main/cpp/src/aggregation_utils.cu new file mode 100644 index 000000000..0a6e2d802 --- /dev/null +++ b/src/main/cpp/src/aggregation_utils.cu @@ -0,0 +1,427 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "aggregation_utils.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace spark_rapids_jni { + +namespace { +/** + * @brief A host-based UDF implementation. + * + * The aggregations perform the following computation: + * - For reduction: compute `sum(value^2, for value in group)` (this is sum of squared). + * - For segmented reduction: compute `segment_size * sum(value^2, for value in group)`. + * - For groupby: compute `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`. + * + * In addition, for segmented reduction, if null_policy is set to `INCLUDE`, the null values are + * replaced with an initial value if it is provided. + */ +template +struct test_udf_simple_type : cudf::host_udf_base { + static_assert(std::is_same_v || + std::is_same_v || + std::is_same_v); + + test_udf_simple_type() = default; + + [[nodiscard]] input_data_attributes get_required_data() const override + { + if constexpr (std::is_same_v || + std::is_same_v) { + // Empty set, which means we need everything. 
+ return {}; + } else { + return {groupby_data_attribute::GROUPED_VALUES, + groupby_data_attribute::GROUP_OFFSETS, + groupby_data_attribute::GROUP_LABELS, + cudf::make_max_aggregation(), + cudf::make_sum_aggregation()}; + } + } + + [[nodiscard]] output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + if constexpr (std::is_same_v) { + auto const& values = + std::get(input.at(reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(reduction_data_attribute::OUTPUT_DTYPE)); + return cudf::double_type_dispatcher( + values.type(), output_dtype, reduce_fn{this}, input, stream, mr); + } else if constexpr (std::is_same_v) { + auto const& values = + std::get(input.at(segmented_reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(segmented_reduction_data_attribute::OUTPUT_DTYPE)); + return cudf::double_type_dispatcher( + values.type(), output_dtype, segmented_reduce_fn{this}, input, stream, mr); + } else { + auto const& values = + std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); + return cudf::type_dispatcher(values.type(), groupby_fn{this}, input, stream, mr); + } + } + + [[nodiscard]] output_type get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + if constexpr (std::is_same_v) { + CUDF_EXPECTS(output_dtype.has_value(), + "Data type for the reduction result must be specified."); + return cudf::make_default_constructed_scalar(output_dtype.value(), stream, mr); + } else if constexpr (std::is_same_v) { + CUDF_EXPECTS(output_dtype.has_value(), + "Data type for the reduction result must be specified."); + return cudf::make_empty_column(output_dtype.value()); + } else { + return cudf::make_empty_column( + cudf::data_type{cudf::type_to_id()}); + } + } + + [[nodiscard]] bool 
is_equal(host_udf_base const& other) const override + { + // Just check if the other object is also instance of the same derived class. + return dynamic_cast(&other) != nullptr; + } + + [[nodiscard]] std::size_t do_hash() const override + { + return std::hash{}({"test_udf_simple_type"}); + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(); + } + + // For faster compile times, we only support a few input/output types. + template + static constexpr bool is_valid_input_t() + { + return std::is_same_v || std::is_same_v; + } + + // For faster compile times, we only support a few input/output types. + template + static constexpr bool is_valid_output_t() + { + return std::is_same_v; + } + + struct reduce_fn { + // Store pointer to the parent class so we can call its functions. + test_udf_simple_type const* parent; + + template () || !is_valid_output_t())> + output_type operator()(Args...) const + { + CUDF_FAIL("Unsupported input type."); + } + + template () && is_valid_output_t())> + output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + auto const& values = + std::get(input.at(reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(reduction_data_attribute::OUTPUT_DTYPE)); + auto const input_init_value = + std::get>>( + input.at(reduction_data_attribute::INIT_VALUE)); + + if (values.size() == 0) { return parent->get_empty_output(output_dtype, stream, mr); } + + auto const init_value = [&]() -> InputType { + if (input_init_value.has_value() && input_init_value.value().get().is_valid(stream)) { + auto const numeric_init_scalar = + dynamic_cast const*>(&input_init_value.value().get()); + CUDF_EXPECTS(numeric_init_scalar != nullptr, "Invalid init scalar for reduction."); + return numeric_init_scalar->value(stream); + } + return InputType{0}; + }(); + + auto const values_dv_ptr = cudf::column_device_view::create(values, 
stream); + auto const result = + thrust::transform_reduce(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(values.size()), + transform_fn{*values_dv_ptr}, + static_cast(init_value), + thrust::plus<>{}); + + auto output = cudf::make_numeric_scalar(output_dtype, stream, mr); + static_cast*>(output.get())->set_value(result, stream); + return output; + } + + template + struct transform_fn { + cudf::column_device_view values; + OutputType __device__ operator()(cudf::size_type idx) const + { + if (values.is_null(idx)) { return OutputType{0}; } + auto const val = static_cast(values.element(idx)); + return val * val; + } + }; + }; + + struct segmented_reduce_fn { + // Store pointer to the parent class so we can call its functions. + test_udf_simple_type const* parent; + + template () || !is_valid_output_t())> + output_type operator()(Args...) const + { + CUDF_FAIL("Unsupported input type."); + } + + template () && is_valid_output_t())> + output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + auto const& values = + std::get(input.at(segmented_reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(segmented_reduction_data_attribute::OUTPUT_DTYPE)); + auto const offsets = std::get>( + input.at(segmented_reduction_data_attribute::OFFSETS)); + CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets."); + auto const num_segments = static_cast(offsets.size()) - 1; + + if (values.size() == 0) { + if (num_segments <= 0) { + return parent->get_empty_output(output_dtype, stream, mr); + } else { + return cudf::make_numeric_column( + output_dtype, num_segments, cudf::mask_state::ALL_NULL, stream, mr); + } + } + + auto const input_init_value = + std::get>>( + input.at(segmented_reduction_data_attribute::INIT_VALUE)); + + auto const init_value = [&]() -> InputType { + if (input_init_value.has_value() && 
input_init_value.value().get().is_valid(stream)) { + auto const numeric_init_scalar = + dynamic_cast const*>(&input_init_value.value().get()); + CUDF_EXPECTS(numeric_init_scalar != nullptr, "Invalid init scalar for reduction."); + return numeric_init_scalar->value(stream); + } + return InputType{0}; + }(); + + auto const null_handling = + std::get(input.at(segmented_reduction_data_attribute::NULL_POLICY)); + + auto const values_dv_ptr = cudf::column_device_view::create(values, stream); + auto output = cudf::make_numeric_column( + output_dtype, num_segments, cudf::mask_state::UNALLOCATED, stream); + rmm::device_uvector validity(num_segments, stream); + + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_segments), + thrust::make_zip_iterator(output->mutable_view().begin(), validity.begin()), + transform_fn{ + *values_dv_ptr, offsets, static_cast(init_value), null_handling}); + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity<>{}, stream, mr); + if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } + return output; + } + + template + struct transform_fn { + cudf::column_device_view values; + cudf::device_span offsets; + OutputType init_value; + cudf::null_policy null_handling; + + thrust::tuple __device__ operator()(cudf::size_type idx) const + { + auto const start = offsets[idx]; + auto const end = offsets[idx + 1]; + if (start == end) { return {OutputType{0}, false}; } + + auto sum = init_value; + for (auto i = start; i < end; ++i) { + if (values.is_null(i)) { + if (null_handling == cudf::null_policy::INCLUDE) { sum += init_value * init_value; } + continue; + } + auto const val = static_cast(values.element(i)); + sum += val * val; + } + auto const segment_size = end - start; + return {static_cast(segment_size) * sum, true}; + } + }; + }; + + struct groupby_fn { + // Store pointer to the parent class so we can 
call its functions. + test_udf_simple_type const* parent; + using OutputType = double; + template + using MaxType = cudf::detail::target_type_t; + template + using SumType = cudf::detail::target_type_t; + + template ())> + output_type operator()(Args...) const + { + CUDF_FAIL("Unsupported input type."); + } + + template ())> + output_type operator()(host_udf_input const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const + { + auto const& values = + std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); + if (values.size() == 0) { return parent->get_empty_output(std::nullopt, stream, mr); } + + auto const offsets = std::get>( + input.at(groupby_data_attribute::GROUP_OFFSETS)); + CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets."); + auto const num_groups = static_cast(offsets.size()) - 1; + auto const group_indices = std::get>( + input.at(groupby_data_attribute::GROUP_LABELS)); + auto const group_max = std::get( + input.at(cudf::make_max_aggregation())); + auto const group_sum = std::get( + input.at(cudf::make_sum_aggregation())); + + auto const values_dv_ptr = cudf::column_device_view::create(values, stream); + auto output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, + num_groups, + cudf::mask_state::UNALLOCATED, + stream); + rmm::device_uvector validity(num_groups, stream); + + thrust::transform( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_groups), + thrust::make_zip_iterator(output->mutable_view().begin(), validity.begin()), + transform_fn{*values_dv_ptr, + offsets, + group_indices, + group_max.begin>(), + group_sum.begin>()}); + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity<>{}, stream, mr); + if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } + return output; + } + + template + struct transform_fn { + cudf::column_device_view values; + cudf::device_span 
offsets; + cudf::device_span group_indices; + MaxType const* group_max; + SumType const* group_sum; + + thrust::tuple __device__ operator()(cudf::size_type idx) const + { + auto const start = offsets[idx]; + auto const end = offsets[idx + 1]; + if (start == end) { return {OutputType{0}, false}; } + + auto sum_sqr = OutputType{0}; + bool has_valid{false}; + for (auto i = start; i < end; ++i) { + if (values.is_null(i)) { continue; } + has_valid = true; + auto const val = static_cast(values.element(i)); + sum_sqr += val * val; + } + + if (!has_valid) { return {OutputType{0}, false}; } + return {static_cast(group_indices[start] + 1) * sum_sqr - + static_cast(group_max[idx]) * static_cast(group_sum[idx]), + true}; + } + }; + }; +}; + +} // namespace + +std::unique_ptr create_test_reduction_host_udf() +{ + // This only use the groupby code. + // Reduction and segmented reduction code is unused. + return std::make_unique>(); +} + +std::unique_ptr create_test_segmented_reduction_host_udf() +{ + // This only use the groupby code. + // Reduction and segmented reduction code is unused. + return std::make_unique>(); +} + +std::unique_ptr create_test_groupby_host_udf() +{ + // This only use the groupby code. + // Reduction and segmented reduction code is unused. + return std::make_unique>(); +} + +} // namespace spark_rapids_jni diff --git a/src/main/cpp/src/aggregation_utils.hpp b/src/main/cpp/src/aggregation_utils.hpp new file mode 100644 index 000000000..940fe4e63 --- /dev/null +++ b/src/main/cpp/src/aggregation_utils.hpp @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace spark_rapids_jni { + +std::unique_ptr create_test_reduction_host_udf(); + +std::unique_ptr create_test_segmented_reduction_host_udf(); + +std::unique_ptr create_test_groupby_host_udf(); + +} // namespace spark_rapids_jni diff --git a/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java b/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java new file mode 100644 index 000000000..4a3fb655c --- /dev/null +++ b/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.jni; + +import ai.rapids.cudf.NativeDepsLoader; + +public class AggregationUtils { + static { + NativeDepsLoader.loadNativeDeps(); + } + + /** + * Create a test host UDF for testing purposes. + * + * This will return two values: the first is the pointer to the host UDF, and the second is the + * hash code of the host UDF. 
+ * + * To create a host UDF aggregation, do this: + * ``` + * long[] udfAndHash = AggregationUtils.createTestHostUDF(); + * new ai.rapids.cudf.HostUDFAggregation(udfAndHash[0], udfAndHash[1]); + * ``` + */ + public static long[] createTestHostUDF() { + return createNativeTestHostUDF(); + } + + private static native long[] createNativeTestHostUDF(); +} From 4d6be69384520ec5975fee0c663c291f5c0d1f4b Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 26 Nov 2024 20:29:00 -0800 Subject: [PATCH 02/11] Change Java function Signed-off-by: Nghia Truong --- src/main/cpp/CMakeLists.txt | 2 +- src/main/cpp/src/AggregationUtilsJni.cpp | 14 +++++++++++--- ...aggregation_utils.cu => test_host_udf_agg.cu} | 0 .../spark/rapids/jni/AggregationUtils.java | 16 +++++++++++++--- 4 files changed, 25 insertions(+), 7 deletions(-) rename src/main/cpp/src/{aggregation_utils.cu => test_host_udf_agg.cu} (100%) diff --git a/src/main/cpp/CMakeLists.txt b/src/main/cpp/CMakeLists.txt index 44a5ffc11..c3548cb12 100644 --- a/src/main/cpp/CMakeLists.txt +++ b/src/main/cpp/CMakeLists.txt @@ -205,7 +205,6 @@ add_library( src/SparkResourceAdaptorJni.cpp src/SubStringIndexJni.cpp src/ZOrderJni.cpp - src/aggregation_utils.cu src/bloom_filter.cu src/case_when.cu src/cast_decimal_to_string.cu @@ -225,6 +224,7 @@ add_library( src/regex_rewrite_utils.cu src/row_conversion.cu src/substring_index.cu + src/test_host_udf_agg.cu src/timezones.cu src/utilities.cu src/xxhash64.cu diff --git a/src/main/cpp/src/AggregationUtilsJni.cpp b/src/main/cpp/src/AggregationUtilsJni.cpp index c1999faf1..ebbd03f2a 100644 --- a/src/main/cpp/src/AggregationUtilsJni.cpp +++ b/src/main/cpp/src/AggregationUtilsJni.cpp @@ -19,12 +19,20 @@ extern "C" { -JNIEXPORT jlongArray JNICALL -Java_com_nvidia_spark_rapids_jni_JSONUtils_createNativeTestHostUDF(JNIEnv* env, jclass) +JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_createNativeTestHostUDF( + JNIEnv* env, jclass, jint agg_type) { try { 
cudf::jni::auto_set_device(env); - auto udf_ptr = spark_rapids_jni::create_test_groupby_host_udf(); + auto udf_ptr = [&] { + // The value of agg_type must be in sync with `AggregationUtils.java#AggregationType`. + switch (agg_type) { + case 0: return spark_rapids_jni::create_test_reduction_host_udf(); + case 1: return spark_rapids_jni::create_test_segmented_reduction_host_udf(); + case 2: return spark_rapids_jni::create_test_groupby_host_udf(); + default:; + } + }(); // The first value is pointer to host_udf instance, // and the second value is its hash code. auto out_handles = cudf::jni::native_jlongArray(env, 2); diff --git a/src/main/cpp/src/aggregation_utils.cu b/src/main/cpp/src/test_host_udf_agg.cu similarity index 100% rename from src/main/cpp/src/aggregation_utils.cu rename to src/main/cpp/src/test_host_udf_agg.cu diff --git a/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java b/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java index 4a3fb655c..d1048edd3 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java @@ -23,6 +23,16 @@ public class AggregationUtils { NativeDepsLoader.loadNativeDeps(); } + enum AggregationType { + Reduction(0), + SegmentedReduction(1), + GroupByAggregation(2); + + final int nativeId; + + AggregationType(int nativeId) {this.nativeId = nativeId;} + } + /** * Create a test host UDF for testing purposes. 
* @@ -35,9 +45,9 @@ public class AggregationUtils { * new ai.rapids.cudf.HostUDFAggregation(udfAndHash[0], udfAndHash[1]); * ``` */ - public static long[] createTestHostUDF() { - return createNativeTestHostUDF(); + public static long[] createTestHostUDF(AggregationType type) { + return createNativeTestHostUDF(type.nativeId); } - private static native long[] createNativeTestHostUDF(); + private static native long[] createNativeTestHostUDF(int type); } From aad7900d43ae5762dbc4401fb1bad1a38d2d58a1 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 26 Nov 2024 21:55:02 -0800 Subject: [PATCH 03/11] Change interface Signed-off-by: Nghia Truong --- src/main/cpp/src/AggregationUtilsJni.cpp | 14 +++++--------- src/main/cpp/src/test_host_udf_agg.cu | 2 +- .../nvidia/spark/rapids/jni/AggregationUtils.java | 10 +++++----- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/main/cpp/src/AggregationUtilsJni.cpp b/src/main/cpp/src/AggregationUtilsJni.cpp index ebbd03f2a..2e98b9900 100644 --- a/src/main/cpp/src/AggregationUtilsJni.cpp +++ b/src/main/cpp/src/AggregationUtilsJni.cpp @@ -19,7 +19,7 @@ extern "C" { -JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_createNativeTestHostUDF( +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_AggregationUtils_createNativeTestHostUDF( JNIEnv* env, jclass, jint agg_type) { try { @@ -29,16 +29,12 @@ JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_createNa switch (agg_type) { case 0: return spark_rapids_jni::create_test_reduction_host_udf(); case 1: return spark_rapids_jni::create_test_segmented_reduction_host_udf(); - case 2: return spark_rapids_jni::create_test_groupby_host_udf(); - default:; + default: return spark_rapids_jni::create_test_groupby_host_udf(); } }(); - // The first value is pointer to host_udf instance, - // and the second value is its hash code. 
- auto out_handles = cudf::jni::native_jlongArray(env, 2); - out_handles[1] = static_cast(udf_ptr->do_hash()); - out_handles[0] = reinterpret_cast(udf_ptr.release()); - return out_handles.get_jArray(); + CUDF_EXPECTS(udf_ptr != nullptr, "Invalid host udf instance."); + + return reinterpret_cast(udf_ptr.release()); } CATCH_STD(env, 0); } diff --git a/src/main/cpp/src/test_host_udf_agg.cu b/src/main/cpp/src/test_host_udf_agg.cu index 0a6e2d802..d501c2aa7 100644 --- a/src/main/cpp/src/test_host_udf_agg.cu +++ b/src/main/cpp/src/test_host_udf_agg.cu @@ -141,7 +141,7 @@ struct test_udf_simple_type : cudf::host_udf_base { template static constexpr bool is_valid_output_t() { - return std::is_same_v; + return std::is_same_v || std::is_same_v; } struct reduce_fn { diff --git a/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java b/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java index d1048edd3..9fd352b2f 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java @@ -23,7 +23,7 @@ public class AggregationUtils { NativeDepsLoader.loadNativeDeps(); } - enum AggregationType { + public enum AggregationType { Reduction(0), SegmentedReduction(1), GroupByAggregation(2); @@ -35,19 +35,19 @@ enum AggregationType { /** * Create a test host UDF for testing purposes. - * + *

* This will return two values: the first is the pointer to the host UDF, and the second is the * hash code of the host UDF. - * + *

* To create a host UDF aggregation, do this: * ``` * long[] udfAndHash = AggregationUtils.createTestHostUDF(); * new ai.rapids.cudf.HostUDFAggregation(udfAndHash[0], udfAndHash[1]); * ``` */ - public static long[] createTestHostUDF(AggregationType type) { + public static long createTestHostUDF(AggregationType type) { return createNativeTestHostUDF(type.nativeId); } - private static native long[] createNativeTestHostUDF(int type); + private static native long createNativeTestHostUDF(int type); } From a63179e21a47c431a46426e12f5705f5f2f5c76e Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 26 Nov 2024 22:28:10 -0800 Subject: [PATCH 04/11] Add debug code Signed-off-by: Nghia Truong --- src/main/cpp/src/test_host_udf_agg.cu | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/main/cpp/src/test_host_udf_agg.cu b/src/main/cpp/src/test_host_udf_agg.cu index d501c2aa7..1ba05f47b 100644 --- a/src/main/cpp/src/test_host_udf_agg.cu +++ b/src/main/cpp/src/test_host_udf_agg.cu @@ -193,6 +193,21 @@ struct test_udf_simple_type : cudf::host_udf_base { static_cast(init_value), thrust::plus<>{}); + if constexpr (std::is_same_v && std::is_same_v) { + std::vector in(values.size()); + CUDF_CUDA_TRY(cudaMemcpyAsync(in.data(), + values.begin(), + values.size() * sizeof(double), + cudaMemcpyDefault, + stream.value())); + stream.synchronize(); + std::cout << "input: " << std::endl; + for (auto x : in) { + std::cout << x << ", "; + } + std::cout << "\nresult: " << result << std::endl; + } + auto output = cudf::make_numeric_scalar(output_dtype, stream, mr); static_cast*>(output.get())->set_value(result, stream); return output; From 19e7507013d78c016d1bd832d5ca0fa38d31c337 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 20:06:43 -0800 Subject: [PATCH 05/11] Change Java class Signed-off-by: Nghia Truong --- ...AggregationUtils.java => TestHostUDF.java} | 23 ++++++++++++++++--- thirdparty/cudf | 2 +- 2 files changed, 21 insertions(+), 4 deletions(-) 
rename src/main/java/com/nvidia/spark/rapids/jni/{AggregationUtils.java => TestHostUDF.java} (70%) diff --git a/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java b/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java similarity index 70% rename from src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java rename to src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java index 9fd352b2f..6b729ea0c 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/AggregationUtils.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java @@ -17,12 +17,29 @@ package com.nvidia.spark.rapids.jni; import ai.rapids.cudf.NativeDepsLoader; +import ai.rapids.cudf.Aggregation; -public class AggregationUtils { +// A new host UDF implementation must extend Aggregation.HostUDFWrapper, +// and override the hashCode and equals methods. +public class TestHostUDF extends Aggregation.HostUDFWrapper { static { NativeDepsLoader.loadNativeDeps(); } + TestHostUDF(long udfNativeHandle) { + super(udfNativeHandle); + } + + @Override + public int hashCode() { + return 12345; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestHostUDF; + } + public enum AggregationType { Reduction(0), SegmentedReduction(1), @@ -45,8 +62,8 @@ public enum AggregationType { * new ai.rapids.cudf.HostUDFAggregation(udfAndHash[0], udfAndHash[1]); * ``` */ - public static long createTestHostUDF(AggregationType type) { - return createNativeTestHostUDF(type.nativeId); + public static TestHostUDF createTestHostUDF(AggregationType type) { + return new TestHostUDF(createNativeTestHostUDF(type.nativeId)); } private static native long createNativeTestHostUDF(int type); diff --git a/thirdparty/cudf b/thirdparty/cudf index fa62ff45e..05084a401 160000 --- a/thirdparty/cudf +++ b/thirdparty/cudf @@ -1 +1 @@ -Subproject commit fa62ff45eddd8256f0a3e8cebf077970dd70cb67 +Subproject commit 05084a4019e32d8b9a323e7e77f7f5b7f231ffee From 
2d81d60989fed958fa11df0c84dea9b0283318c7 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 20:40:47 -0800 Subject: [PATCH 06/11] Fix error --- src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java | 4 ++-- thirdparty/cudf | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java b/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java index 6b729ea0c..9dce61187 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java @@ -17,11 +17,11 @@ package com.nvidia.spark.rapids.jni; import ai.rapids.cudf.NativeDepsLoader; -import ai.rapids.cudf.Aggregation; +import ai.rapids.cudf.HostUDFWrapper; // A new host UDF implementation must extend Aggregation.HostUDFWrapper, // and override the hashCode and equals methods. -public class TestHostUDF extends Aggregation.HostUDFWrapper { +public class TestHostUDF extends HostUDFWrapper { static { NativeDepsLoader.loadNativeDeps(); } diff --git a/thirdparty/cudf b/thirdparty/cudf index 05084a401..9e0f99641 160000 --- a/thirdparty/cudf +++ b/thirdparty/cudf @@ -1 +1 @@ -Subproject commit 05084a4019e32d8b9a323e7e77f7f5b7f231ffee +Subproject commit 9e0f996415ba9a8c8afc3781b6cfe7f068e23aa0 From 456d818c3d8295ffd3b99c5a210d4f26343c1c6d Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 21:08:50 -0800 Subject: [PATCH 07/11] Update cudf Signed-off-by: Nghia Truong --- thirdparty/cudf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thirdparty/cudf b/thirdparty/cudf index 9e0f99641..8bc63437c 160000 --- a/thirdparty/cudf +++ b/thirdparty/cudf @@ -1 +1 @@ -Subproject commit 9e0f996415ba9a8c8afc3781b6cfe7f068e23aa0 +Subproject commit 8bc63437cfe0725fe4ea56c232d74764cb6d2dd6 From 4ea0c6ca71becbe582d90a0e37f7d09e1fa7ec31 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Wed, 18 Dec 2024 13:38:52 -0800 Subject: [PATCH 08/11] Update cudf 
Signed-off-by: Nghia Truong --- thirdparty/cudf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thirdparty/cudf b/thirdparty/cudf index 8bc63437c..e51fc98e8 160000 --- a/thirdparty/cudf +++ b/thirdparty/cudf @@ -1 +1 @@ -Subproject commit 8bc63437cfe0725fe4ea56c232d74764cb6d2dd6 +Subproject commit e51fc98e852759afaa8b6ae0483dbe6db2d00dc6 From 605499c69990902efebdc443405e81b24d14ecc1 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Fri, 20 Dec 2024 11:30:28 -0800 Subject: [PATCH 09/11] Update cudf Signed-off-by: Nghia Truong --- thirdparty/cudf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thirdparty/cudf b/thirdparty/cudf index e51fc98e8..5982f3ef1 160000 --- a/thirdparty/cudf +++ b/thirdparty/cudf @@ -1 +1 @@ -Subproject commit e51fc98e852759afaa8b6ae0483dbe6db2d00dc6 +Subproject commit 5982f3ef1b4124d941ef1616ea1411a983beac8d From a8b730b0e36d1cc82466a94d3daf2a416851220c Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Fri, 20 Dec 2024 11:48:56 -0800 Subject: [PATCH 10/11] Adapt to changes in cudf Signed-off-by: Nghia Truong --- src/main/cpp/src/aggregation_utils.hpp | 4 +- src/main/cpp/src/test_host_udf_agg.cu | 439 ++++++++++-------- .../nvidia/spark/rapids/jni/TestHostUDF.java | 11 +- 3 files changed, 245 insertions(+), 209 deletions(-) diff --git a/src/main/cpp/src/aggregation_utils.hpp b/src/main/cpp/src/aggregation_utils.hpp index 940fe4e63..f44c19078 100644 --- a/src/main/cpp/src/aggregation_utils.hpp +++ b/src/main/cpp/src/aggregation_utils.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -16,7 +16,7 @@ #pragma once -#include +#include namespace spark_rapids_jni { diff --git a/src/main/cpp/src/test_host_udf_agg.cu b/src/main/cpp/src/test_host_udf_agg.cu index 1ba05f47b..a0151192e 100644 --- a/src/main/cpp/src/test_host_udf_agg.cu +++ b/src/main/cpp/src/test_host_udf_agg.cu @@ -16,8 +16,10 @@ #include "aggregation_utils.hpp" +#include #include #include +#include #include #include #include @@ -36,144 +38,88 @@ namespace spark_rapids_jni { namespace { + /** - * @brief A host-based UDF implementation. - * - * The aggregations perform the following computation: - * - For reduction: compute `sum(value^2, for value in group)` (this is sum of squared). - * - For segmented reduction: compute `segment_size * sum(value^2, for value in group)`. - * - For groupby: compute `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`. + * @brief A host-based UDF implementation for reduction. * - * In addition, for segmented reduction, if null_policy is set to `INCLUDE`, the null values are - * replaced with an initial value if it is provided. + * The aggregation computes `sum(value^2, for value in group)` (this is sum of squared). */ -template -struct test_udf_simple_type : cudf::host_udf_base { - static_assert(std::is_same_v || - std::is_same_v || - std::is_same_v); - - test_udf_simple_type() = default; - - [[nodiscard]] input_data_attributes get_required_data() const override - { - if constexpr (std::is_same_v || - std::is_same_v) { - // Empty set, which means we need everything. 
- return {}; - } else { - return {groupby_data_attribute::GROUPED_VALUES, - groupby_data_attribute::GROUP_OFFSETS, - groupby_data_attribute::GROUP_LABELS, - cudf::make_max_aggregation(), - cudf::make_sum_aggregation()}; - } - } +struct host_udf_reduction_example : cudf::host_udf_base { + host_udf_reduction_example() = default; - [[nodiscard]] output_type operator()(host_udf_input const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const override - { - if constexpr (std::is_same_v) { - auto const& values = - std::get(input.at(reduction_data_attribute::INPUT_VALUES)); - auto const output_dtype = - std::get(input.at(reduction_data_attribute::OUTPUT_DTYPE)); - return cudf::double_type_dispatcher( - values.type(), output_dtype, reduce_fn{this}, input, stream, mr); - } else if constexpr (std::is_same_v) { - auto const& values = - std::get(input.at(segmented_reduction_data_attribute::INPUT_VALUES)); - auto const output_dtype = - std::get(input.at(segmented_reduction_data_attribute::OUTPUT_DTYPE)); - return cudf::double_type_dispatcher( - values.type(), output_dtype, segmented_reduce_fn{this}, input, stream, mr); - } else { - auto const& values = - std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); - return cudf::type_dispatcher(values.type(), groupby_fn{this}, input, stream, mr); - } - } - - [[nodiscard]] output_type get_empty_output( + [[nodiscard]] output_t get_empty_output( [[maybe_unused]] std::optional output_dtype, [[maybe_unused]] rmm::cuda_stream_view stream, [[maybe_unused]] rmm::device_async_resource_ref mr) const override { - if constexpr (std::is_same_v) { - CUDF_EXPECTS(output_dtype.has_value(), - "Data type for the reduction result must be specified."); - return cudf::make_default_constructed_scalar(output_dtype.value(), stream, mr); - } else if constexpr (std::is_same_v) { - CUDF_EXPECTS(output_dtype.has_value(), - "Data type for the reduction result must be specified."); - return 
cudf::make_empty_column(output_dtype.value()); - } else { - return cudf::make_empty_column( - cudf::data_type{cudf::type_to_id()}); - } + CUDF_EXPECTS(output_dtype.has_value(), "Data type for the reduction result must be specified."); + return cudf::make_default_constructed_scalar(output_dtype.value(), stream, mr); } - [[nodiscard]] bool is_equal(host_udf_base const& other) const override + [[nodiscard]] output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override { - // Just check if the other object is also instance of the same derived class. - return dynamic_cast(&other) != nullptr; + auto const& values = + std::get(input.at(reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(reduction_data_attribute::OUTPUT_DTYPE)); + return cudf::double_type_dispatcher( + values.type(), output_dtype, reduce_fn{this}, input, stream, mr); } [[nodiscard]] std::size_t do_hash() const override { - return std::hash{}({"test_udf_simple_type"}); - } - - [[nodiscard]] std::unique_ptr clone() const override - { - return std::make_unique(); + // Just return the same hash for all instances of this class. + return std::size_t{12345}; } - // For faster compile times, we only support a few input/output types. - template - static constexpr bool is_valid_input_t() + [[nodiscard]] bool is_equal(host_udf_base const& other) const override { - return std::is_same_v || std::is_same_v; + // Just check if the other object is also instance of this class. + return dynamic_cast(&other) != nullptr; } - // For faster compile times, we only support a few input/output types. - template - static constexpr bool is_valid_output_t() + [[nodiscard]] std::unique_ptr clone() const override { - return std::is_same_v || std::is_same_v; + return std::make_unique(); } struct reduce_fn { // Store pointer to the parent class so we can call its functions. 
- test_udf_simple_type const* parent; + host_udf_reduction_example const* parent; - template () || !is_valid_output_t())> - output_type operator()(Args...) const + CUDF_ENABLE_IF(!std::is_same_v || !std::is_same_v)> + output_t operator()(Args...) const { - CUDF_FAIL("Unsupported input type."); + CUDF_FAIL("Unsupported input/output type."); } - template () && is_valid_output_t())> - output_type operator()(host_udf_input const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const + template && std::is_same_v)> + output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const { auto const& values = std::get(input.at(reduction_data_attribute::INPUT_VALUES)); auto const output_dtype = std::get(input.at(reduction_data_attribute::OUTPUT_DTYPE)); + CUDF_EXPECTS(output_dtype == cudf::data_type{cudf::type_to_id()}, + "Invalid output type."); + if (values.size() == 0) { return parent->get_empty_output(output_dtype, stream, mr); } + auto const input_init_value = std::get>>( input.at(reduction_data_attribute::INIT_VALUE)); - - if (values.size() == 0) { return parent->get_empty_output(output_dtype, stream, mr); } - auto const init_value = [&]() -> InputType { if (input_init_value.has_value() && input_init_value.value().get().is_valid(stream)) { auto const numeric_init_scalar = @@ -185,35 +131,18 @@ struct test_udf_simple_type : cudf::host_udf_base { }(); auto const values_dv_ptr = cudf::column_device_view::create(values, stream); - auto const result = - thrust::transform_reduce(rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(values.size()), - transform_fn{*values_dv_ptr}, - static_cast(init_value), - thrust::plus<>{}); - - if constexpr (std::is_same_v && std::is_same_v) { - std::vector in(values.size()); - CUDF_CUDA_TRY(cudaMemcpyAsync(in.data(), - values.begin(), - values.size() * sizeof(double), - cudaMemcpyDefault, - stream.value())); - 
stream.synchronize(); - std::cout << "input: " << std::endl; - for (auto x : in) { - std::cout << x << ", "; - } - std::cout << "\nresult: " << result << std::endl; - } + auto const result = thrust::transform_reduce(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(values.size()), + transform_fn{*values_dv_ptr}, + static_cast(init_value), + thrust::plus<>{}); auto output = cudf::make_numeric_scalar(output_dtype, stream, mr); static_cast*>(output.get())->set_value(result, stream); return output; } - template struct transform_fn { cudf::column_device_view values; OutputType __device__ operator()(cudf::size_type idx) const @@ -224,49 +153,99 @@ struct test_udf_simple_type : cudf::host_udf_base { } }; }; +}; + +/** + * @brief A host-based UDF implementation for segmented reduction. + * + * The aggregation computes `sum(value^2, for value in group)` (this is sum of squared). + */ +struct host_udf_segmented_reduction_example : cudf::host_udf_base { + host_udf_segmented_reduction_example() = default; + + [[nodiscard]] output_t get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + CUDF_EXPECTS(output_dtype.has_value(), + "Data type for the segmented reduction result must be specified."); + return cudf::make_default_constructed_scalar(output_dtype.value(), stream, mr); + } + + [[nodiscard]] output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + auto const& values = + std::get(input.at(segmented_reduction_data_attribute::INPUT_VALUES)); + auto const output_dtype = + std::get(input.at(segmented_reduction_data_attribute::OUTPUT_DTYPE)); + return cudf::double_type_dispatcher( + values.type(), output_dtype, segmented_reduce_fn{this}, input, stream, mr); + } + + [[nodiscard]] std::size_t do_hash() const override + { + // 
Just return the same hash for all instances of this class. + return std::size_t{12345}; + } + + [[nodiscard]] bool is_equal(host_udf_base const& other) const override + { + // Just check if the other object is also instance of this class. + return dynamic_cast(&other) != nullptr; + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(); + } struct segmented_reduce_fn { // Store pointer to the parent class so we can call its functions. - test_udf_simple_type const* parent; + host_udf_segmented_reduction_example const* parent; + + // For simplicity, this example only accepts a single type input and output. + using InputType = double; + using OutputType = int64_t; - template () || !is_valid_output_t())> - output_type operator()(Args...) const + CUDF_ENABLE_IF(!std::is_same_v || !std::is_same_v)> + output_t operator()(Args...) const { - CUDF_FAIL("Unsupported input type."); + CUDF_FAIL("Unsupported input/output type."); } - template () && is_valid_output_t())> - output_type operator()(host_udf_input const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const + template && std::is_same_v)> + output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const { auto const& values = std::get(input.at(segmented_reduction_data_attribute::INPUT_VALUES)); auto const output_dtype = std::get(input.at(segmented_reduction_data_attribute::OUTPUT_DTYPE)); + CUDF_EXPECTS(output_dtype == cudf::data_type{cudf::type_to_id()}, + "Invalid output type."); auto const offsets = std::get>( input.at(segmented_reduction_data_attribute::OFFSETS)); CUDF_EXPECTS(offsets.size() > 0, "Invalid offsets."); auto const num_segments = static_cast(offsets.size()) - 1; if (values.size() == 0) { - if (num_segments <= 0) { - return parent->get_empty_output(output_dtype, stream, mr); - } else { - return cudf::make_numeric_column( - output_dtype, num_segments, cudf::mask_state::ALL_NULL, 
stream, mr); - } + if (num_segments <= 0) { return parent->get_empty_output(output_dtype, stream, mr); } + return cudf::make_numeric_column( + output_dtype, num_segments, cudf::mask_state::ALL_NULL, stream, mr); } auto const input_init_value = std::get>>( input.at(segmented_reduction_data_attribute::INIT_VALUE)); - auto const init_value = [&]() -> InputType { if (input_init_value.has_value() && input_init_value.value().get().is_valid(stream)) { auto const numeric_init_scalar = @@ -279,37 +258,44 @@ struct test_udf_simple_type : cudf::host_udf_base { auto const null_handling = std::get(input.at(segmented_reduction_data_attribute::NULL_POLICY)); - auto const values_dv_ptr = cudf::column_device_view::create(values, stream); auto output = cudf::make_numeric_column( output_dtype, num_segments, cudf::mask_state::UNALLOCATED, stream); - rmm::device_uvector validity(num_segments, stream); + + // Store row index if it is valid, otherwise store a negative value denoting a null row. + rmm::device_uvector valid_idx(num_segments, stream); thrust::transform( rmm::exec_policy(stream), thrust::make_counting_iterator(0), thrust::make_counting_iterator(num_segments), - thrust::make_zip_iterator(output->mutable_view().begin(), validity.begin()), - transform_fn{ - *values_dv_ptr, offsets, static_cast(init_value), null_handling}); - auto [null_mask, null_count] = - cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity<>{}, stream, mr); - if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } - return output; + thrust::make_zip_iterator(output->mutable_view().begin(), valid_idx.begin()), + transform_fn{*values_dv_ptr, offsets, static_cast(init_value), null_handling}); + + auto const valid_idx_cv = cudf::column_view{ + cudf::data_type{cudf::type_id::INT32}, num_segments, valid_idx.begin(), nullptr, 0}; + return std::move(cudf::gather(cudf::table_view{{output->view()}}, + valid_idx_cv, + cudf::out_of_bounds_policy::NULLIFY, + stream, + mr) + 
->release() + .front()); } - template struct transform_fn { cudf::column_device_view values; cudf::device_span offsets; OutputType init_value; cudf::null_policy null_handling; - thrust::tuple __device__ operator()(cudf::size_type idx) const + thrust::tuple __device__ operator()(cudf::size_type idx) const { auto const start = offsets[idx]; auto const end = offsets[idx + 1]; - if (start == end) { return {OutputType{0}, false}; } + + auto constexpr invalid_idx = cuda::std::numeric_limits::lowest(); + if (start == end) { return {OutputType{0}, invalid_idx}; } auto sum = init_value; for (auto i = start; i < end; ++i) { @@ -321,30 +307,85 @@ struct test_udf_simple_type : cudf::host_udf_base { sum += val * val; } auto const segment_size = end - start; - return {static_cast(segment_size) * sum, true}; + return {static_cast(segment_size) * sum, idx}; } }; }; +}; + +/** + * @brief A host-based UDF implementation for groupby. + * + * For each group of values, the aggregation computes + * `(group_idx + 1) * group_sum_of_squares - group_max * group_sum`. + */ +struct host_udf_groupby_example : cudf::host_udf_base { + host_udf_groupby_example() = default; + + [[nodiscard]] data_attribute_set_t get_required_data() const override + { + // We need grouped values, group offsets, group labels, and also results from groups' + // MAX and SUM aggregations. 
+ return {groupby_data_attribute::GROUPED_VALUES, + groupby_data_attribute::GROUP_OFFSETS, + groupby_data_attribute::GROUP_LABELS, + cudf::make_max_aggregation(), + cudf::make_sum_aggregation()}; + } + + [[nodiscard]] output_t get_empty_output( + [[maybe_unused]] std::optional output_dtype, + [[maybe_unused]] rmm::cuda_stream_view stream, + [[maybe_unused]] rmm::device_async_resource_ref mr) const override + { + return cudf::make_empty_column( + cudf::data_type{cudf::type_to_id()}); + } + + [[nodiscard]] output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const override + { + auto const& values = + std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); + return cudf::type_dispatcher(values.type(), groupby_fn{this}, input, stream, mr); + } + + [[nodiscard]] std::size_t do_hash() const override + { + // Just return the same hash for all instances of this class. + return std::size_t{12345}; + } + + [[nodiscard]] bool is_equal(host_udf_base const& other) const override + { + // Just check if the other object is also instance of this class. + return dynamic_cast(&other) != nullptr; + } + + [[nodiscard]] std::unique_ptr clone() const override + { + return std::make_unique(); + } struct groupby_fn { // Store pointer to the parent class so we can call its functions. - test_udf_simple_type const* parent; + host_udf_groupby_example const* parent; + + // For simplicity, this example only accepts double input and always produces double output. + using InputType = double; using OutputType = double; - template - using MaxType = cudf::detail::target_type_t; - template - using SumType = cudf::detail::target_type_t; - template ())> - output_type operator()(Args...) const + template )> + output_t operator()(Args...) 
const { CUDF_FAIL("Unsupported input type."); } - template ())> - output_type operator()(host_udf_input const& input, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) const + template )> + output_t operator()(input_map_t const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const { auto const& values = std::get(input.at(groupby_data_attribute::GROUPED_VALUES)); @@ -362,41 +403,51 @@ struct test_udf_simple_type : cudf::host_udf_base { input.at(cudf::make_sum_aggregation())); auto const values_dv_ptr = cudf::column_device_view::create(values, stream); - auto output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, - num_groups, - cudf::mask_state::UNALLOCATED, - stream); - rmm::device_uvector validity(num_groups, stream); + auto const output = cudf::make_numeric_column(cudf::data_type{cudf::type_to_id()}, + num_groups, + cudf::mask_state::UNALLOCATED, + stream, + mr); + + // Store row index if it is valid, otherwise store a negative value denoting a null row. 
+ rmm::device_uvector valid_idx(num_groups, stream); thrust::transform( rmm::exec_policy(stream), thrust::make_counting_iterator(0), thrust::make_counting_iterator(num_groups), - thrust::make_zip_iterator(output->mutable_view().begin(), validity.begin()), - transform_fn{*values_dv_ptr, - offsets, - group_indices, - group_max.begin>(), - group_sum.begin>()}); - auto [null_mask, null_count] = - cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity<>{}, stream, mr); - if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } - return output; + thrust::make_zip_iterator(output->mutable_view().begin(), valid_idx.begin()), + transform_fn{*values_dv_ptr, + offsets, + group_indices, + group_max.begin(), + group_sum.begin()}); + + auto const valid_idx_cv = cudf::column_view{ + cudf::data_type{cudf::type_id::INT32}, num_groups, valid_idx.begin(), nullptr, 0}; + return std::move(cudf::gather(cudf::table_view{{output->view()}}, + valid_idx_cv, + cudf::out_of_bounds_policy::NULLIFY, + stream, + mr) + ->release() + .front()); } - template struct transform_fn { cudf::column_device_view values; cudf::device_span offsets; cudf::device_span group_indices; - MaxType const* group_max; - SumType const* group_sum; + InputType const* group_max; + InputType const* group_sum; - thrust::tuple __device__ operator()(cudf::size_type idx) const + thrust::tuple __device__ operator()(cudf::size_type idx) const { auto const start = offsets[idx]; auto const end = offsets[idx + 1]; - if (start == end) { return {OutputType{0}, false}; } + + auto constexpr invalid_idx = cuda::std::numeric_limits::lowest(); + if (start == end) { return {OutputType{0}, invalid_idx}; } auto sum_sqr = OutputType{0}; bool has_valid{false}; @@ -407,10 +458,10 @@ struct test_udf_simple_type : cudf::host_udf_base { sum_sqr += val * val; } - if (!has_valid) { return {OutputType{0}, false}; } + if (!has_valid) { return {OutputType{0}, invalid_idx}; } return 
{static_cast(group_indices[start] + 1) * sum_sqr - static_cast(group_max[idx]) * static_cast(group_sum[idx]), - true}; + idx}; } }; }; @@ -420,23 +471,17 @@ struct test_udf_simple_type : cudf::host_udf_base { std::unique_ptr create_test_reduction_host_udf() { - // This only use the groupby code. - // Reduction and segmented reduction code is unused. - return std::make_unique>(); + return std::make_unique(); } std::unique_ptr create_test_segmented_reduction_host_udf() { - // This only use the groupby code. - // Reduction and segmented reduction code is unused. - return std::make_unique>(); + return std::make_unique(); } std::unique_ptr create_test_groupby_host_udf() { - // This only use the groupby code. - // Reduction and segmented reduction code is unused. - return std::make_unique>(); + return std::make_unique(); } } // namespace spark_rapids_jni diff --git a/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java b/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java index 9dce61187..520eec257 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java @@ -51,16 +51,7 @@ public enum AggregationType { } /** - * Create a test host UDF for testing purposes. - *
<p>
- * This will return two values: the first is the pointer to the host UDF, and the second is the - * hash code of the host UDF. - *
<p>
- * To create a host UDF aggregation, do this: - * ``` - * long[] udfAndHash = AggregationUtils.createTestHostUDF(); - * new ai.rapids.cudf.HostUDFAggregation(udfAndHash[0], udfAndHash[1]); - * ``` + * Create a test host UDF wrapper for testing purposes. */ public static TestHostUDF createTestHostUDF(AggregationType type) { return new TestHostUDF(createNativeTestHostUDF(type.nativeId)); From 1c165728fa797fcb9d220ee9d232424882fd79f2 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Fri, 20 Dec 2024 11:53:16 -0800 Subject: [PATCH 11/11] Update comment Signed-off-by: Nghia Truong --- src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java b/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java index 520eec257..481e3bc76 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/TestHostUDF.java @@ -19,7 +19,7 @@ import ai.rapids.cudf.NativeDepsLoader; import ai.rapids.cudf.HostUDFWrapper; -// A new host UDF implementation must extend Aggregation.HostUDFWrapper, +// A new host UDF implementation must extend HostUDFWrapper, // and override the hashCode and equals methods. public class TestHostUDF extends HostUDFWrapper { static {