Commit 0a14510

Add predicate and row index handling
coastalwhite committed Oct 25, 2024
1 parent e5a888a commit 0a14510
Showing 1 changed file with 157 additions and 101 deletions.
crates/polars-stream/src/nodes/io_sources/ipc.rs
@@ -1,3 +1,4 @@
+use std::future::Future;
 use std::io::Cursor;
 use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -19,6 +20,7 @@ use polars_plan::plans::hive::HivePartitions;
 use polars_plan::plans::{FileInfo, ScanSources};
 use polars_plan::prelude::FileScanOptions;
 use polars_utils::mmap::MemSlice;
+use polars_utils::IdxSize;
 
 use crate::async_primitives::connector::{connector, Receiver, Sender};
 use crate::async_primitives::wait_group::WaitGroup;
@@ -27,6 +29,8 @@ use crate::nodes::{
     ComputeNode, JoinHandle, Morsel, MorselSeq, PortState, TaskPriority, TaskScope,
 };
 use crate::pipe::{RecvPort, SendPort};
+use crate::utils::linearizer::Linearizer;
+use crate::DEFAULT_LINEARIZER_BUFFER_SIZE;
 
 enum Predicate {
     None,
@@ -95,10 +99,6 @@ impl IpcSourceNode {
             .map(|cols| columns_to_projection(&cols, &metadata.schema))
             .transpose()?;
 
-        if predicate.is_some() && row_index.is_some() {
-            todo!();
-        }
-
         let predicate = match (predicate, slice) {
             (None, None) => Predicate::None,
             (None, Some((offset, _))) if offset != 0 => todo!(),
@@ -167,141 +167,197 @@ impl ComputeNode for IpcSourceNode {
 
         let senders = send[0].take().unwrap().parallel();
 
-        let mut rxs = Vec::with_capacity(senders.len());
-
         let slf = &*self;
-        for _ in &senders {
-            let (mut tx, rx) = connector();
-            rxs.push(rx);
-
-            join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
-                loop {
-                    if slf.is_finished.load(Ordering::Relaxed) {
-                        break;
-                    }
-
-                    let (metadata, source) = &slf.opened_files;
-
-                    let mut reader = FileReader::new(
-                        Cursor::new(source),
-                        metadata.clone(),
-                        slf.projection.clone(),
-                        None,
-                    );
-
-                    loop {
-                        if slf.is_finished.load(Ordering::Relaxed) {
-                            break;
-                        }
-
-                        let seq = slf.seq.fetch_add(1, Ordering::Relaxed);
-
-                        if seq as usize >= metadata.blocks.len() {
-                            break;
-                        }
-
-                        reader.set_current_block(seq as usize);
-                        let record_batch = reader.next().unwrap()?;
-
-                        let schema = reader.schema();
-                        assert_eq!(record_batch.arrays().len(), schema.len());
-
-                        let arrays = record_batch.into_arrays();
-
-                        let columns = arrays
-                            .into_iter()
-                            .zip(slf.projected_schema.iter())
-                            .map(|(array, (name, field))| {
-                                let field =
-                                    ArrowField::new(name.clone(), field.dtype.clone(), true);
-                                Ok(Series::try_from((&field, vec![array]))?.into_column())
-                            })
-                            .collect::<PolarsResult<Vec<Column>>>()?;
-
-                        let mut df = DataFrame::new(columns)?;
-
-                        if let Predicate::Expr(predicate) = &slf.predicate {
-                            let s = predicate.evaluate_io(&df)?;
-                            let mask = s.bool().expect("filter predicates was not of type boolean");
-
-                            df = df.filter(mask)?;
-                        }
-
-                        if tx.send((seq, df)).await.is_err() {
-                            break;
-                        };
-                    }
-
-                    break;
-                }
-
-                PolarsResult::Ok(())
-            }));
-        }
-
-        join_handles.push(scope.spawn_task(TaskPriority::High, async move {
-            let mut senders = senders;
-            let num_senders = senders.len();
-
-            let source_token = SourceToken::new();
-
-            let mut rxs = rxs;
-
-            let needs_linearization = matches!(slf.predicate, Predicate::Slice { .. }) || slf.row_index.is_some();
-
-            let mut next = 0;
-            let mut buffered = Vec::new();
-            let mut num_collected = 0;
-            loop {
-                if slf.is_finished.load(Ordering::Relaxed) {
-                    break;
-                }
-
-                let (Ok((idx, mut df)), _, _) =
-                    futures::future::select_all(rxs.iter_mut().map(|rx| rx.recv())).await
-                else {
-                    break;
-                };
-
-                if needs_linearization {
-                    if idx != next {
-                        buffered.push((idx, df));
-                        let Some(next_idx) = buffered.iter().position(|(idx, _)| *idx == next)
-                        else {
-                            continue;
-                        };
-
-                        (_, df) = buffered.remove(next_idx);
-                    }
-
-                    next += 1;
-
-                    if let Some(ri) = &slf.row_index {
-                        df = df.with_row_index(ri.name.clone(), Some(ri.offset))?;
-                    }
-
-                    if let Predicate::Slice { offset: _, length } = &slf.predicate {
-                        if num_collected + df.height() > *length {
-                            df = df.slice(0, length - num_collected);
-                            slf.is_finished.store(true, Ordering::Relaxed);
-                        }
-                    }
-
-                    num_collected += df.height();
-                }
-
-                let morsel = Morsel::new(df, MorselSeq::new(idx), source_token.clone());
-                if senders[(idx as usize) % num_senders]
-                    .send(morsel)
-                    .await
-                    .is_err()
-                {
-                    break;
-                }
-            }
-
-            Ok(())
-        }));
+
+        let needs_linearization =
+            matches!(slf.predicate, Predicate::Slice { .. }) || slf.row_index.is_some();
+
+        if needs_linearization {
+            let (mut linearizer, inserters) =
+                Linearizer::new(senders.len(), DEFAULT_LINEARIZER_BUFFER_SIZE);
+
+            for mut inserter in inserters {
+                join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
+                    let source_token = SourceToken::new();
+
+                    loop {
+                        if slf.is_finished.load(Ordering::Relaxed) {
+                            break;
+                        }
+
+                        let (metadata, source) = &slf.opened_files;
+
+                        let mut reader = FileReader::new(
+                            Cursor::new(source),
+                            metadata.clone(),
+                            slf.projection.clone(),
+                            None,
+                        );
+
+                        loop {
+                            if slf.is_finished.load(Ordering::Relaxed) {
+                                break;
+                            }
+
+                            let seq = slf.seq.fetch_add(1, Ordering::Relaxed);
+
+                            if seq as usize >= metadata.blocks.len() {
+                                break;
+                            }
+
+                            reader.set_current_block(seq as usize);
+                            let record_batch = reader.next().unwrap()?;
+
+                            let schema = reader.schema();
+                            assert_eq!(record_batch.arrays().len(), schema.len());
+
+                            let arrays = record_batch.into_arrays();
+
+                            let columns = arrays
+                                .into_iter()
+                                .zip(slf.projected_schema.iter())
+                                .map(|(array, (name, field))| {
+                                    let field =
+                                        ArrowField::new(name.clone(), field.dtype.clone(), true);
+                                    Ok(Series::try_from((&field, vec![array]))?.into_column())
+                                })
+                                .collect::<PolarsResult<Vec<Column>>>()?;
+
+                            let df = DataFrame::new(columns)?;
+
+                            let morsel = Morsel::new(df, MorselSeq::new(seq), source_token.clone());
+                            if inserter.insert(morsel).await.is_err() {
+                                break;
+                            };
+                        }
+
+                        break;
+                    }
+
+                    PolarsResult::Ok(())
+                }));
+            }
+
+            join_handles.push(scope.spawn_task(TaskPriority::High, async move {
+                let mut senders = senders;
+                let num_senders = senders.len();
+
+                let source_token = SourceToken::new();
+
+                let mut num_collected = 0;
+                while let Some(morsel) = linearizer.get().await {
+                    if slf.is_finished.load(Ordering::Relaxed) {
+                        break;
+                    }
+
+                    let (mut df, seq, _, _) = morsel.into_inner();
+
+                    if let Some(ri) = &slf.row_index {
+                        df = df.with_row_index(
+                            ri.name.clone(),
+                            Some(ri.offset + num_collected as IdxSize),
+                        )?;
+                    }
+
+                    num_collected += df.height();
+
+                    match &slf.predicate {
+                        Predicate::None => {},
+                        Predicate::Slice { offset: _, length } => {
+                            if num_collected > *length {
+                                df = df.slice(0, df.height() + length - num_collected);
+                                slf.is_finished.store(true, Ordering::Relaxed);
+                            }
+                        },
+                        Predicate::Expr(expr) => {
+                            let s = expr.evaluate_io(&df)?;
+                            let mask = s.bool().expect("filter predicates was not of type boolean");
+
+                            df = df.filter(mask)?;
+                        },
+                    }
+
+                    let morsel = Morsel::new(df, seq, source_token.clone());
+                    if senders[(seq.to_u64() as usize) % num_senders]
+                        .send(morsel)
+                        .await
+                        .is_err()
+                    {
+                        break;
+                    }
+                }
+
+                Ok(())
+            }));
+        } else {
+            for mut send in senders {
+                join_handles.push(scope.spawn_task(TaskPriority::Low, async move {
+                    let source_token = SourceToken::new();
+
+                    loop {
+                        if slf.is_finished.load(Ordering::Relaxed) {
+                            break;
+                        }
+
+                        let (metadata, source) = &slf.opened_files;
+
+                        let mut reader = FileReader::new(
+                            Cursor::new(source),
+                            metadata.clone(),
+                            slf.projection.clone(),
+                            None,
+                        );
+
+                        loop {
+                            if slf.is_finished.load(Ordering::Relaxed) {
+                                break;
+                            }
+
+                            let seq = slf.seq.fetch_add(1, Ordering::Relaxed);
+
+                            if seq as usize >= metadata.blocks.len() {
+                                break;
+                            }
+
+                            reader.set_current_block(seq as usize);
+                            let record_batch = reader.next().unwrap()?;
+
+                            let schema = reader.schema();
+                            assert_eq!(record_batch.arrays().len(), schema.len());
+
+                            let arrays = record_batch.into_arrays();
+
+                            let columns = arrays
+                                .into_iter()
+                                .zip(slf.projected_schema.iter())
+                                .map(|(array, (name, field))| {
+                                    let field =
+                                        ArrowField::new(name.clone(), field.dtype.clone(), true);
+                                    Ok(Series::try_from((&field, vec![array]))?.into_column())
+                                })
+                                .collect::<PolarsResult<Vec<Column>>>()?;
+
+                            let mut df = DataFrame::new(columns)?;
+
+                            if let Predicate::Expr(predicate) = &slf.predicate {
+                                let s = predicate.evaluate_io(&df)?;
+                                let mask = s
+                                    .bool()
+                                    .expect("filter predicates was not of type boolean");
+
+                                df = df.filter(mask)?;
+                            }
+
+                            let morsel = Morsel::new(df, MorselSeq::new(seq), source_token.clone());
+                            if send.send(morsel).await.is_err() {
+                                break;
+                            };
+                        }
+
+                        break;
+                    }
+
+                    PolarsResult::Ok(())
+                }));
+            }
+        }
     }
 }
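
The heart of the new linearized path is easy to lose in the diff: each Low-priority worker decodes blocks in parallel and tags every morsel with a sequence number, while the single High-priority task restores sequence order, attaches a row index starting at `ri.offset + num_collected`, and truncates with `df.slice(0, df.height() + length - num_collected)` once the slice length is crossed. Below is a minimal, self-contained sketch of that merge step, not the commit's code: `Chunk`, `merge_in_order`, and the std `mpsc` channel are hypothetical stand-ins for the morsel type, the `Linearizer`, and the async pipes used here.

use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

/// Stand-in for a decoded morsel: a sequence number plus a row count.
struct Chunk {
    seq: u64,
    rows: usize,
}

/// Re-emit chunks in `seq` order, giving each a running row offset and
/// truncating once `slice_len` rows have been collected in total.
fn merge_in_order(rx: mpsc::Receiver<Chunk>, slice_len: usize) -> Vec<(u64, usize, usize)> {
    let mut pending: BTreeMap<u64, Chunk> = BTreeMap::new();
    let mut next_seq = 0u64;
    let mut num_collected = 0usize;
    let mut out = Vec::new(); // (seq, row_offset, rows_kept)

    for chunk in rx {
        pending.insert(chunk.seq, chunk);
        // Drain every chunk that is now ready, in sequence order.
        while let Some(chunk) = pending.remove(&next_seq) {
            let row_offset = num_collected; // where this chunk's row index starts
            num_collected += chunk.rows;
            let rows_kept = if num_collected > slice_len {
                // Same arithmetic as `df.height() + length - num_collected`.
                chunk.rows + slice_len - num_collected
            } else {
                chunk.rows
            };
            out.push((chunk.seq, row_offset, rows_kept));
            next_seq += 1;
            if num_collected >= slice_len {
                return out; // plays the role of storing `is_finished`
            }
        }
    }
    out
}

fn main() {
    let (tx, rx) = mpsc::channel();
    // Two "decoder" threads emit 100-row chunks out of sequence order.
    for seqs in [vec![1u64, 2], vec![0, 3]] {
        let tx = tx.clone();
        thread::spawn(move || {
            for seq in seqs {
                // The merger may hang up early once the slice is filled.
                if tx.send(Chunk { seq, rows: 100 }).is_err() {
                    break;
                }
            }
        });
    }
    drop(tx);

    // With a 250-row slice: [(0, 0, 100), (1, 100, 100), (2, 200, 50)]
    println!("{:?}", merge_in_order(rx, 250));
}

Buffering by sequence number is what makes the result deterministic: without it, `num_collected` would depend on whichever worker finished first, so the row index and the slice cutoff could land on different rows from run to run.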
