-
Notifications
You must be signed in to change notification settings - Fork 216
/
Copy pathwindow.rs
34 lines (30 loc) · 1.18 KB
/
window.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.
use super::*;
use crate::array::DataChunkBuilder;
/// The executor of window functions.
///
/// Holds the window expressions together with the types of the window
/// function columns; `execute` reads both for every input chunk.
pub struct WindowExecutor {
/// A list of over window functions.
///
/// e.g. `(list (over (lag #0) list list))`
///
/// `execute` passes this expression to `Evaluator::new` to initialise,
/// update and read the aggregate states.
pub exprs: RecExpr,
/// The types of window function columns.
///
/// `execute` passes these to `DataChunkBuilder::new` when building the
/// chunk that holds the window results.
pub types: Vec<DataType>,
}
impl WindowExecutor {
    /// Evaluates the window functions for every row produced by `child`.
    ///
    /// The aggregate states are created once, before the first chunk is read,
    /// and are updated with each input row in arrival order. After a row has
    /// been appended to the states, the current results are read back and
    /// pushed as one row of window output. Once all rows of a chunk are
    /// processed, the window output is combined with the input chunk through
    /// `row_concat` and yielded.
    ///
    /// Input chunks with zero rows are skipped: no rows would be pushed into
    /// the builder for them, so there is no window output to attach.
    ///
    /// # Errors
    ///
    /// An error yielded by `child` is propagated with `?`.
    ///
    /// # Panics
    ///
    /// Panics if the builder yields no chunk even though at least one row was
    /// pushed into it, which would be a broken invariant.
    #[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
    pub async fn execute(self, child: BoxedExecutor) {
        let mut states = Evaluator::new(&self.exprs).init_agg_states::<Vec<_>>();
        #[for_await]
        for chunk in child {
            let chunk = chunk?;
            let row_count = chunk.cardinality();
            // An empty chunk contributes no rows to the states and would leave
            // the builder empty, so taking and unwrapping its output below
            // would have nothing to return.
            if row_count == 0 {
                continue;
            }
            // One slot more than the number of rows, so the builder is never
            // filled up by this chunk.
            // NOTE(review): presumably `push_row` hands back a finished chunk
            // once the builder is full, which is why its return value can be
            // discarded here -- confirm against `DataChunkBuilder`.
            let mut builder = DataChunkBuilder::new(&self.types, row_count + 1);
            for i in 0..row_count {
                Evaluator::new(&self.exprs).agg_list_append(&mut states, chunk.row(i).values());
                let results = Evaluator::new(&self.exprs).agg_list_get_result(&states);
                _ = builder.push_row(results);
            }
            let window_chunk = builder
                .take()
                .expect("builder must hold the rows pushed for a non-empty chunk");
            yield chunk.row_concat(window_chunk);
        }
    }
}