Skip to content

Commit

Permalink
minor: refactor prepare_output so that it does not require an Executi…
Browse files Browse the repository at this point in the history
…onContext (apache#1194)
  • Loading branch information
andygrove authored and dharanad committed Jan 1, 2025
1 parent 8ee39df commit 8891549
Showing 1 changed file with 7 additions and 13 deletions.
20 changes: 7 additions & 13 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ fn prepare_output(
array_addrs: jlongArray,
schema_addrs: jlongArray,
output_batch: RecordBatch,
exec_context: &mut ExecutionContext,
validate: bool,
) -> CometResult<jlong> {
let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
let num_cols = env.get_array_length(&array_address_array)? as usize;
Expand All @@ -255,7 +255,7 @@ fn prepare_output(
)));
}

if exec_context.debug_native {
if validate {
// Validate the output arrays.
for array in results.iter() {
let array_data = array.to_data();
Expand All @@ -275,9 +275,6 @@ fn prepare_output(
i += 1;
}

// Update metrics
update_metrics(env, exec_context)?;

Ok(num_rows as jlong)
}

Expand Down Expand Up @@ -356,22 +353,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let next_item = exec_context.stream.as_mut().unwrap().next();
let poll_output = exec_context.runtime.block_on(async { poll!(next_item) });

// Update metrics
update_metrics(&mut env, exec_context)?;

match poll_output {
Poll::Ready(Some(output)) => {
// prepare output for FFI transfer
return prepare_output(
&mut env,
array_addrs,
schema_addrs,
output?,
exec_context,
exec_context.debug_native,
);
}
Poll::Ready(None) => {
// Reaches EOF of output.

// Update metrics
update_metrics(&mut env, exec_context)?;

if exec_context.explain_native {
if let Some(plan) = &exec_context.root_op {
let formatted_plan_str =
Expand All @@ -391,9 +388,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
// A poll pending means there are more than one blocking operators,
// we don't need go back-forth between JVM/Native. Just keeping polling.
Poll::Pending => {
// Update metrics
update_metrics(&mut env, exec_context)?;

// Pull input batches
pull_input_batches(exec_context)?;

Expand Down

0 comments on commit 8891549

Please sign in to comment.