Skip to content

Commit

Permalink
add total rows to output (#164)
Browse files: browse the repository at this point in the history
* add total rows to output

* format
  • Loading branch information
sslivkoff authored Jan 3, 2024
1 parent 806d7fc commit c5ed901
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
14 changes: 10 additions & 4 deletions crates/freeze/src/freeze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,13 @@ async fn freeze_partitions(
// aggregate results
let mut completed = Vec::new();
let mut errored = Vec::new();
let mut n_rows = 0;
while let Some(result) = futures.next().await {
match result {
Ok((partition, Ok(()))) => completed.push(partition),
Ok((partition, Ok(chunk_n_rows))) => {
n_rows += chunk_n_rows;
completed.push(partition)
}
Ok((partition, Err(e))) => errored.push((Some(partition), e)),
Err(_e) => errored.push((None, err("error joining chunks"))),
}
Expand All @@ -161,10 +165,10 @@ async fn freeze_partitions(
bar.finish_and_clear();
}

FreezeSummary { completed, errored, skipped }
FreezeSummary { completed, errored, skipped, n_rows }
}

async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError> {
async fn freeze_partition(payload: PartitionPayload) -> Result<u64, CollectError> {
let (partition, datatype, paths, query, source, sink, env, semaphore) = payload;

// acquire chunk semaphore
Expand All @@ -177,7 +181,9 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError>
let dfs = collect_partition(datatype, partition, query, source).await?;

// write dataframes to disk
let mut n_rows = 0;
for (datatype, mut df) in dfs {
n_rows += df.height() as u64;
let path = paths.get(&datatype).ok_or_else(|| {
CollectError::CollectError("could not get path for datatype".to_string())
})?;
Expand All @@ -190,5 +196,5 @@ async fn freeze_partition(payload: PartitionPayload) -> Result<(), CollectError>
bar.inc(1);
}

Ok(())
Ok(n_rows)
}
3 changes: 3 additions & 0 deletions crates/freeze/src/types/summaries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct FreezeSummary {
pub skipped: Vec<Partition>,
/// partitions errored
pub errored: Vec<(Option<Partition>, CollectError)>,
/// rows written
pub n_rows: u64,
}

/// print all datasets
Expand Down Expand Up @@ -498,6 +500,7 @@ pub(crate) fn print_cryo_conclusion(
total_time,
query.datatypes.len() as u64,
);
print_bullet_indent("rows written", freeze_summary.n_rows.separate_with_commas(), 0);
}

macro_rules! print_dim_speed {
Expand Down

0 comments on commit c5ed901

Please sign in to comment.