diff --git a/Cargo.lock b/Cargo.lock
index 8a456f274..42388f85c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2135,6 +2135,12 @@ dependencies = [
  "syn 2.0.92",
 ]
 
+[[package]]
+name = "dissimilar"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59f8e79d1fbf76bdfbde321e902714bf6c49df88a7dda6fc682fc2979226962d"
+
 [[package]]
 name = "dlv-list"
 version = "0.5.2"
@@ -2236,6 +2242,16 @@ dependencies = [
  "pin-project-lite",
 ]
 
+[[package]]
+name = "expect-test"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "63af43ff4431e848fb47472a920f14fa71c24de13255a5692e93d4e90302acb0"
+dependencies = [
+ "dissimilar",
+ "once_cell",
+]
+
 [[package]]
 name = "fastrand"
 version = "2.3.0"
@@ -2868,6 +2884,7 @@ dependencies = [
  "chrono",
  "ctor",
  "derive_builder",
+ "expect-test",
  "fnv",
  "futures",
  "iceberg-catalog-memory",
diff --git a/Cargo.toml b/Cargo.toml
index b796308be..5b1dca422 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -101,3 +101,4 @@ volo-thrift = "0.10"
 hive_metastore = "0.1"
 tera = "1"
 zstd = "0.13.2"
+expect-test = "1"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index f84e7ab67..7f323722f 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -86,6 +86,7 @@ zstd = { workspace = true }
 
 [dev-dependencies]
 ctor = { workspace = true }
+expect-test = { workspace = true }
 iceberg-catalog-memory = { workspace = true }
 iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 pretty_assertions = { workspace = true }
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index eaecfea60..1946f35f3 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -73,6 +73,7 @@ mod avro;
 pub mod io;
 pub mod spec;
 
+pub mod metadata_scan;
 pub mod scan;
 
 pub mod expr;
diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_scan.rs
new file mode 100644
index 000000000..942d7605c
--- /dev/null
+++ b/crates/iceberg/src/metadata_scan.rs
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Metadata table api.
+
+use std::sync::Arc;
+
+use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
+use arrow_array::types::{Int64Type, TimestampMillisecondType};
+use arrow_array::RecordBatch;
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+
+use crate::spec::TableMetadata;
+use crate::table::Table;
+use crate::Result;
+
+/// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table.
+///
+/// References:
+/// -
+/// -
+/// -
+#[derive(Debug)]
+pub struct MetadataTable(Table);
+
+impl MetadataTable {
+    /// Creates a new metadata scan.
+    pub(super) fn new(table: Table) -> Self {
+        Self(table)
+    }
+
+    /// Get the snapshots table.
+    pub fn snapshots(&self) -> SnapshotsTable {
+        SnapshotsTable {
+            metadata_table: self,
+        }
+    }
+
+    fn metadata(&self) -> &TableMetadata {
+        self.0.metadata()
+    }
+}
+
+/// Snapshots table.
+pub struct SnapshotsTable<'a> {
+    metadata_table: &'a MetadataTable,
+}
+
+impl<'a> SnapshotsTable<'a> {
+    /// Returns the schema of the snapshots table.
+    pub fn schema(&self) -> Schema {
+        Schema::new(vec![
+            Field::new(
+                "committed_at",
+                DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
+                false,
+            ),
+            Field::new("snapshot_id", DataType::Int64, false),
+            Field::new("parent_id", DataType::Int64, true),
+            Field::new("operation", DataType::Utf8, false),
+            Field::new("manifest_list", DataType::Utf8, false),
+            Field::new(
+                "summary",
+                DataType::Map(
+                    Arc::new(Field::new(
+                        "entries",
+                        DataType::Struct(
+                            vec![
+                                Field::new("keys", DataType::Utf8, false),
+                                Field::new("values", DataType::Utf8, true),
+                            ]
+                            .into(),
+                        ),
+                        false,
+                    )),
+                    false,
+                ),
+                false,
+            ),
+        ])
+    }
+
+    /// Scans the snapshots table.
+    pub fn scan(&self) -> Result<RecordBatch> {
+        let mut committed_at =
+            PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
+        let mut snapshot_id = PrimitiveBuilder::<Int64Type>::new();
+        let mut parent_id = PrimitiveBuilder::<Int64Type>::new();
+        let mut operation = StringBuilder::new();
+        let mut manifest_list = StringBuilder::new();
+        let mut summary = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
+
+        for snapshot in self.metadata_table.metadata().snapshots() {
+            committed_at.append_value(snapshot.timestamp_ms());
+            snapshot_id.append_value(snapshot.snapshot_id());
+            parent_id.append_option(snapshot.parent_snapshot_id());
+            manifest_list.append_value(snapshot.manifest_list());
+            operation.append_value(snapshot.summary().operation.as_str());
+            for (key, value) in &snapshot.summary().additional_properties {
+                summary.keys().append_value(key);
+                summary.values().append_value(value);
+            }
+            summary.append(true)?;
+        }
+
+        Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![
+            Arc::new(committed_at.finish()),
+            Arc::new(snapshot_id.finish()),
+            Arc::new(parent_id.finish()),
+            Arc::new(operation.finish()),
+            Arc::new(manifest_list.finish()),
+            Arc::new(summary.finish()),
+        ])?)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use expect_test::{expect, Expect};
+    use itertools::Itertools;
+
+    use super::*;
+    use crate::scan::tests::TableTestFixture;
+
+    /// Snapshot testing to check the resulting record batch.
+    ///
+    /// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
+    ///   and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
+    ///   or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
+    ///   Check the doc of [`expect_test`] for more details.
+    /// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
+    /// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
+    fn check_record_batch(
+        record_batch: RecordBatch,
+        expected_schema: Expect,
+        expected_data: Expect,
+        ignore_check_columns: &[&str],
+        sort_column: Option<&str>,
+    ) {
+        let mut columns = record_batch.columns().to_vec();
+        if let Some(sort_column) = sort_column {
+            let column = record_batch.column_by_name(sort_column).unwrap();
+            let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
+            columns = columns
+                .iter()
+                .map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
+                .collect_vec();
+        }
+
+        expected_schema.assert_eq(&format!(
+            "{}",
+            record_batch.schema().fields().iter().format(",\n")
+        ));
+        expected_data.assert_eq(&format!(
+            "{}",
+            record_batch
+                .schema()
+                .fields()
+                .iter()
+                .zip_eq(columns)
+                .map(|(field, column)| {
+                    if ignore_check_columns.contains(&field.name().as_str()) {
+                        format!("{}: (skipped)", field.name())
+                    } else {
+                        format!("{}: {:?}", field.name(), column)
+                    }
+                })
+                .format(",\n")
+        ));
+    }
+
+    #[test]
+    fn test_snapshots_table() {
+        let table = TableTestFixture::new().table;
+        let record_batch = table.metadata_table().snapshots().scan().unwrap();
+        check_record_batch(
+            record_batch,
+            expect![[r#"
+                Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
+                Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
+            expect![[r#"
+                committed_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
+                [
+                  2018-01-04T21:22:35.770+00:00,
+                  2019-04-12T20:29:15.770+00:00,
+                ],
+                snapshot_id: PrimitiveArray<Int64>
+                [
+                  3051729675574597004,
+                  3055729675574597004,
+                ],
+                parent_id: PrimitiveArray<Int64>
+                [
+                  null,
+                  3051729675574597004,
+                ],
+                operation: StringArray
+                [
+                  "append",
+                  "append",
+                ],
+                manifest_list: (skipped),
+                summary: MapArray
+                [
+                  StructArray
+                -- validity:
+                [
+                ]
+                [
+                -- child 0: "keys" (Utf8)
+                StringArray
+                [
+                ]
+                -- child 1: "values" (Utf8)
+                StringArray
+                [
+                ]
+                ],
+                  StructArray
+                -- validity:
+                [
+                ]
+                [
+                -- child 0: "keys" (Utf8)
+                StringArray
+                [
+                ]
+                -- child 1: "values" (Utf8)
+                StringArray
+                [
+                ]
+                ],
+                ]"#]],
+            &["manifest_list"],
+            Some("committed_at"),
+        );
+    }
+}
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 7a100b346..cb3e5d8c8 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -961,7 +961,7 @@ impl FileScanTask {
 }
 
 #[cfg(test)]
-mod tests {
+pub mod tests {
     use std::collections::HashMap;
     use std::fs;
     use std::fs::File;
@@ -990,13 +990,14 @@ mod tests {
     use crate::table::Table;
     use crate::TableIdent;
 
-    struct TableTestFixture {
+    pub struct TableTestFixture {
         table_location: String,
-        table: Table,
+        pub table: Table,
     }
 
     impl TableTestFixture {
-        fn new() -> Self {
+        #[allow(clippy::new_without_default)]
+        pub fn new() -> Self {
             let tmp_dir = TempDir::new().unwrap();
             let table_location = tmp_dir.path().join("table1");
             let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index 81fd6eae6..f24a3c26b 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -52,6 +52,18 @@ pub enum Operation {
     Delete,
 }
 
+impl Operation {
+    /// Returns the string representation (lowercase) of the operation.
+    pub fn as_str(&self) -> &str {
+        match self {
+            Operation::Append => "append",
+            Operation::Replace => "replace",
+            Operation::Overwrite => "overwrite",
+            Operation::Delete => "delete",
+        }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
 /// Summarises the changes in the snapshot.
 pub struct Summary {
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index 406f9dd65..fa5304855 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
 use crate::arrow::ArrowReaderBuilder;
 use crate::io::object_cache::ObjectCache;
 use crate::io::FileIO;
+use crate::metadata_scan::MetadataTable;
 use crate::scan::TableScanBuilder;
 use crate::spec::{TableMetadata, TableMetadataRef};
 use crate::{Error, ErrorKind, Result, TableIdent};
@@ -200,6 +201,12 @@ impl Table {
         TableScanBuilder::new(self)
     }
 
+    /// Creates a metadata table which provides table-like APIs for inspecting metadata.
+    /// See [`MetadataTable`] for more details.
+    pub fn metadata_table(self) -> MetadataTable {
+        MetadataTable::new(self)
+    }
+
     /// Returns the flag indicating whether the `Table` is readonly or not
     pub fn readonly(&self) -> bool {
         self.readonly
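
Not part of the patch: a rough sketch of how the API added here might be consumed. It assumes a `Catalog` implementation and a `TableIdent` are already available; the function name and setup below are illustrative only.

// Usage sketch (not part of this patch): inspect a table's snapshots through the
// new metadata table API. `catalog` and `ident` are assumed to come from the caller.
use arrow_array::RecordBatch;
use iceberg::{Catalog, Result, TableIdent};

async fn print_snapshots(catalog: &impl Catalog, ident: &TableIdent) -> Result<()> {
    let table = catalog.load_table(ident).await?;
    // `metadata_table()` consumes the `Table`; `snapshots().scan()` yields an Arrow RecordBatch.
    let batch: RecordBatch = table.metadata_table().snapshots().scan()?;
    println!("snapshots: {} rows", batch.num_rows());
    Ok(())
}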