
refactor(rust): Add streaming groupby for reductions #19291

Merged (19 commits, Oct 23, 2024)
2 changes: 2 additions & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion crates/polars-arrow/src/compute/aggregate/mod.rs
@@ -1,4 +1,4 @@
-//! Contains different aggregation functions
+/// ! Contains different aggregation functions
#[cfg(feature = "compute_aggregate")]
mod sum;
#[cfg(feature = "compute_aggregate")]
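For reference, the two comment forms this one-line change swaps between are not interchangeable in Rust; a minimal illustration (the function name is made up for the example):

```rust
//! An inner doc comment (`//!`): documents the enclosing item, i.e. the
//! module/file it sits in. This is the conventional form for module headers.

/// An outer doc comment (`///`): documents the item that follows it, so
/// `/// ! Contains ...` is just an outer doc comment whose text starts with "!".
pub fn placeholder() {}
```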
18 changes: 18 additions & 0 deletions crates/polars-core/src/chunked_array/logical/categorical/mod.rs
@@ -203,6 +203,24 @@ impl CategoricalChunked {
}
}

/// Create a [`CategoricalChunked`] from a physical array and dtype.
///
/// # Safety
/// It's not checked that the indices are in-bounds or that the dtype is
/// correct.
pub unsafe fn from_cats_and_dtype_unchecked(idx: UInt32Chunked, dtype: DataType) -> Self {
debug_assert!(matches!(
dtype,
DataType::Enum { .. } | DataType::Categorical { .. }
));
let mut logical = Logical::<UInt32Type, _>::new_logical::<CategoricalType>(idx);
logical.2 = Some(dtype);
Self {
physical: logical,
bit_settings: Default::default(),
}
}

/// Create a [`CategoricalChunked`] from an array of `idx` and an existing [`RevMapping`]: `rev_map`.
///
/// # Safety
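A minimal sketch of a call site for the new unsafe constructor, assuming a direct `polars-core` dependency; the wrapper name is hypothetical, and the safety obligations are exactly the ones stated in the doc comment above:

```rust
use polars_core::prelude::*;

// Hypothetical wrapper: `dtype` must be Categorical/Enum and carry a
// rev-map for which every index in `idx` is in-bounds -- the constructor
// verifies neither.
fn rebuild_categorical(idx: UInt32Chunked, dtype: DataType) -> CategoricalChunked {
    // SAFETY: upheld by the caller as described above.
    unsafe { CategoricalChunked::from_cats_and_dtype_unchecked(idx, dtype) }
}
```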
1 change: 1 addition & 0 deletions crates/polars-core/src/chunked_array/ops/mod.rs
@@ -34,6 +34,7 @@ pub(crate) mod nulls;
mod reverse;
#[cfg(feature = "rolling_window")]
pub(crate) mod rolling_window;
pub mod row_encode;
pub mod search_sorted;
mod set;
mod shift;
220 changes: 220 additions & 0 deletions crates/polars-core/src/chunked_array/ops/row_encode.rs
@@ -0,0 +1,220 @@
use arrow::compute::utils::combine_validities_and_many;
use polars_row::{convert_columns, EncodingField, RowsEncoded};
use rayon::prelude::*;

use crate::prelude::*;
use crate::utils::_split_offsets;
use crate::POOL;

pub(crate) fn convert_series_for_row_encoding(s: &Series) -> PolarsResult<Series> {
use DataType::*;
let out = match s.dtype() {
#[cfg(feature = "dtype-categorical")]
Categorical(_, _) | Enum(_, _) => s.rechunk(),
Binary | Boolean => s.clone(),
BinaryOffset => s.clone(),
String => s.str().unwrap().as_binary().into_series(),
#[cfg(feature = "dtype-struct")]
Struct(_) => {
let ca = s.struct_().unwrap();
let new_fields = ca
.fields_as_series()
.iter()
.map(convert_series_for_row_encoding)
.collect::<PolarsResult<Vec<_>>>()?;
let mut out =
StructChunked::from_series(ca.name().clone(), ca.len(), new_fields.iter())?;
out.zip_outer_validity(ca);
out.into_series()
},
// We could fall back to the default branch, but Decimal is not a numeric dtype for now, so handle it explicitly here.
#[cfg(feature = "dtype-decimal")]
Decimal(_, _) => s.clone(),
List(inner) if !inner.is_nested() => s.clone(),
Null => s.clone(),
_ => {
let phys = s.to_physical_repr().into_owned();
polars_ensure!(
phys.dtype().is_numeric(),
InvalidOperation: "cannot sort column of dtype `{}`", s.dtype()
);
phys
},
};
Ok(out)
}

pub fn _get_rows_encoded_compat_array(by: &Series) -> PolarsResult<ArrayRef> {
let by = convert_series_for_row_encoding(by)?;
let by = by.rechunk();

let out = match by.dtype() {
#[cfg(feature = "dtype-categorical")]
DataType::Categorical(_, _) | DataType::Enum(_, _) => {
let ca = by.categorical().unwrap();
if ca.uses_lexical_ordering() {
by.to_arrow(0, CompatLevel::newest())
} else {
ca.physical().chunks[0].clone()
}
},
// Take physical
_ => by.chunks()[0].clone(),
};
Ok(out)
}

pub fn encode_rows_vertical_par_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);

let chunks = splits.into_par_iter().map(|(offset, len)| {
let sliced = by
.iter()
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded_unordered(&sliced)?;
Ok(rows.into_array())
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

Ok(BinaryOffsetChunked::from_chunk_iter(
PlSmallStr::EMPTY,
chunks?,
))
}

// Almost the same as `encode_rows_vertical_par_unordered`, but broadcasts nulls to the row-encoded array.
pub fn encode_rows_vertical_par_unordered_broadcast_nulls(
by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);

let chunks = splits.into_par_iter().map(|(offset, len)| {
let sliced = by
.iter()
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded_unordered(&sliced)?;

let validities = sliced
.iter()
.flat_map(|s| {
let s = s.rechunk();
#[allow(clippy::unnecessary_to_owned)]
s.chunks()
.to_vec()
.into_iter()
.map(|arr| arr.validity().cloned())
})
.collect::<Vec<_>>();

let validity = combine_validities_and_many(&validities);
Ok(rows.into_array().with_validity_typed(validity))
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

Ok(BinaryOffsetChunked::from_chunk_iter(
PlSmallStr::EMPTY,
chunks?,
))
}

pub fn encode_rows_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
let rows = _get_rows_encoded_unordered(by)?;
Ok(BinaryOffsetChunked::with_chunk(
PlSmallStr::EMPTY,
rows.into_array(),
))
}

pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
let mut cols = Vec::with_capacity(by.len());
let mut fields = Vec::with_capacity(by.len());
for by in by {
let arr = _get_rows_encoded_compat_array(by)?;
let field = EncodingField::new_unsorted();
match arr.dtype() {
// Flatten the struct fields.
ArrowDataType::Struct(_) => {
let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
for arr in arr.values() {
cols.push(arr.clone() as ArrayRef);
fields.push(field)
}
},
_ => {
cols.push(arr);
fields.push(field)
},
}
}
Ok(convert_columns(&cols, &fields))
}

pub fn _get_rows_encoded(
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<RowsEncoded> {
debug_assert_eq!(by.len(), descending.len());
debug_assert_eq!(by.len(), nulls_last.len());

let mut cols = Vec::with_capacity(by.len());
let mut fields = Vec::with_capacity(by.len());

for ((by, desc), null_last) in by.iter().zip(descending).zip(nulls_last) {
let by = by.as_materialized_series();
let arr = _get_rows_encoded_compat_array(by)?;
let sort_field = EncodingField {
descending: *desc,
nulls_last: *null_last,
no_order: false,
};
match arr.dtype() {
// Flatten the struct fields.
ArrowDataType::Struct(_) => {
let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
let arr = arr.propagate_nulls();
for value_arr in arr.values() {
cols.push(value_arr.clone() as ArrayRef);
fields.push(sort_field);
}
},
_ => {
cols.push(arr);
fields.push(sort_field);
},
}
}
Ok(convert_columns(&cols, &fields))
}

pub fn _get_rows_encoded_ca(
name: PlSmallStr,
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<BinaryOffsetChunked> {
_get_rows_encoded(by, descending, nulls_last)
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

pub fn _get_rows_encoded_arr(
by: &[Column],
descending: &[bool],
nulls_last: &[bool],
) -> PolarsResult<BinaryArray<i64>> {
_get_rows_encoded(by, descending, nulls_last).map(|rows| rows.into_array())
}

pub fn _get_rows_encoded_ca_unordered(
name: PlSmallStr,
by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
_get_rows_encoded_unordered(by)
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}
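A hedged sketch of how the unordered entry points above might be driven for grouping, assuming a direct `polars-core` dependency (the `group_keys` wrapper and the `main` driver are illustrative, not part of this PR):

```rust
use polars_core::chunked_array::ops::row_encode::encode_rows_vertical_par_unordered;
use polars_core::prelude::*;

// Illustrative wrapper: collapse several key columns into one binary column.
// Rows that are equal across all keys encode to byte-identical values, so a
// grouping kernel only has to hash/compare a single column.
fn group_keys(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
    encode_rows_vertical_par_unordered(by)
}

fn main() -> PolarsResult<()> {
    let a = Series::new("a".into(), &[1i32, 1, 2]);
    let b = Series::new("b".into(), &["x", "x", "y"]);
    let keys = group_keys(&[a, b])?;
    // Three rows in, three encoded keys out; rows 0 and 1 encode byte-equal.
    assert_eq!(keys.len(), 3);
    Ok(())
}
```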
@@ -1,6 +1,7 @@
use polars_utils::itertools::Itertools;

use super::*;
use crate::chunked_array::ops::row_encode::_get_rows_encoded;

#[derive(Eq)]
struct CompareRow<'a> {