-
Notifications
You must be signed in to change notification settings - Fork 216
/
Copy pathtable_scan.rs
55 lines (46 loc) · 1.55 KB
/
table_scan.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.
use std::sync::Arc;
use super::*;
use crate::array::DataChunk;
use crate::catalog::{ColumnRefId, TableRefId};
use crate::storage::{
KeyRange, ScanOptions, Storage, StorageColumnRef, Table, Transaction, TxnIterator,
};
/// Executor for the table scan operation.
///
/// Reads the table identified by `table_id` from `storage` and produces the
/// requested `columns`, optionally restricted by `filter`.
pub struct TableScanExecutor<S>
where
    S: Storage,
{
    /// Reference to the table that is looked up in `storage` and scanned.
    pub table_id: TableRefId,
    /// Columns to produce. A column whose `column_id` equals `u32::MAX` is
    /// read as the row handler rather than as a stored column.
    pub columns: Vec<ColumnRefId>,
    /// Optional key range handed to the scan options as a filter.
    pub filter: Option<KeyRange>,
    /// Shared handle to the storage the table is fetched from.
    pub storage: Arc<S>,
}
impl<S: Storage> TableScanExecutor<S> {
#[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
pub async fn execute(self) {
let table = self.storage.get_table(self.table_id)?;
let mut col_idx = self
.columns
.iter()
.map(|x| match x.column_id {
u32::MAX => StorageColumnRef::RowHandler,
id => StorageColumnRef::Idx(id), // convert column id -> storage column idx
})
.collect_vec();
// TODO: append row handler?
if self.columns.is_empty() {
col_idx.push(StorageColumnRef::RowHandler);
}
let txn = table.read().await?;
let mut it = txn
.scan(
&col_idx,
ScanOptions::default().with_filter_opt(self.filter),
)
.await?;
while let Some(mut x) = it.next_batch(None).await? {
if self.columns.is_empty() {
x = DataChunk::no_column(x.cardinality());
}
yield x;
}
}
}