Skip to content

Commit

Permalink
move projection pushdown optimization logic to ExecutionPlan trait (#…
Browse files Browse the repository at this point in the history
…14235)

* first iteration

* wrap up rest of them

* refactor mroe

* fix the tests

* minor

* test changes

* Update projection.rs

* investigate further

* revert test changes with the fix

* Update projection_pushdown.rs

* minor fmt

* fix imports

---------

Co-authored-by: berkaysynnada <[email protected]>
  • Loading branch information
buraksenn and berkaysynnada authored Jan 25, 2025
1 parent 18f14ab commit f775791
Show file tree
Hide file tree
Showing 29 changed files with 1,461 additions and 1,442 deletions.
33 changes: 33 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::projection::{
all_alias_free_columns, new_projections_for_columns, ProjectionExec,
};
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
Expand Down Expand Up @@ -479,6 +482,36 @@ impl ExecutionPlan for CsvExec {
cache: self.cache.clone(),
}))
}

fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
// If there is any non-column or alias-carrier expression, Projection should not be removed.
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.
Ok(all_alias_free_columns(projection.expr()).then(|| {
let mut file_scan = self.base_config().clone();
let new_projections = new_projections_for_columns(
projection,
&file_scan
.projection
.unwrap_or((0..self.schema().fields().len()).collect()),
);
file_scan.projection = Some(new_projections);

Arc::new(
CsvExec::builder(file_scan)
.with_has_header(self.has_header())
.with_delimeter(self.delimiter())
.with_quote(self.quote())
.with_escape(self.escape())
.with_comment(self.comment())
.with_newlines_in_values(self.newlines_in_values())
.with_file_compression_type(self.file_compression_type)
.build(),
) as _
}))
}
}

/// A Config for [`CsvOpener`]
Expand Down
Loading

0 comments on commit f775791

Please sign in to comment.