Skip to content

Commit

Permalink
feat: support metadata table "snapshots" (#822)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Dec 30, 2024
1 parent 044750f commit 328e18e
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 4 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,4 @@ volo-thrift = "0.10"
hive_metastore = "0.1"
tera = "1"
zstd = "0.13.2"
expect-test = "1"
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ zstd = { workspace = true }

[dev-dependencies]
ctor = { workspace = true }
expect-test = { workspace = true }
iceberg-catalog-memory = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod avro;
pub mod io;
pub mod spec;

pub mod metadata_scan;
pub mod scan;

pub mod expr;
Expand Down
256 changes: 256 additions & 0 deletions crates/iceberg/src/metadata_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Metadata table api.
use std::sync::Arc;

use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMillisecondType};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};

use crate::spec::TableMetadata;
use crate::table::Table;
use crate::Result;

/// Metadata table is used to inspect a table's history, snapshots, and other metadata as a table.
///
/// This is a thin newtype around a [`Table`]. The individual metadata tables are
/// obtained through accessor methods such as [`MetadataTable::snapshots`].
///
/// References:
/// - <https://github.com/apache/iceberg/blob/ac865e334e143dfd9e33011d8cf710b46d91f1e5/core/src/main/java/org/apache/iceberg/MetadataTableType.java#L23-L39>
/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
/// - <https://py.iceberg.apache.org/api/#inspecting-tables>
#[derive(Debug)]
pub struct MetadataTable(Table);

impl MetadataTable {
/// Creates a new metadata scan.
pub(super) fn new(table: Table) -> Self {
Self(table)
}

/// Get the snapshots table.
pub fn snapshots(&self) -> SnapshotsTable {
SnapshotsTable {
metadata_table: self,
}
}

fn metadata(&self) -> &TableMetadata {
self.0.metadata()
}
}

/// Snapshots table.
///
/// A borrowed view over a [`MetadataTable`] that exposes the table's snapshots
/// as Arrow data. It holds only a reference, so it cannot outlive the
/// [`MetadataTable`] it was created from.
#[derive(Debug)]
pub struct SnapshotsTable<'a> {
    /// The metadata table whose snapshots are exposed.
    metadata_table: &'a MetadataTable,
}

impl<'a> SnapshotsTable<'a> {
    /// Returns the Arrow schema of the snapshots table.
    ///
    /// Columns, in order: `committed_at`, `snapshot_id`, `parent_id` (nullable),
    /// `operation`, `manifest_list` and `summary` (a string-to-string map).
    pub fn schema(&self) -> Schema {
        // Millisecond timestamps tagged with a "+00:00" offset.
        let committed_at_type = DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into()));

        // A map is modelled as a list of `entries` structs holding `keys` and `values`.
        let summary_entries = DataType::Struct(
            vec![
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Utf8, true),
            ]
            .into(),
        );
        let summary_type = DataType::Map(
            Arc::new(Field::new("entries", summary_entries, false)),
            false,
        );

        Schema::new(vec![
            Field::new("committed_at", committed_at_type, false),
            Field::new("snapshot_id", DataType::Int64, false),
            Field::new("parent_id", DataType::Int64, true),
            Field::new("operation", DataType::Utf8, false),
            Field::new("manifest_list", DataType::Utf8, false),
            Field::new("summary", summary_type, false),
        ])
    }

    /// Scans the snapshots table.
    ///
    /// Produces a single [`RecordBatch`] with one row per snapshot in the table
    /// metadata, laid out according to [`SnapshotsTable::schema`].
    ///
    /// # Errors
    ///
    /// Returns an error if appending a summary map entry fails or if the record
    /// batch cannot be assembled from the built columns.
    pub fn scan(&self) -> Result<RecordBatch> {
        let mut committed_at_col =
            PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
        let mut snapshot_id_col = PrimitiveBuilder::<Int64Type>::new();
        let mut parent_id_col = PrimitiveBuilder::<Int64Type>::new();
        let mut operation_col = StringBuilder::new();
        let mut manifest_list_col = StringBuilder::new();
        let mut summary_col = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());

        for snapshot in self.metadata_table.metadata().snapshots() {
            let snapshot_summary = snapshot.summary();

            committed_at_col.append_value(snapshot.timestamp_ms());
            snapshot_id_col.append_value(snapshot.snapshot_id());
            parent_id_col.append_option(snapshot.parent_snapshot_id());
            operation_col.append_value(snapshot_summary.operation.as_str());
            manifest_list_col.append_value(snapshot.manifest_list());

            // Stage every property as a key/value pair, then close the map row.
            snapshot_summary
                .additional_properties
                .iter()
                .for_each(|(key, value)| {
                    summary_col.keys().append_value(key);
                    summary_col.values().append_value(value);
                });
            summary_col.append(true)?;
        }

        // Column order must match the field order declared in `schema()`.
        let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
            Arc::new(committed_at_col.finish()),
            Arc::new(snapshot_id_col.finish()),
            Arc::new(parent_id_col.finish()),
            Arc::new(operation_col.finish()),
            Arc::new(manifest_list_col.finish()),
            Arc::new(summary_col.finish()),
        ])?;
        Ok(batch)
    }
}

#[cfg(test)]
mod tests {
use expect_test::{expect, Expect};
use itertools::Itertools;

use super::*;
use crate::scan::tests::TableTestFixture;

/// Snapshot-tests a record batch against expected schema and data renderings.
///
/// - `expected_schema` / `expected_data`: start with `expect![[""]]` as a placeholder,
///   then run the test with `UPDATE_EXPECT=1 cargo test` to fill in the result
///   automatically, or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
///   See the [`expect_test`] docs for more details.
/// - `ignore_check_columns`: columns whose values are not stable; they are rendered
///   as `(skipped)` instead of their contents.
/// - `sort_column`: when set, all columns are reordered by this column first, since
///   the order of the data might be non-deterministic.
fn check_record_batch(
    record_batch: RecordBatch,
    expected_schema: Expect,
    expected_data: Expect,
    ignore_check_columns: &[&str],
    sort_column: Option<&str>,
) {
    // Either reorder every column by the sort key, or keep the batch order as-is.
    let columns = match sort_column {
        Some(sort_key) => {
            let key_column = record_batch.column_by_name(sort_key).unwrap();
            let order = arrow_ord::sort::sort_to_indices(key_column, None, None).unwrap();
            record_batch
                .columns()
                .iter()
                .map(|col| arrow_select::take::take(col.as_ref(), &order, None).unwrap())
                .collect_vec()
        }
        None => record_batch.columns().to_vec(),
    };

    let schema = record_batch.schema();

    let rendered_schema = schema.fields().iter().format(",\n").to_string();
    expected_schema.assert_eq(&rendered_schema);

    let rendered_data = schema
        .fields()
        .iter()
        .zip_eq(columns)
        .map(|(field, column)| {
            let name = field.name();
            if ignore_check_columns.contains(&name.as_str()) {
                format!("{}: (skipped)", name)
            } else {
                format!("{}: {:?}", name, column)
            }
        })
        .format(",\n")
        .to_string();
    expected_data.assert_eq(&rendered_data);
}

/// Scans the snapshots metadata table of the test fixture and snapshot-checks
/// both the resulting schema and the column data.
///
/// Rows are sorted by `committed_at` before comparison so the expected output
/// does not depend on the order in which snapshots are returned.
#[test]
fn test_snapshots_table() {
let table = TableTestFixture::new().table;
let record_batch = table.metadata_table().snapshots().scan().unwrap();
check_record_batch(
record_batch,
expect![[r#"
Field { name: "committed_at", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "operation", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "manifest_list", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "summary", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "keys", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
expect![[r#"
committed_at: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
[
2018-01-04T21:22:35.770+00:00,
2019-04-12T20:29:15.770+00:00,
],
snapshot_id: PrimitiveArray<Int64>
[
3051729675574597004,
3055729675574597004,
],
parent_id: PrimitiveArray<Int64>
[
null,
3051729675574597004,
],
operation: StringArray
[
"append",
"append",
],
manifest_list: (skipped),
summary: MapArray
[
StructArray
-- validity:
[
]
[
-- child 0: "keys" (Utf8)
StringArray
[
]
-- child 1: "values" (Utf8)
StringArray
[
]
],
StructArray
-- validity:
[
]
[
-- child 0: "keys" (Utf8)
StringArray
[
]
-- child 1: "values" (Utf8)
StringArray
[
]
],
]"#]],
// `manifest_list` is excluded from the data check as an unstable column;
// presumably because the fixture places it under a temporary directory.
&["manifest_list"],
Some("committed_at"),
);
}
}
9 changes: 5 additions & 4 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ impl FileScanTask {
}

#[cfg(test)]
mod tests {
pub mod tests {
use std::collections::HashMap;
use std::fs;
use std::fs::File;
Expand Down Expand Up @@ -990,13 +990,14 @@ mod tests {
use crate::table::Table;
use crate::TableIdent;

struct TableTestFixture {
pub struct TableTestFixture {
table_location: String,
table: Table,
pub table: Table,
}

impl TableTestFixture {
fn new() -> Self {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
Expand Down
12 changes: 12 additions & 0 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ pub enum Operation {
Delete,
}

impl Operation {
    /// Returns the string representation (lowercase) of the operation.
    ///
    /// Every label is a string literal, so the returned slice is `'static` and
    /// may be kept after the `Operation` value itself has gone out of scope.
    pub fn as_str(&self) -> &'static str {
        match self {
            Operation::Append => "append",
            Operation::Replace => "replace",
            Operation::Overwrite => "overwrite",
            Operation::Delete => "delete",
        }
    }
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
/// Summarises the changes in the snapshot.
pub struct Summary {
Expand Down
7 changes: 7 additions & 0 deletions crates/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::Arc;
use crate::arrow::ArrowReaderBuilder;
use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::metadata_scan::MetadataTable;
use crate::scan::TableScanBuilder;
use crate::spec::{TableMetadata, TableMetadataRef};
use crate::{Error, ErrorKind, Result, TableIdent};
Expand Down Expand Up @@ -200,6 +201,12 @@ impl Table {
TableScanBuilder::new(self)
}

/// Creates a metadata table which provides table-like APIs for inspecting metadata.
/// See [`MetadataTable`] for more details.
///
/// This method takes `self` by value, so the `Table` is moved into the returned
/// [`MetadataTable`].
pub fn metadata_table(self) -> MetadataTable {
MetadataTable::new(self)
}

/// Returns the flag indicating whether the `Table` is readonly or not
pub fn readonly(&self) -> bool {
self.readonly
Expand Down

0 comments on commit 328e18e

Please sign in to comment.