Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp committed Oct 18, 2024
1 parent e8857a7 commit 471046b
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 44 deletions.
5 changes: 3 additions & 2 deletions crates/polars-core/src/chunked_array/ops/row_encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use arrow::compute::utils::combine_validities_and_many;
use polars_row::{convert_columns, EncodingField, RowsEncoded};
use rayon::prelude::*;

use crate::{prelude::*, POOL};
use crate::prelude::*;
use crate::utils::_split_offsets;
use crate::POOL;

pub(crate) fn convert_series_for_row_encoding(s: &Series) -> PolarsResult<Series> {
use DataType::*;
Expand Down Expand Up @@ -216,4 +217,4 @@ pub fn _get_rows_encoded_ca_unordered(
) -> PolarsResult<BinaryOffsetChunked> {
_get_rows_encoded_unordered(by)
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use polars_utils::itertools::Itertools;

use crate::chunked_array::ops::row_encode::_get_rows_encoded;

use super::*;
use crate::chunked_array::ops::row_encode::_get_rows_encoded;

#[derive(Eq)]
struct CompareRow<'a> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use compare_inner::NullOrderCmp;
use polars_utils::itertools::Itertools;

use crate::chunked_array::ops::row_encode::_get_rows_encoded;

use super::*;
use crate::chunked_array::ops::row_encode::_get_rows_encoded;

pub(crate) fn args_validate<T: PolarsDataType>(
ca: &ChunkedArray<T>,
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-core/src/chunked_array/ops/sort/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use compare_inner::NonNull;
use rayon::prelude::*;
pub use slice::*;

use crate::chunked_array::ops::row_encode::{
_get_rows_encoded_ca, convert_series_for_row_encoding,
};
use crate::prelude::compare_inner::TotalOrdInner;
use crate::chunked_array::ops::row_encode::{convert_series_for_row_encoding, _get_rows_encoded_ca};
use crate::prelude::sort::arg_sort_multiple::*;
use crate::prelude::*;
use crate::series::IsSorted;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use polars_utils::aliases::PlHashMap;
use polars_utils::itertools::Itertools;

use crate::chunked_array::cast::CastOptions;
use crate::chunked_array::ChunkedArray;
use crate::chunked_array::ops::row_encode::{_get_rows_encoded_arr, _get_rows_encoded_ca};
use crate::chunked_array::ChunkedArray;
use crate::prelude::*;
use crate::series::Series;
use crate::utils::Container;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/group_by/into_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use polars_utils::total_ord::{ToTotalOrd, TotalHash};

use super::*;
use crate::chunked_array::cast::CastOptions;
use crate::config::verbose;
use crate::chunked_array::ops::row_encode::_get_rows_encoded_ca_unordered;
use crate::config::verbose;
use crate::series::BitRepr;
use crate::utils::flatten::flatten_par;

Expand Down
37 changes: 23 additions & 14 deletions crates/polars-expr/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,52 +8,61 @@ use polars_utils::IdxSize;
mod row_encoded;

/// A Grouper maps keys to groups, such that duplicate keys map to the same group.
pub trait Grouper : Any + Send {
pub trait Grouper: Any + Send {
/// Creates a new empty Grouper similar to this one.
fn new_empty(&self) -> Box<dyn Grouper>;

/// Returns the number of groups in this Grouper.
fn num_groups(&self) -> IdxSize;

/// Inserts the given keys into this Grouper, mutating groups_idxs such
/// that group_idxs[i] is the group index of keys[i].
fn insert_keys(&mut self, keys: &[Column], group_idxs: &mut Vec<IdxSize>);

/// Adds the given Grouper into this one, mutating groups_idxs such that
/// the ith group of other now has group index group_idxs[i] in self.
fn combine(&mut self, other: &dyn Grouper, group_idxs: &mut Vec<IdxSize>);

/// Partitions this Grouper into the given partitions.
///
///
/// Updates partition_idxs and group_idxs such that the ith group of self
/// has group index group_idxs[i] in partition partition_idxs[i].
///
///
/// It is guaranteed that two equal keys in two independent partition_into
/// calls map to the same partition index if the seed and the number of
/// partitions is equal.
fn partition_into(&self, seed: u64, partitions: &mut [Box<dyn Grouper>], partition_idxs: &mut Vec<IdxSize>, group_idxs: &mut Vec<IdxSize>);

fn partition_into(
&self,
seed: u64,
partitions: &mut [Box<dyn Grouper>],
partition_idxs: &mut Vec<IdxSize>,
group_idxs: &mut Vec<IdxSize>,
);

/// Returns the keys in this Grouper in group order, that is the key for
/// group i is returned in row i.
fn get_keys_in_group_order(&self) -> DataFrame;

/// Returns the keys in this Grouper, mutating group_idxs such that the ith
/// key returned corresponds to group group_idxs[i].
fn get_keys_groups(&self, group_idxs: &mut Vec<IdxSize>) -> DataFrame;

/// Stores this Grouper at the given path.
fn store_ooc(&self, _path: &Path) {
unimplemented!();
}

/// Loads this Grouper from the given path.
fn load_ooc(&mut self, _path: &Path) {
unimplemented!();
}

fn as_any(&self) -> &dyn Any;
}

pub fn new_hash_grouper(key_schema: Arc<Schema>, random_state: PlRandomState) -> Box<dyn Grouper> {
Box::new(row_encoded::RowEncodedHashGrouper::new(key_schema, random_state))
}
Box::new(row_encoded::RowEncodedHashGrouper::new(
key_schema,
random_state,
))
}
11 changes: 7 additions & 4 deletions crates/polars-expr/src/groups/row_encoded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ impl RowEncodedHashGrouper {
let s = Series::try_from((name.clone(), col)).unwrap();
match dt {
#[cfg(feature = "dtype-categorical")]
dt @ (DataType::Categorical(rev_map, ordering) | DataType::Enum(rev_map, ordering)) => {
dt @ (DataType::Categorical(rev_map, ordering)
| DataType::Enum(rev_map, ordering)) => {
if let Some(rev_map) = rev_map {
let cats = s.u32().unwrap().clone();
// SAFETY: the rev-map comes from these categoricals.
Expand All @@ -98,9 +99,11 @@ impl RowEncodedHashGrouper {
if polars_core::using_string_cache() {
// SAFETY, we go from logical to primitive back to logical so the categoricals should still match the global map.
unsafe {
CategoricalChunked::from_global_indices_unchecked(cats, *ordering)
.into_column()
.with_name(name.clone())
CategoricalChunked::from_global_indices_unchecked(
cats, *ordering,
)
.into_column()
.with_name(name.clone())
}
} else {
// we set the global string cache once we start a streaming pipeline
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-expr/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
mod expressions;
pub mod groups;
pub mod planner;
pub mod prelude;
pub mod reduce;
pub mod state;
pub mod groups;

pub use crate::planner::{create_physical_expr, ExpressionConversionState};
2 changes: 1 addition & 1 deletion crates/polars-ops/src/frame/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ use hashbrown::hash_map::{Entry, RawEntryMut};
pub use iejoin::{IEJoinOptions, InequalityOperator};
#[cfg(feature = "merge_sorted")]
pub use merge_sorted::_merge_sorted_dfs;
use polars_core::hashing::_HASHMAP_INIT_SIZE;
#[allow(unused_imports)]
use polars_core::chunked_array::ops::row_encode::{
encode_rows_vertical_par_unordered, encode_rows_vertical_par_unordered_broadcast_nulls,
};
use polars_core::hashing::_HASHMAP_INIT_SIZE;
use polars_core::prelude::*;
pub(super) use polars_core::series::IsSorted;
use polars_core::utils::slice_offsets;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-ops/src/series/ops/various.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use num_traits::Bounded;
use polars_core::prelude::arity::unary_elementwise_values;
#[cfg(feature = "dtype-struct")]
use polars_core::chunked_array::ops::row_encode::_get_rows_encoded_ca;
use polars_core::prelude::arity::unary_elementwise_values;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_core::with_match_physical_numeric_polars_type;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::any::Any;

use arrow::array::BinaryArray;
use polars_core::prelude::sort::_broadcast_bools;
use polars_core::chunked_array::ops::row_encode::_get_rows_encoded_compat_array;
use polars_core::prelude::sort::_broadcast_bools;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_row::decode::decode_rows_from_binary;
Expand Down
15 changes: 10 additions & 5 deletions crates/polars-stream/src/nodes/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ impl GroupBySinkState {
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
assert!(receivers.len() >= self.local.len());
self.local.resize_with(receivers.len(), || LocalGroupBySinkState {
grouper: self.grouper.new_empty(),
grouped_reductions: self.grouped_reductions.iter().map(|r| r.new_empty()).collect(),
});
self.local
.resize_with(receivers.len(), || LocalGroupBySinkState {
grouper: self.grouper.new_empty(),
grouped_reductions: self
.grouped_reductions
.iter()
.map(|r| r.new_empty())
.collect(),
});
for (mut recv, local) in receivers.into_iter().zip(&mut self.local) {
let key_selectors = &self.key_selectors;
let grouped_reduction_selectors = &self.grouped_reduction_selectors;
Expand Down Expand Up @@ -208,7 +213,7 @@ impl ComputeNode for GroupByNode {
GroupByState::Source(source) => {
assert!(recv[0].is_none());
source.spawn(scope, &mut [], send, state, join_handles);
}
},
GroupByState::Done => unreachable!(),
}
}
Expand Down
12 changes: 6 additions & 6 deletions crates/polars-stream/src/physical_plan/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@ fn visualize_plan_rec(

(out, &[][..])
},
PhysNodeKind::GroupBy {
input,
key,
aggs,
} => {
PhysNodeKind::GroupBy { input, key, aggs } => {
let label = "group-by";
(
format!("{label}\\nkey:\\n{}\\naggs:\\n{}", fmt_exprs(key, expr_arena), fmt_exprs(aggs, expr_arena)),
format!(
"{label}\\nkey:\\n{}\\naggs:\\n{}",
fmt_exprs(key, expr_arena),
fmt_exprs(aggs, expr_arena)
),
from_ref(input),
)
},
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ pub fn lower_ir(
if options.dynamic.is_some() || options.rolling.is_some() || maintain_order {
todo!()
}

polars_ensure!(!keys.is_empty(), ComputeError: "at least one key is required in a group_by operation");

// TODO: allow all aggregates.
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-stream/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub enum PhysNodeKind {
scan_type: FileScan,
file_options: FileScanOptions,
},

GroupBy {
input: PhysNodeKey,
key: Vec<ExprIR>,
Expand Down

0 comments on commit 471046b

Please sign in to comment.