-
Notifications
You must be signed in to change notification settings - Fork 216
/
Copy pathcopy_to_file.rs
113 lines (100 loc) · 3.46 KB
/
copy_to_file.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright 2024 RisingLight Project Authors. Licensed under Apache-2.0.
use std::fs::File;
use std::path::PathBuf;
use tokio::sync::mpsc;
use super::*;
use crate::binder::copy::{ExtSource, FileFormat};
/// The executor of saving data to file.
///
/// It consumes the chunks produced by a child executor and writes their rows to
/// the file described by `source`.
pub struct CopyToFileExecutor {
/// Destination of the copy: `execute` reads its `path` (file to create) and
/// `format` (how rows are encoded).
pub source: ExtSource,
}
impl CopyToFileExecutor {
    /// Writes every chunk produced by `child` to the target file, then yields a
    /// single chunk holding the number of rows written.
    ///
    /// The file I/O runs in a blocking task. Chunks are handed to it through a
    /// channel created with capacity 1.
    ///
    /// # Errors
    ///
    /// Propagates any error yielded by `child` and any error returned by the
    /// writer task (invalid CSV options, file creation, write or flush failures).
    ///
    /// # Panics
    ///
    /// Panics if the writer task could not be joined, e.g. because it panicked.
    #[try_stream(boxed, ok = DataChunk, error = ExecutorError)]
    pub async fn execute(self, child: BoxedExecutor) {
        let (sender, recver) = mpsc::channel(1);
        // # Cancellation
        // When this stream is dropped, the `sender` is dropped, the `recver` will return
        // `None` in the spawned task, then the task will finish.
        let writer = tokio::task::spawn_blocking(move || {
            Self::write_file_blocking(self.source.path, self.source.format, recver)
        });
        #[for_await]
        for batch in child {
            if sender.send(batch?).await.is_err() {
                // A send error means the receiver is gone, i.e. the background IO
                // task has already returned (with an error). Stop feeding it; the
                // error itself is picked up from the join handle below.
                break;
            }
        }
        // Closing the channel tells the writer that there is no more input.
        drop(sender);
        let rows = writer
            .await
            .expect("file writer task panicked or was cancelled")?;
        yield DataChunk::single(rows as _);
    }

    /// Converts a CSV option character into the single byte the `csv` writer takes.
    ///
    /// Non-ASCII characters are rejected with an `InvalidInput` I/O error: a plain
    /// `as u8` cast would silently keep only the low byte of the code point and the
    /// file would be written with a different character than the one requested.
    fn ascii_byte(c: char, option: &str) -> std::io::Result<u8> {
        if c.is_ascii() {
            Ok(c as u8)
        } else {
            Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!("CSV {option} must be a single ASCII character, got {c:?}"),
            ))
        }
    }

    /// Receives chunks from `recver` until the channel is closed and writes each of
    /// their rows as one record to the file at `path`, encoded according to `format`.
    ///
    /// Chunks are received with `blocking_recv`, so this is meant to be run from
    /// `spawn_blocking` (as `execute` does).
    ///
    /// Returns the total number of rows written.
    ///
    /// # Errors
    ///
    /// Fails if a CSV option character is not ASCII, or if creating, writing to or
    /// flushing the file fails. Options are validated before the file is created,
    /// so a rejected option does not touch the target path.
    fn write_file_blocking(
        path: PathBuf,
        format: FileFormat,
        mut recver: mpsc::Receiver<DataChunk>,
    ) -> Result<usize> {
        let mut writer = match format {
            FileFormat::Csv {
                delimiter,
                quote,
                escape,
                header,
            } => {
                let delimiter = Self::ascii_byte(delimiter, "delimiter")?;
                let quote = Self::ascii_byte(quote, "quote")?;
                // Without an explicit escape character, the quote character is used.
                let escape = match escape {
                    Some(c) => Self::ascii_byte(c, "escape")?,
                    None => quote,
                };
                // NOTE(review): per the csv crate docs `has_headers` only affects the
                // `serialize` API; with `write_record` no header row appears to be
                // emitted even when `header` is true. Confirm and fix where column
                // names are available.
                csv::WriterBuilder::new()
                    .delimiter(delimiter)
                    .quote(quote)
                    .escape(escape)
                    .has_headers(header)
                    .from_writer(File::create(path)?)
            }
        };

        let mut rows = 0;
        while let Some(chunk) = recver.blocking_recv() {
            for i in 0..chunk.cardinality() {
                // TODO(wrj): avoid dynamic memory allocation (String)
                let row = chunk.arrays().iter().map(|a| a.get_to_string(i));
                writer.write_record(row)?;
            }
            // Flush per chunk so that everything counted so far has been handed to
            // the file before waiting for the next chunk.
            writer.flush()?;
            rows += chunk.cardinality();
        }
        // The channel is closed: the stream either completed or was cancelled.
        // If it was cancelled, the file is simply left as it is.
        Ok(rows)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::ArrayImpl;

    /// Feeds one three-column, two-row chunk through the executor and checks the
    /// exact CSV text that ends up in the target file.
    #[tokio::test]
    async fn write_csv() {
        let tmp = tempfile::NamedTempFile::new().expect("failed to create temp file");

        // Plain comma-separated output, no header row, default escaping.
        let format = FileFormat::Csv {
            delimiter: ',',
            quote: '"',
            escape: None,
            header: false,
        };
        let source = ExtSource {
            path: tmp.path().into(),
            format,
        };

        // A child executor that yields a single chunk: (int32, float64, string).
        let input = async_stream::try_stream! {
            yield [
                ArrayImpl::new_int32([1, 2].into_iter().collect()),
                ArrayImpl::new_float64([1.5, 2.5].into_iter().collect()),
                ArrayImpl::new_string(["one", "two"].iter().map(Some).collect()),
            ]
            .into_iter()
            .collect();
        }
        .boxed();

        // The first (and only) item is produced after the writer task has finished.
        CopyToFileExecutor { source }
            .execute(input)
            .next()
            .await
            .unwrap()
            .unwrap();

        let written = std::fs::read_to_string(tmp.path()).unwrap();
        assert_eq!(written, "1,1.5,one\n2,2.5,two\n");
    }
}