Skip to content

Commit

Permalink
Merge branch 'branch-25.02' into hll
Browse files Browse the repository at this point in the history
  • Loading branch information
Chong Gao committed Nov 27, 2024
2 parents 956af39 + 4d5b51d commit 8aaf0f6
Show file tree
Hide file tree
Showing 17 changed files with 1,771 additions and 156 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "thirdparty/cudf"]
path = thirdparty/cudf
url = https://github.com/rapidsai/cudf.git
branch = branch-24.12
branch = branch-25.02
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ $ ./build/build-in-docker install ...
```

Now cd to ~/repos/NVIDIA/spark-rapids and build with one of the options from
[spark-rapids instructions](https://github.com/NVIDIA/spark-rapids/blob/branch-24.12/CONTRIBUTING.md#building-from-source).
[spark-rapids instructions](https://github.com/NVIDIA/spark-rapids/blob/branch-25.02/CONTRIBUTING.md#building-from-source).

```bash
$ ./build/buildall
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<groupId>com.nvidia</groupId>
<artifactId>spark-rapids-jni</artifactId>
<version>24.12.0-SNAPSHOT</version>
<version>25.02.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>RAPIDS Accelerator JNI for Apache Spark</name>
<description>
Expand Down
7 changes: 5 additions & 2 deletions src/main/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ rapids_cuda_init_architectures(SPARK_RAPIDS_JNI)

project(
SPARK_RAPIDS_JNI
VERSION 24.12.00
VERSION 25.02.00
LANGUAGES C CXX CUDA
)

Expand Down Expand Up @@ -209,13 +209,14 @@ add_library(
src/bloom_filter.cu
src/case_when.cu
src/cast_decimal_to_string.cu
src/format_float.cu
src/cast_float_to_string.cu
src/cast_string.cu
src/cast_string_to_float.cu
src/datetime_rebase.cu
src/decimal_utils.cu
src/format_float.cu
src/from_json_to_raw_map.cu
src/from_json_to_structs.cu
src/get_json_object.cu
src/histogram.cu
src/json_utils.cu
Expand Down Expand Up @@ -282,6 +283,8 @@ target_link_libraries(
${ARROW_LIB}
${PARQUET_LIB}
${THRIFT_LIB}
PUBLIC rmm::rmm
PRIVATE $<TARGET_NAME_IF_EXISTS:rmm::rmm_logger_impl>
)
rapids_cuda_set_runtime(spark_rapids_jni USE_STATIC ON)
set_target_properties(spark_rapids_jni PROPERTIES LINK_LANGUAGE "CXX")
Expand Down
126 changes: 97 additions & 29 deletions src/main/cpp/src/JSONUtilsJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,50 +166,118 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_extractRawMap
CATCH_STD(env, 0);
}

JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_concatenateJsonStrings(
JNIEnv* env, jclass, jlong j_input)
JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_JSONUtils_fromJSONToStructs(JNIEnv* env,
jclass,
jlong j_input,
jobjectArray j_col_names,
jintArray j_num_children,
jintArray j_types,
jintArray j_scales,
jintArray j_precisions,
jboolean normalize_single_quotes,
jboolean allow_leading_zeros,
jboolean allow_nonnumeric_numbers,
jboolean allow_unquoted_control,
jboolean is_us_locale)
{
JNI_NULL_CHECK(env, j_input, "j_input is null", 0);
JNI_NULL_CHECK(env, j_col_names, "j_col_names is null", 0);
JNI_NULL_CHECK(env, j_num_children, "j_num_children is null", 0);
JNI_NULL_CHECK(env, j_types, "j_types is null", 0);
JNI_NULL_CHECK(env, j_scales, "j_scales is null", 0);
JNI_NULL_CHECK(env, j_precisions, "j_precisions is null", 0);

try {
cudf::jni::auto_set_device(env);
auto const input_cv = reinterpret_cast<cudf::column_view const*>(j_input);

// Currently, set `nullify_invalid_rows = false` as `concatenateJsonStrings` is used only for
// `from_json` with struct schema.
auto [joined_strings, delimiter, should_be_nullify] = spark_rapids_jni::concat_json(
cudf::strings_column_view{*input_cv}, /*nullify_invalid_rows*/ false);

// The output array contains 5 elements:
// [0]: address of the cudf::column object `is_valid` in host memory
// [1]: address of data buffer of the concatenated strings in device memory
// [2]: data length
// [3]: address of the rmm::device_buffer object (of the concatenated strings) in host memory
// [4]: delimiter char
auto out_handles = cudf::jni::native_jlongArray(env, 5);
out_handles[0] = reinterpret_cast<jlong>(should_be_nullify.release());
out_handles[1] = reinterpret_cast<jlong>(joined_strings->data());
out_handles[2] = static_cast<jlong>(joined_strings->size());
out_handles[3] = reinterpret_cast<jlong>(joined_strings.release());
out_handles[4] = static_cast<jlong>(delimiter);
return out_handles.get_jArray();
auto const input_cv = reinterpret_cast<cudf::column_view const*>(j_input);
auto const col_names = cudf::jni::native_jstringArray(env, j_col_names).as_cpp_vector();
auto const num_children = cudf::jni::native_jintArray(env, j_num_children).to_vector();
auto const types = cudf::jni::native_jintArray(env, j_types).to_vector();
auto const scales = cudf::jni::native_jintArray(env, j_scales).to_vector();
auto const precisions = cudf::jni::native_jintArray(env, j_precisions).to_vector();

CUDF_EXPECTS(col_names.size() > 0, "Invalid schema data: col_names.");
CUDF_EXPECTS(col_names.size() == num_children.size(), "Invalid schema data: num_children.");
CUDF_EXPECTS(col_names.size() == types.size(), "Invalid schema data: types.");
CUDF_EXPECTS(col_names.size() == scales.size(), "Invalid schema data: scales.");
CUDF_EXPECTS(col_names.size() == precisions.size(), "Invalid schema data: precisions.");

return cudf::jni::ptr_as_jlong(
spark_rapids_jni::from_json_to_structs(cudf::strings_column_view{*input_cv},
col_names,
num_children,
types,
scales,
precisions,
normalize_single_quotes,
allow_leading_zeros,
allow_nonnumeric_numbers,
allow_unquoted_control,
is_us_locale)
.release());
}
CATCH_STD(env, 0);
}

JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_makeStructs(
JNIEnv* env, jclass, jlongArray j_children, jlong j_is_null)
JNIEXPORT jlong JNICALL
Java_com_nvidia_spark_rapids_jni_JSONUtils_convertFromStrings(JNIEnv* env,
jclass,
jlong j_input,
jintArray j_num_children,
jintArray j_types,
jintArray j_scales,
jintArray j_precisions,
jboolean allow_nonnumeric_numbers,
jboolean is_us_locale)
{
JNI_NULL_CHECK(env, j_children, "j_children is null", 0);
JNI_NULL_CHECK(env, j_is_null, "j_is_null is null", 0);
JNI_NULL_CHECK(env, j_input, "j_input is null", 0);
JNI_NULL_CHECK(env, j_num_children, "j_num_children is null", 0);
JNI_NULL_CHECK(env, j_types, "j_types is null", 0);
JNI_NULL_CHECK(env, j_scales, "j_scales is null", 0);
JNI_NULL_CHECK(env, j_precisions, "j_precisions is null", 0);

try {
cudf::jni::auto_set_device(env);
auto const children =
cudf::jni::native_jpointerArray<cudf::column_view>{env, j_children}.get_dereferenced();
auto const is_null = *reinterpret_cast<cudf::column_view const*>(j_is_null);
return cudf::jni::ptr_as_jlong(spark_rapids_jni::make_structs(children, is_null).release());

auto const input_cv = reinterpret_cast<cudf::column_view const*>(j_input);
auto const num_children = cudf::jni::native_jintArray(env, j_num_children).to_vector();
auto const types = cudf::jni::native_jintArray(env, j_types).to_vector();
auto const scales = cudf::jni::native_jintArray(env, j_scales).to_vector();
auto const precisions = cudf::jni::native_jintArray(env, j_precisions).to_vector();

CUDF_EXPECTS(num_children.size() > 0, "Invalid schema data: num_children.");
CUDF_EXPECTS(num_children.size() == types.size(), "Invalid schema data: types.");
CUDF_EXPECTS(num_children.size() == scales.size(), "Invalid schema data: scales.");
CUDF_EXPECTS(num_children.size() == precisions.size(), "Invalid schema data: precisions.");

return cudf::jni::ptr_as_jlong(
spark_rapids_jni::convert_from_strings(cudf::strings_column_view{*input_cv},
num_children,
types,
scales,
precisions,
allow_nonnumeric_numbers,
is_us_locale)
.release());
}
CATCH_STD(env, 0);
}

// JNI entry point: strips quotes from each row of a strings column by delegating to
// spark_rapids_jni::remove_quotes. When `nullify_if_not_quoted` is true, rows that are
// not quoted are presumably nullified — TODO(review): confirm against the remove_quotes
// implementation, which is not visible here.
//
// @param j_input               native address of a cudf::column_view over a strings column
// @param nullify_if_not_quoted forwarded verbatim to spark_rapids_jni::remove_quotes
// @return native address of the newly allocated cudf::column; ownership transfers to the
//         Java caller (released via .release()), 0 on error
JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_removeQuotes(
JNIEnv* env, jclass, jlong j_input, jboolean nullify_if_not_quoted)
{
// Raise a Java exception and return 0 if the input handle is null.
JNI_NULL_CHECK(env, j_input, "j_input is null", 0);

try {
// Ensure this thread operates on the CUDA device associated with the JVM context.
cudf::jni::auto_set_device(env);
auto const input_cv = reinterpret_cast<cudf::column_view const*>(j_input);
// Release the unique_ptr so the Java side owns (and must later free) the column.
return cudf::jni::ptr_as_jlong(
spark_rapids_jni::remove_quotes(cudf::strings_column_view{*input_cv}, nullify_if_not_quoted)
.release());
}
// Translate any C++ exception into a Java exception; 0 is the error sentinel.
CATCH_STD(env, 0);
}

} // extern "C"
10 changes: 5 additions & 5 deletions src/main/cpp/src/SparkResourceAdaptorJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ class full_thread_state {
// time)
long time_retry_running_nanos = 0;
std::chrono::time_point<std::chrono::steady_clock> block_start;
long gpu_memory_allocated_bytes = 0;

// metrics for the current thread
task_metrics metrics;
Expand Down Expand Up @@ -891,6 +890,8 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
std::condition_variable task_has_woken_condition;
std::map<long, full_thread_state> threads;
std::map<long, std::set<long>> task_to_threads;
long gpu_memory_allocated_bytes = 0;

// Metrics are a little complicated. Spark reports metrics at a task level
// but we track and collect them at a thread level. The life time of a thread
// and a task are not tied to each other, and a thread can work on things for
Expand Down Expand Up @@ -1376,10 +1377,9 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
// num_bytes is likely not padded, which could cause slight inaccuracies
// but for now it shouldn't matter for watermark purposes
if (!is_for_cpu) {
thread->second.gpu_memory_allocated_bytes += num_bytes;
gpu_memory_allocated_bytes += num_bytes;
thread->second.metrics.gpu_max_memory_allocated =
std::max(thread->second.metrics.gpu_max_memory_allocated,
thread->second.gpu_memory_allocated_bytes);
std::max(thread->second.metrics.gpu_max_memory_allocated, gpu_memory_allocated_bytes);
}
break;
default: break;
Expand Down Expand Up @@ -1780,7 +1780,7 @@ class spark_resource_adaptor final : public rmm::mr::device_memory_resource {
auto const thread = threads.find(tid);
if (thread != threads.end()) {
log_status("DEALLOC", tid, thread->second.task_id, thread->second.state);
if (!is_for_cpu) { thread->second.gpu_memory_allocated_bytes -= num_bytes; }
if (!is_for_cpu) { gpu_memory_allocated_bytes -= num_bytes; }
} else {
log_status("DEALLOC", tid, -2, thread_state::UNKNOWN);
}
Expand Down
Loading

0 comments on commit 8aaf0f6

Please sign in to comment.