From 5cd51a1a2edc11da6f19fe79a7682f00a94b03b9 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Fri, 1 Sep 2023 20:42:55 +0800 Subject: [PATCH] feat: Define catalog api (#171) --- Cargo.toml | 18 ++-- icelake/Cargo.toml | 7 +- icelake/src/catalog/mod.rs | 195 ++++++++++++++++++++++++++++++++++++ icelake/src/catalog/rest.rs | 82 +++++++++++++++ icelake/src/lib.rs | 1 + icelake/src/table.rs | 11 ++ 6 files changed, 301 insertions(+), 13 deletions(-) create mode 100644 icelake/src/catalog/mod.rs create mode 100644 icelake/src/catalog/rest.rs diff --git a/Cargo.toml b/Cargo.toml index eb80d8f..cf6d7f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,7 @@ edition = "2021" license = "Apache-2.0" [workspace] -members = [ - "icelake", - "rest_api" -] +members = ["icelake", "rest_api"] [workspace.dependencies] @@ -16,11 +13,11 @@ async-trait = "0.1" apache-avro = { version = "0.15", features = ["derive"] } arrow-array = { version = ">=46" } arrow-schema = { version = ">=46" } -arrow-select = { version = ">=46"} -arrow-row = { version = ">=46"} -arrow-buffer = { version = ">=46"} -arrow-arith= { version = ">=46"} -arrow-csv= { version = ">=46"} +arrow-select = { version = ">=46" } +arrow-row = { version = ">=46" } +arrow-buffer = { version = ">=46" } +arrow-arith = { version = ">=46" } +arrow-csv = { version = ">=46" } bytes = "1" opendal = ">=0.37, <0.40" uuid = { version = "1", features = ["v4"] } @@ -44,4 +41,5 @@ ordered-float = "3.7.0" confique = "0.2" libtest-mimic = "0.6" futures = { version = "0.3", features = ["executor"] } -testcontainers = {git = "https://github.com/liurenjie1024/testcontainers-rs.git", rev = "24fd08c05aa72ca7542198056c8c592a1899fd39"} +testcontainers = { git = "https://github.com/liurenjie1024/testcontainers-rs.git", rev = "24fd08c05aa72ca7542198056c8c592a1899fd39" } +iceberg-rest-api = { path = "./rest_api" } diff --git a/icelake/Cargo.toml b/icelake/Cargo.toml index e950342..c9760fe 100644 --- a/icelake/Cargo.toml +++ b/icelake/Cargo.toml @@ -15,7 +15,7 @@ arrow-array = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-row = { workspace = true } -arrow-arith= { workspace = true } +arrow-arith = { workspace = true } arrow-buffer = { workspace = true } bytes = { workspace = true } futures = { workspace = true } @@ -39,6 +39,7 @@ bitvec = "1.0.1" serde_bytes = "0.11.12" toml = "0.7.6" csv = "1.2.2" +iceberg-rest-api = { workspace = true } [dev-dependencies] @@ -46,8 +47,8 @@ tempfile = { workspace = true } testcontainers = { workspace = true } confique = { workspace = true } csv = { workspace = true } -env_logger = {workspace = true} -arrow-csv ={ workspace = true } +env_logger = { workspace = true } +arrow-csv = { workspace = true } [[example]] name = "read_iceberg_table" diff --git a/icelake/src/catalog/mod.rs b/icelake/src/catalog/mod.rs new file mode 100644 index 0000000..ad9624a --- /dev/null +++ b/icelake/src/catalog/mod.rs @@ -0,0 +1,195 @@ +//! This module defines catalog api for icelake. + +use std::collections::{HashMap, HashSet}; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::error::Result; +use crate::table::{Namespace, TableIdentifier}; +use crate::types::{ + PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReferenceType, SortOrder, +}; +use crate::Table; + +mod rest; +pub use rest::*; + +/// Catalog definition. +#[async_trait] +pub trait Catalog { + /// Return catalog's name. + fn name(&self) -> &str; + + /// List tables under namespace. + async fn list_tables(&self, ns: &Namespace) -> Result>; + + /// Creates a table. + async fn create_table( + &self, + table_name: &TableIdentifier, + schema: &Schema, + spec: &PartitionSpec, + location: &str, + props: HashMap, + ) -> Result; + + /// Check table exists. + async fn table_exists(&self, table_name: &TableIdentifier) -> Result; + + /// Drop table. + async fn drop_table(&self, table_name: &TableIdentifier, purge: bool) -> Result<()>; + + /// Rename table. + async fn rename_table(&self, from: &TableIdentifier, to: &TableIdentifier) -> Result<()>; + + /// Load table. + async fn load_table(&self, table_name: &TableIdentifier) -> Result
; + + /// Invalidate table. + async fn invalidate_table(&self, table_name: &TableIdentifier) -> Result<()>; + + /// Register a table using metadata file location. + async fn register_table( + &self, + table_name: &TableIdentifier, + metadata_file_location: &str, + ) -> Result
; + + /// Update table. + async fn update_table(&self, udpate_table: &UpdateTable) -> Result
; +} + +/// Update table requirments +pub enum UpdateRquirement { + /// Requirest table exists. + AssertTableDoesNotExist, + /// Requirest current table's uuid . + AssertTableUUID(Uuid), + /// Requirest current table branch's snapshot id. + AssertRefSnapshotID { + /// Branch name + name: String, + /// Snapshot id + snapshot_id: i64, + }, + /// Requirest current table's last assigned field id. + AssertLastAssignedFieldId { + /// Last assigned field id. + last_assigned_field_id: i32, + }, + /// Requirest current table's schema id. + AssertCurrentSchemaID { + /// Schema id + schema_id: i32, + }, + /// Requirest current table's last assigned partition id. + AssertLastAssignedPartitionId { + /// Partition id. + last_assigned_partition_id: i32, + }, + /// Requirest current table's default spec assigned partition id. + AssertDefaultSpecID { + /// Spec id + spec_id: i32, + }, + /// Requirest current table's default spec sort order id. + AssertDefaultSortOrderID { + /// Sort order id. + sort_order_id: i32, + }, +} + +/// Metadata updates. +pub enum MetadataUpdate { + /// Assign uuid. + AssignUuid(Uuid), + /// Upgrade format version. + UpgradeFormatVersion(i32), + /// Add schema + AddSchema { + /// New schema + schema: Schema, + /// Last column id + last_column_id: i32, + }, + /// Set current schema id. + SetCurrentSchema { + /// Schema id. + schema_id: i32, + }, + /// Add partition spec + AddPartitionSpec { + /// Spec id + spec_id: i32, + /// Partiton fields + fields: Vec, + }, + /// Set default partiton spec. + SetDefaultPartitonSpec { + /// Partiton spec id + spec_id: i32, + }, + /// Add sort order. + AddSortOrder { + /// Sort order + sort_order: SortOrder, + }, + /// Set defaut sort order + SetDefaultSortOrder { + /// Sort order id + sort_order_id: i32, + }, + /// Add snapshot + AddSnapshot { + /// Snapshot + snapshot: Snapshot, + }, + /// Remove snapshot + RemoveSnapshot { + /// Snapshot id + snapshot_id: i64, + }, + /// Remove snapshot ref + RemoveSnapshotRef { + /// Ref name. + ref_name: String, + }, + /// Update snapshot reference. + SetSnapshotRef { + /// Branch name + ref_name: String, + /// Snapshot shot id. + snapshot_id: Option, + /// Type + typ: SnapshotReferenceType, + /// Number of snapshots to keep. + min_snapshots_to_keep: Option, + /// Max snapshot ages + max_snapshot_ages: Option, + /// Max ref ages + max_ref_ages: Option, + }, + /// Update table properties. + SetProperties { + /// Table properties. + props: HashMap, + }, + /// Remove table properties. + RemoveProperties { + /// Keys to remove. + removed: HashSet, + }, + /// Set table location + SetLocation { + /// Table Location + location: String, + }, +} + +/// Update table request. +pub struct UpdateTable { + table_name: TableIdentifier, + requirements: Vec, + updates: Vec, +} diff --git a/icelake/src/catalog/rest.rs b/icelake/src/catalog/rest.rs new file mode 100644 index 0000000..32384e0 --- /dev/null +++ b/icelake/src/catalog/rest.rs @@ -0,0 +1,82 @@ +//! Rest catalog implementation. +//! + +use std::collections::HashMap; + +use async_trait::async_trait; + +use crate::{ + table::{Namespace, TableIdentifier}, + types::{PartitionSpec, Schema}, + Table, +}; + +use super::{Catalog, UpdateTable}; +use crate::error::Result; + +/// Rest catalog implementation +pub struct RestCatalog {} + +#[async_trait] +impl Catalog for RestCatalog { + /// Return catalog's name. + fn name(&self) -> &str { + todo!() + } + + /// List tables under namespace. + async fn list_tables(&self, _ns: &Namespace) -> Result> { + todo!() + } + + /// Creates a table. + async fn create_table( + &self, + _table_name: &TableIdentifier, + _schema: &Schema, + _spec: &PartitionSpec, + _location: &str, + _props: HashMap, + ) -> Result
{ + todo!() + } + + /// Check table exists. + async fn table_exists(&self, _table_name: &TableIdentifier) -> Result { + todo!() + } + + /// Drop table. + async fn drop_table(&self, _table_name: &TableIdentifier, _purge: bool) -> Result<()> { + todo!() + } + + /// Rename table. + async fn rename_table(&self, _from: &TableIdentifier, _to: &TableIdentifier) -> Result<()> { + todo!() + } + + /// Load table. + async fn load_table(&self, _table_name: &TableIdentifier) -> Result
{ + todo!() + } + + /// Invalidate table. + async fn invalidate_table(&self, _table_name: &TableIdentifier) -> Result<()> { + todo!() + } + + /// Register a table using metadata file location. + async fn register_table( + &self, + _table_name: &TableIdentifier, + _metadata_file_location: &str, + ) -> Result
{ + todo!() + } + + /// Update table. + async fn update_table(&self, _udpate_table: &UpdateTable) -> Result
{ + todo!() + } +} diff --git a/icelake/src/lib.rs b/icelake/src/lib.rs index cbb591e..7d71275 100644 --- a/icelake/src/lib.rs +++ b/icelake/src/lib.rs @@ -12,6 +12,7 @@ pub use error::Error; pub use error::ErrorKind; pub use error::Result; +pub mod catalog; pub mod config; pub mod io; pub mod transaction; diff --git a/icelake/src/table.rs b/icelake/src/table.rs index 86572ee..226040d 100644 --- a/icelake/src/table.rs +++ b/icelake/src/table.rs @@ -21,6 +21,17 @@ const METADATA_FILE_EXTENSION: &str = ".metadata.json"; const VERSION_HINT_FILENAME: &str = "version-hint.text"; const VERSIONED_TABLE_METADATA_FILE_PATTERN: &str = r"v([0-9]+).metadata.json"; +/// Namespace of tables +pub struct Namespace { + levels: Vec, +} + +/// Full qualified name of table. +pub struct TableIdentifier { + namespace: Namespace, + name: String, +} + /// Table is the main entry point for the IceLake. pub struct Table { op: Operator,