From 79664784a378fcb456f13ba5ed87bd468c5339ef Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 18 Jul 2023 16:14:37 +0800 Subject: [PATCH 1/2] feat: Ser/de for schema --- src/types/on_disk/schema.rs | 181 +++++++++++++++++++++--------------- src/types/on_disk/types.rs | 165 +++++++++++++++++++++++++++++--- 2 files changed, 257 insertions(+), 89 deletions(-) diff --git a/src/types/on_disk/schema.rs b/src/types/on_disk/schema.rs index b13dd3c..0950ef7 100644 --- a/src/types/on_disk/schema.rs +++ b/src/types/on_disk/schema.rs @@ -1,9 +1,9 @@ use serde::{Deserialize, Serialize}; use super::types::*; +use crate::types; use crate::Error; use crate::Result; -use crate::{types, ErrorKind}; /// Parse schema from json bytes. pub fn parse_schema(schema: &[u8]) -> Result { @@ -11,10 +11,16 @@ pub fn parse_schema(schema: &[u8]) -> Result { schema.try_into() } -#[derive(Serialize, Deserialize)] +/// Serialize schema to json string. +pub fn serialize_schema(schema: &types::Schema) -> Result { + Ok(serde_json::to_string(&Schema::try_from(schema)?)?) +} + +#[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "kebab-case")] pub struct Schema { schema_id: i32, + #[serde(skip_serializing_if = "Option::is_none")] identifier_field_ids: Option>, fields: Vec, } @@ -39,11 +45,16 @@ impl TryFrom for types::Schema { impl<'a> TryFrom<&'a types::Schema> for Schema { type Error = Error; - fn try_from(_value: &'a types::Schema) -> Result { - Err(Error::new( - ErrorKind::IcebergFeatureUnsupported, - "Serializing schema!", - )) + fn try_from(v: &'a types::Schema) -> Result { + Ok(Self { + schema_id: v.schema_id, + identifier_field_ids: v.identifier_field_ids.as_ref().cloned(), + fields: v + .fields + .iter() + .map(|v| Field::try_from(v.clone())) + .collect::>>()?, + }) } } @@ -51,9 +62,21 @@ impl<'a> TryFrom<&'a types::Schema> for Schema { mod tests { use super::*; + fn check_schema_serde(json_schema: &str, expected_schema: types::Schema) { + let schema = parse_schema(json_schema.as_bytes()).unwrap(); + assert_eq!(expected_schema, schema); + + let serialized_json_schema = serialize_schema(&expected_schema).unwrap(); + + assert_eq!( + expected_schema, + parse_schema(serialized_json_schema.as_bytes()).unwrap() + ); + } + #[test] - fn test_parse_schema_struct() { - let schema = r#" + fn test_schema_json_conversion() { + let json_schema = r#" { "type" : "struct", "schema-id" : 0, @@ -66,18 +89,21 @@ mod tests { } "#; - let schema = parse_schema(schema.as_bytes()).unwrap(); - - assert_eq!(schema.schema_id, 0); - assert_eq!(schema.identifier_field_ids, None); - assert_eq!(schema.fields.len(), 1); - assert_eq!(schema.fields[0].id, 1); - assert_eq!(schema.fields[0].name, "VendorID"); - assert!(!schema.fields[0].required); - assert_eq!( - schema.fields[0].field_type, - types::Any::Primitive(types::Primitive::Long) - ); + let expected_schema = types::Schema { + schema_id: 0, + identifier_field_ids: None, + fields: vec![types::Field { + id: 1, + name: "VendorID".to_string(), + required: false, + field_type: types::Any::Primitive(types::Primitive::Long), + comment: None, + initial_default: None, + write_default: None, + }], + }; + + check_schema_serde(json_schema, expected_schema); } #[test] @@ -97,31 +123,28 @@ mod tests { } "#; + let expected_schema = types::Schema { + schema_id: 0, + identifier_field_ids: None, + fields: vec![types::Field { + id: 1, + name: "VendorID".to_string(), + required: false, + field_type: types::Any::Primitive(types::Primitive::Long), + comment: None, + initial_default: Some(types::AnyValue::Primitive(types::PrimitiveValue::Long(123))), + write_default: Some(types::AnyValue::Primitive(types::PrimitiveValue::Long(456))), + }], + }; + let schema = parse_schema(schema.as_bytes()).unwrap(); - assert_eq!(schema.schema_id, 0); - assert_eq!(schema.identifier_field_ids, None); - assert_eq!(schema.fields.len(), 1); - assert_eq!(schema.fields[0].id, 1); - assert_eq!(schema.fields[0].name, "VendorID"); - assert!(!schema.fields[0].required); - assert_eq!( - schema.fields[0].field_type, - types::Any::Primitive(types::Primitive::Long) - ); - assert_eq!( - schema.fields[0].initial_default, - Some(types::AnyValue::Primitive(types::PrimitiveValue::Long(123))) - ); - assert_eq!( - schema.fields[0].write_default, - Some(types::AnyValue::Primitive(types::PrimitiveValue::Long(456))) - ); + assert_eq!(expected_schema, schema); } #[test] fn test_parse_schema_list() { - let schema = r#" + let json_schema = r#" { "type" : "struct", "schema-id" : 0, @@ -141,27 +164,30 @@ mod tests { } "#; - let schema = parse_schema(schema.as_bytes()).unwrap(); - - assert_eq!(schema.schema_id, 0); - assert_eq!(schema.identifier_field_ids, None); - assert_eq!(schema.fields.len(), 1); - assert_eq!(schema.fields[0].id, 1); - assert_eq!(schema.fields[0].name, "VendorID"); - assert!(!schema.fields[0].required); - assert_eq!( - schema.fields[0].field_type, - types::Any::List(types::List { - element_id: 3, - element_required: true, - element_type: types::Any::Primitive(types::Primitive::String).into(), - }) - ); + let expected_schema = types::Schema { + schema_id: 0, + identifier_field_ids: None, + fields: vec![types::Field { + id: 1, + name: "VendorID".to_string(), + required: false, + field_type: types::Any::List(types::List { + element_id: 3, + element_required: true, + element_type: types::Any::Primitive(types::Primitive::String).into(), + }), + comment: None, + initial_default: None, + write_default: None, + }], + }; + + check_schema_serde(json_schema, expected_schema); } #[test] fn test_parse_schema_map() { - let schema = r#" + let json_schema = r#" { "type" : "struct", "schema-id" : 0, @@ -183,23 +209,28 @@ mod tests { } "#; - let schema = parse_schema(schema.as_bytes()).unwrap(); - - assert_eq!(schema.schema_id, 0); - assert_eq!(schema.identifier_field_ids, None); - assert_eq!(schema.fields.len(), 1); - assert_eq!(schema.fields[0].id, 1); - assert_eq!(schema.fields[0].name, "VendorID"); - assert!(!schema.fields[0].required); - assert_eq!( - schema.fields[0].field_type, - types::Any::Map(types::Map { - key_id: 4, - key_type: types::Any::Primitive(types::Primitive::String).into(), - value_id: 5, - value_required: false, - value_type: types::Any::Primitive(types::Primitive::Double).into(), - }) - ); + let expected_schema = types::Schema { + schema_id: 0, + identifier_field_ids: None, + fields: vec![ + types::Field { + id: 1, + name: "VendorID".to_string(), + required: false, + field_type: types::Any::Map(types::Map { + key_id: 4, + key_type: types::Any::Primitive(types::Primitive::String).into(), + value_id: 5, + value_required: false, + value_type: types::Any::Primitive(types::Primitive::Double).into(), + }), + comment: None, + initial_default: None, + write_default: None, + } + ] + }; + + check_schema_serde(json_schema, expected_schema); } } diff --git a/src/types/on_disk/types.rs b/src/types/on_disk/types.rs index b55e3e6..faaeb87 100644 --- a/src/types/on_disk/types.rs +++ b/src/types/on_disk/types.rs @@ -11,18 +11,19 @@ use chrono::Utc; use rust_decimal::Decimal; use serde::de::MapAccess; use serde::de::Visitor; -use serde::Deserialize; +use serde::ser::{Error as SerError, SerializeStruct}; use serde::Deserializer; use serde::{de, Serialize}; +use serde::{Deserialize, Serializer}; use uuid::Uuid; use crate::types; use crate::types::Any; -use crate::Error; use crate::ErrorKind; use crate::Result; +use crate::Error; -#[derive(Serialize, Deserialize, Default)] +#[derive(Deserialize, Default, Debug)] #[serde(rename_all = "kebab-case", default)] pub struct Types { #[serde(rename = "type")] @@ -180,15 +181,93 @@ impl TryFrom for types::Any { impl TryFrom for Types { type Error = Error; - fn try_from(_value: Any) -> Result { - Err(Error::new( - ErrorKind::IcebergFeatureUnsupported, - "Serializing data types!", - )) + fn try_from(v: Any) -> Result { + match v { + types::Any::Primitive(prim) => { + let typ = match prim { + types::Primitive::Boolean => "boolean".to_string(), + types::Primitive::Int => "int".to_string(), + types::Primitive::Long => "long".to_string(), + types::Primitive::Float => "float".to_string(), + types::Primitive::Double => "double".to_string(), + types::Primitive::Date => "date".to_string(), + types::Primitive::Time => "time".to_string(), + types::Primitive::Timestamp => "timestamp".to_string(), + types::Primitive::Timestampz => "timestamptz".to_string(), + types::Primitive::String => "string".to_string(), + types::Primitive::Uuid => "uuid".to_string(), + types::Primitive::Binary => "binary".to_string(), + types::Primitive::Fixed(length) => format!("fixed[{}]", length), + types::Primitive::Decimal { precision, scale } => { + format!("decimal({}, {})", precision, scale) + } + }; + Ok(Types { + typ, + ..Default::default() + }) + } + types::Any::Struct(types::Struct { fields }) => { + let json_fields = fields + .iter() + .map(|f| -> Result { + Ok(Field { + id: f.id, + name: f.name.clone(), + required: f.required, + typ: f.field_type.clone().try_into()?, + doc: f.comment.clone(), + initial_default: f + .initial_default + .clone() + .map(serialize_value_to_json) + .transpose()?, + write_default: f + .initial_default + .clone() + .map(serialize_value_to_json) + .transpose()?, + }) + }) + .collect::>>()?; + + Ok(Types { + typ: "struct".to_string(), + fields: json_fields, + ..Default::default() + }) + } + types::Any::List(types::List { + element_id, + element_required, + element_type, + }) => Ok(Types { + typ: "list".to_string(), + element_id, + element_required, + element: Some(Box::new((*element_type).clone().try_into()?)), + ..Default::default() + }), + types::Any::Map(types::Map { + key_id, + key_type, + value_id, + value_required, + value_type, + }) => Ok(Types { + typ: "map".to_string(), + key_id, + key: Some(Box::new((*key_type).try_into()?)), + value_id, + value_required, + value: Some(Box::new((*value_type).try_into()?)), + ..Default::default() + }), + } } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "kebab-case")] pub struct Field { id: i32, @@ -196,8 +275,11 @@ pub struct Field { required: bool, #[serde(rename = "type", deserialize_with = "string_or_struct")] typ: Types, + #[serde(skip_serializing_if = "Option::is_none")] doc: Option, + #[serde(skip_serializing_if = "Option::is_none")] initial_default: Option, + #[serde(skip_serializing_if = "Option::is_none")] write_default: Option, } @@ -233,11 +315,16 @@ impl TryFrom for types::Field { impl TryFrom for Field { type Error = Error; - fn try_from(_value: types::Field) -> Result { - Err(Error::new( - ErrorKind::IcebergFeatureUnsupported, - "Serializing Field!", - )) + fn try_from(v: types::Field) -> Result { + Ok(Self { + id: v.id, + name: v.name, + required: v.required, + typ: v.field_type.try_into()?, + doc: v.comment, + initial_default: v.initial_default.map(serialize_value_to_json).transpose()?, + write_default: v.write_default.map(serialize_value_to_json).transpose()?, + }) } } @@ -268,6 +355,10 @@ fn parse_json_value(expect_type: &types::Any, value: serde_json::Value) -> Resul } } +fn serialize_value_to_json(_value: types::AnyValue) -> Result { + todo!() +} + /// JSON single-value serialization requires boolean been stored /// as bool. #[inline] @@ -863,3 +954,49 @@ where deserializer.deserialize_any(StringOrStruct(PhantomData)) } + +impl Serialize for Types { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + match self.typ.as_str() { + "boolean" | "int" | "long" | "float" | "double" | "date" | "time" | "timestamp" + | "timestamptz" | "string" | "uuid" | "binary" => serializer.serialize_str(&self.typ), + v if v.starts_with("decimal") || v.starts_with("fixed") => { + serializer.serialize_str(&self.typ) + } + "struct" => { + let mut state = serializer.serialize_struct("Types", 2)?; + state.serialize_field("type", &self.typ)?; + state.serialize_field("fields", &self.fields)?; + state.end() + } + "list" => { + let mut state = serializer.serialize_struct("Types", 4)?; + state.serialize_field("type", &self.typ)?; + state.serialize_field("element-id", &self.element_id)?; + state.serialize_field("element-required", &self.element_required)?; + if let Some(t) = self.element.as_ref() { + state.serialize_field("element", &t)?; + } + state.end() + } + "map" => { + let mut state = serializer.serialize_struct("Types", 5)?; + state.serialize_field("type", &self.typ)?; + state.serialize_field("key-id", &self.key_id)?; + if let Some(t) = self.key.as_ref() { + state.serialize_field("key", &t)?; + } + state.serialize_field("value-id", &self.value_id)?; + state.serialize_field("value-required", &self.value_required)?; + if let Some(t) = self.value.as_ref() { + state.serialize_field("value", &t)?; + } + state.end() + } + _ => Err(S::Error::custom(format!("Unknown type {}", &self.typ))), + } + } +} From 379ebc22612c84522090dbac0dde9d2eb62d95a0 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 18 Jul 2023 16:15:20 +0800 Subject: [PATCH 2/2] format --- src/types/on_disk/schema.rs | 32 +++++++++++++++----------------- src/types/on_disk/types.rs | 2 +- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/types/on_disk/schema.rs b/src/types/on_disk/schema.rs index 0950ef7..c5d39b9 100644 --- a/src/types/on_disk/schema.rs +++ b/src/types/on_disk/schema.rs @@ -212,23 +212,21 @@ mod tests { let expected_schema = types::Schema { schema_id: 0, identifier_field_ids: None, - fields: vec![ - types::Field { - id: 1, - name: "VendorID".to_string(), - required: false, - field_type: types::Any::Map(types::Map { - key_id: 4, - key_type: types::Any::Primitive(types::Primitive::String).into(), - value_id: 5, - value_required: false, - value_type: types::Any::Primitive(types::Primitive::Double).into(), - }), - comment: None, - initial_default: None, - write_default: None, - } - ] + fields: vec![types::Field { + id: 1, + name: "VendorID".to_string(), + required: false, + field_type: types::Any::Map(types::Map { + key_id: 4, + key_type: types::Any::Primitive(types::Primitive::String).into(), + value_id: 5, + value_required: false, + value_type: types::Any::Primitive(types::Primitive::Double).into(), + }), + comment: None, + initial_default: None, + write_default: None, + }], }; check_schema_serde(json_schema, expected_schema); diff --git a/src/types/on_disk/types.rs b/src/types/on_disk/types.rs index faaeb87..0e0d29a 100644 --- a/src/types/on_disk/types.rs +++ b/src/types/on_disk/types.rs @@ -19,9 +19,9 @@ use uuid::Uuid; use crate::types; use crate::types::Any; +use crate::Error; use crate::ErrorKind; use crate::Result; -use crate::Error; #[derive(Deserialize, Default, Debug)] #[serde(rename_all = "kebab-case", default)]