Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
feat: Define catalog api (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 authored Sep 1, 2023
1 parent 848e8b7 commit 5cd51a1
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 13 deletions.
18 changes: 8 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ edition = "2021"
license = "Apache-2.0"

[workspace]
members = [
"icelake",
"rest_api"
]
members = ["icelake", "rest_api"]


[workspace.dependencies]
Expand All @@ -16,11 +13,11 @@ async-trait = "0.1"
apache-avro = { version = "0.15", features = ["derive"] }
arrow-array = { version = ">=46" }
arrow-schema = { version = ">=46" }
arrow-select = { version = ">=46"}
arrow-row = { version = ">=46"}
arrow-buffer = { version = ">=46"}
arrow-arith= { version = ">=46"}
arrow-csv= { version = ">=46"}
arrow-select = { version = ">=46" }
arrow-row = { version = ">=46" }
arrow-buffer = { version = ">=46" }
arrow-arith = { version = ">=46" }
arrow-csv = { version = ">=46" }
bytes = "1"
opendal = ">=0.37, <0.40"
uuid = { version = "1", features = ["v4"] }
Expand All @@ -44,4 +41,5 @@ ordered-float = "3.7.0"
confique = "0.2"
libtest-mimic = "0.6"
futures = { version = "0.3", features = ["executor"] }
testcontainers = {git = "https://github.com/liurenjie1024/testcontainers-rs.git", rev = "24fd08c05aa72ca7542198056c8c592a1899fd39"}
testcontainers = { git = "https://github.com/liurenjie1024/testcontainers-rs.git", rev = "24fd08c05aa72ca7542198056c8c592a1899fd39" }
iceberg-rest-api = { path = "./rest_api" }
7 changes: 4 additions & 3 deletions icelake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-row = { workspace = true }
arrow-arith= { workspace = true }
arrow-arith = { workspace = true }
arrow-buffer = { workspace = true }
bytes = { workspace = true }
futures = { workspace = true }
Expand All @@ -39,15 +39,16 @@ bitvec = "1.0.1"
serde_bytes = "0.11.12"
toml = "0.7.6"
csv = "1.2.2"
iceberg-rest-api = { workspace = true }


[dev-dependencies]
tempfile = { workspace = true }
testcontainers = { workspace = true }
confique = { workspace = true }
csv = { workspace = true }
env_logger = {workspace = true}
arrow-csv ={ workspace = true }
env_logger = { workspace = true }
arrow-csv = { workspace = true }

[[example]]
name = "read_iceberg_table"
Expand Down
195 changes: 195 additions & 0 deletions icelake/src/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
//! This module defines the catalog API for icelake.

use std::collections::{HashMap, HashSet};

use async_trait::async_trait;
use uuid::Uuid;

use crate::error::Result;
use crate::table::{Namespace, TableIdentifier};
use crate::types::{
PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReferenceType, SortOrder,
};
use crate::Table;

mod rest;
pub use rest::*;

/// Catalog definition.
///
/// Asynchronous interface for listing, creating, loading, renaming, dropping,
/// registering and updating tables. Every method except `name` is `async` and
/// returns the crate-level `Result`.
#[async_trait]
pub trait Catalog {
/// Return catalog's name.
fn name(&self) -> &str;

/// List the identifiers of the tables under namespace `ns`.
async fn list_tables(&self, ns: &Namespace) -> Result<Vec<TableIdentifier>>;

/// Creates a table and returns it.
///
/// * `table_name` - identifier of the table to create.
/// * `schema` - schema of the new table.
/// * `spec` - partition spec of the new table.
/// * `location` - presumably the storage location of the table; confirm
///   the expected format against the implementation.
/// * `props` - presumably the initial table properties (taken by value).
async fn create_table(
&self,
table_name: &TableIdentifier,
schema: &Schema,
spec: &PartitionSpec,
location: &str,
props: HashMap<String, String>,
) -> Result<Table>;

/// Check whether the table identified by `table_name` exists.
async fn table_exists(&self, table_name: &TableIdentifier) -> Result<bool>;

/// Drop table.
///
/// NOTE(review): the effect of `purge` is not specified here; presumably it
/// controls whether the table's files are deleted as well - confirm.
async fn drop_table(&self, table_name: &TableIdentifier, purge: bool) -> Result<()>;

/// Rename table `from` to `to`.
async fn rename_table(&self, from: &TableIdentifier, to: &TableIdentifier) -> Result<()>;

/// Load the table identified by `table_name`.
async fn load_table(&self, table_name: &TableIdentifier) -> Result<Table>;

/// Invalidate table.
///
/// NOTE(review): presumably discards any cached state for the table -
/// confirm against the implementation.
async fn invalidate_table(&self, table_name: &TableIdentifier) -> Result<()>;

/// Register a table under `table_name` using the location of its metadata
/// file, and return the registered table.
async fn register_table(
&self,
table_name: &TableIdentifier,
metadata_file_location: &str,
) -> Result<Table>;

/// Update a table as described by the given `UpdateTable` request and
/// return the resulting table.
///
/// NOTE(review): the parameter name `udpate_table` is a typo of
/// `update_table`.
async fn update_table(&self, udpate_table: &UpdateTable) -> Result<Table>;
}

/// Update table requirements.
///
/// Values of this type are carried in the `requirements` list of an
/// `UpdateTable` request. Presumably each one is a precondition checked
/// against the current table state before the updates are applied - confirm
/// against the catalog implementation.
///
/// NOTE(review): the type name `UpdateRquirement` is a typo of
/// `UpdateRequirement`; it is kept because it is part of the public API.
pub enum UpdateRquirement {
/// Requires that the table does not exist.
AssertTableDoesNotExist,
/// Requires the current table's uuid.
AssertTableUUID(Uuid),
/// Requires the current table branch's snapshot id.
AssertRefSnapshotID {
/// Branch name
name: String,
/// Snapshot id
snapshot_id: i64,
},
/// Requires the current table's last assigned field id.
AssertLastAssignedFieldId {
/// Last assigned field id.
last_assigned_field_id: i32,
},
/// Requires the current table's schema id.
AssertCurrentSchemaID {
/// Schema id
schema_id: i32,
},
/// Requires the current table's last assigned partition id.
AssertLastAssignedPartitionId {
/// Partition id.
last_assigned_partition_id: i32,
},
/// Requires the current table's default partition spec id.
AssertDefaultSpecID {
/// Spec id
spec_id: i32,
},
/// Requires the current table's default sort order id.
AssertDefaultSortOrderID {
/// Sort order id.
sort_order_id: i32,
},
}

/// Metadata updates.
///
/// Values of this type are carried in the `updates` list of an `UpdateTable`
/// request; each variant describes one change to a table's metadata.
pub enum MetadataUpdate {
/// Assign uuid.
AssignUuid(Uuid),
/// Upgrade format version.
UpgradeFormatVersion(i32),
/// Add schema.
AddSchema {
/// New schema
schema: Schema,
/// Last column id
last_column_id: i32,
},
/// Set current schema id.
SetCurrentSchema {
/// Schema id.
schema_id: i32,
},
/// Add partition spec.
AddPartitionSpec {
/// Spec id
spec_id: i32,
/// Partition fields
fields: Vec<PartitionField>,
},
/// Set default partition spec.
///
/// NOTE(review): the variant name `SetDefaultPartitonSpec` is a typo of
/// `SetDefaultPartitionSpec`; it is kept because it is part of the public
/// API.
SetDefaultPartitonSpec {
/// Partition spec id
spec_id: i32,
},
/// Add sort order.
AddSortOrder {
/// Sort order
sort_order: SortOrder,
},
/// Set default sort order.
SetDefaultSortOrder {
/// Sort order id
sort_order_id: i32,
},
/// Add snapshot.
AddSnapshot {
/// Snapshot
snapshot: Snapshot,
},
/// Remove snapshot.
RemoveSnapshot {
/// Snapshot id
snapshot_id: i64,
},
/// Remove snapshot ref.
RemoveSnapshotRef {
/// Ref name.
ref_name: String,
},
/// Update snapshot reference.
SetSnapshotRef {
/// Branch name
ref_name: String,
/// Snapshot id.
snapshot_id: Option<i64>,
/// Type of the snapshot reference.
typ: SnapshotReferenceType,
/// Number of snapshots to keep.
min_snapshots_to_keep: Option<i32>,
/// Max snapshot ages
// NOTE(review): the time unit is not stated here - confirm.
max_snapshot_ages: Option<i64>,
/// Max ref ages
// NOTE(review): the time unit is not stated here - confirm.
max_ref_ages: Option<i64>,
},
/// Update table properties.
SetProperties {
/// Table properties.
props: HashMap<String, String>,
},
/// Remove table properties.
RemoveProperties {
/// Keys to remove.
removed: HashSet<String>,
},
/// Set table location.
SetLocation {
/// Table location
location: String,
},
}

/// Update table request.
///
/// Bundles the target table with the requirements and metadata updates that
/// are passed to `Catalog::update_table`.
///
/// NOTE(review): all fields are private and no constructor is visible in this
/// module, so callers outside it cannot build a request yet.
pub struct UpdateTable {
/// Identifier of the table to update.
table_name: TableIdentifier,
/// Requirements attached to the request.
requirements: Vec<UpdateRquirement>,
/// Metadata updates attached to the request.
updates: Vec<MetadataUpdate>,
}
82 changes: 82 additions & 0 deletions icelake/src/catalog/rest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
//! REST catalog implementation.

use std::collections::HashMap;

use async_trait::async_trait;

use crate::{
table::{Namespace, TableIdentifier},
types::{PartitionSpec, Schema},
Table,
};

use super::{Catalog, UpdateTable};
use crate::error::Result;

/// REST catalog implementation.
///
/// Currently a placeholder: the struct has no fields and every `Catalog`
/// method below is a `todo!()` stub, so calling any of them panics.
pub struct RestCatalog {}

#[async_trait]
impl Catalog for RestCatalog {
/// Return catalog's name.
///
/// Not implemented yet; panics via `todo!()`.
fn name(&self) -> &str {
todo!()
}

/// List tables under namespace.
///
/// Not implemented yet; panics via `todo!()`.
async fn list_tables(&self, _ns: &Namespace) -> Result<Vec<TableIdentifier>> {
todo!()
}

/// Creates a table.
///
/// Not implemented yet; panics via `todo!()`.
async fn create_table(
&self,
_table_name: &TableIdentifier,
_schema: &Schema,
_spec: &PartitionSpec,
_location: &str,
_props: HashMap<String, String>,
) -> Result<Table> {
todo!()
}

/// Check table exists.
///
/// Not implemented yet; panics via `todo!()`.
async fn table_exists(&self, _table_name: &TableIdentifier) -> Result<bool> {
todo!()
}

/// Drop table.
///
/// Not implemented yet; panics via `todo!()`.
async fn drop_table(&self, _table_name: &TableIdentifier, _purge: bool) -> Result<()> {
todo!()
}

/// Rename table.
///
/// Not implemented yet; panics via `todo!()`.
async fn rename_table(&self, _from: &TableIdentifier, _to: &TableIdentifier) -> Result<()> {
todo!()
}

/// Load table.
///
/// Not implemented yet; panics via `todo!()`.
async fn load_table(&self, _table_name: &TableIdentifier) -> Result<Table> {
todo!()
}

/// Invalidate table.
///
/// Not implemented yet; panics via `todo!()`.
async fn invalidate_table(&self, _table_name: &TableIdentifier) -> Result<()> {
todo!()
}

/// Register a table using metadata file location.
///
/// Not implemented yet; panics via `todo!()`.
async fn register_table(
&self,
_table_name: &TableIdentifier,
_metadata_file_location: &str,
) -> Result<Table> {
todo!()
}

/// Update table.
///
/// Not implemented yet; panics via `todo!()`.
async fn update_table(&self, _udpate_table: &UpdateTable) -> Result<Table> {
todo!()
}
}
1 change: 1 addition & 0 deletions icelake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use error::Error;
pub use error::ErrorKind;
pub use error::Result;

pub mod catalog;
pub mod config;
pub mod io;
pub mod transaction;
Expand Down
11 changes: 11 additions & 0 deletions icelake/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ const METADATA_FILE_EXTENSION: &str = ".metadata.json";
const VERSION_HINT_FILENAME: &str = "version-hint.text";
const VERSIONED_TABLE_METADATA_FILE_PATTERN: &str = r"v([0-9]+).metadata.json";

/// Namespace of tables.
///
/// A namespace is stored as a list of level names, kept in the order in which
/// they were supplied to [`Namespace::new`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Namespace {
    levels: Vec<String>,
}

impl Namespace {
    /// Creates a namespace from its levels.
    ///
    /// Accepts any iterable of string-like items; the iteration order is
    /// preserved.
    pub fn new<I, S>(levels: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Self {
            levels: levels.into_iter().map(Into::into).collect(),
        }
    }

    /// Returns the levels of this namespace, in the order they were given.
    pub fn levels(&self) -> &[String] {
        &self.levels
    }
}

/// Fully qualified name of a table: a namespace plus the table's own name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableIdentifier {
    namespace: Namespace,
    name: String,
}

impl TableIdentifier {
    /// Creates a table identifier from a namespace and a table name.
    pub fn new(namespace: Namespace, name: impl Into<String>) -> Self {
        Self {
            namespace,
            name: name.into(),
        }
    }

    /// Returns the namespace that contains the table.
    pub fn namespace(&self) -> &Namespace {
        &self.namespace
    }

    /// Returns the table's name, without the namespace.
    pub fn name(&self) -> &str {
        &self.name
    }
}

/// Table is the main entry point for the IceLake.
pub struct Table {
op: Operator,
Expand Down

0 comments on commit 5cd51a1

Please sign in to comment.