From 8722cc7d1b4d3beed6c2c4616f0ba6a76e7fab6e Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Wed, 19 Jul 2023 13:29:04 +0800 Subject: [PATCH] Complete avro types conversion (#105) --- src/types/to_avro.rs | 113 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 89 insertions(+), 24 deletions(-) diff --git a/src/types/to_avro.rs b/src/types/to_avro.rs index 54284c1..884e83a 100644 --- a/src/types/to_avro.rs +++ b/src/types/to_avro.rs @@ -31,7 +31,36 @@ impl<'a> TryFrom<&'a Field> for AvroRecordField { type Error = Error; fn try_from(value: &'a Field) -> Result { - let avro_schema = match &value.field_type { + let mut avro_schema = AvroSchema::try_from(&value.field_type)?; + // An ugly workaround, let's fix it later. + if let Any::Struct(_) = &value.field_type { + match &mut avro_schema { + AvroSchema::Record(r) => { + r.name = Name::from(value.name.as_str()); + r.doc = value.comment.clone(); + } + _ => panic!("Struct record should be converted to avro struct schema."), + } + } + + Ok(AvroRecordField { + name: value.name.clone(), + doc: value.comment.clone(), + aliases: None, + default: None, + schema: avro_schema, + order: RecordFieldOrder::Ignore, + position: 0, + custom_attributes: BTreeMap::default(), + }) + } +} + +impl<'a> TryFrom<&'a Any> for AvroSchema { + type Error = Error; + + fn try_from(value: &'a Any) -> Result { + let avro_schema = match &value { Any::Primitive(data_type) => match data_type { Primitive::Boolean => AvroSchema::Boolean, Primitive::Int => AvroSchema::Int, @@ -55,6 +84,21 @@ impl<'a> TryFrom<&'a Field> for AvroRecordField { )) } }, + Any::Map(map) => { + if *map.key_type != Any::Primitive(Primitive::String) { + return Err(Error::new( + ErrorKind::IcebergFeatureUnsupported, + format!( + "Unable to convert iceberg data type map with key type {:?} to avro type, since avro assumes keys are always strings", + *map.key_type + ), + )); + } + AvroSchema::Map(Box::new(AvroSchema::try_from(&*map.value_type)?)) + } + Any::List(list) => { + AvroSchema::Array(Box::new(AvroSchema::try_from(&*list.element_type)?)) + } Any::Struct(s) => { let avro_fields: Vec = s .fields @@ -62,42 +106,23 @@ impl<'a> TryFrom<&'a Field> for AvroRecordField { .map(AvroRecordField::try_from) .collect::>>()?; AvroSchema::Record(RecordSchema { - name: Name::from(value.name.as_str()), + name: Name::from("invalid_name"), fields: avro_fields, aliases: None, - doc: value.comment.clone(), + doc: None, lookup: BTreeMap::new(), attributes: BTreeMap::new(), }) } - r#type => { - return Err(Error::new( - ErrorKind::IcebergFeatureUnsupported, - format!( - "Unable to convert iceberg data type {:?} to avro type", - r#type - ), - )) - } }; - - Ok(AvroRecordField { - name: value.name.clone(), - doc: value.comment.clone(), - aliases: None, - default: None, - schema: avro_schema, - order: RecordFieldOrder::Ignore, - position: 0, - custom_attributes: BTreeMap::default(), - }) + Ok(avro_schema) } } #[cfg(test)] mod tests { use super::*; - use crate::types::Struct; + use crate::types::{List, Map, Struct}; #[test] fn test_convert_to_avro() { @@ -138,6 +163,34 @@ mod tests { initial_default: None, write_default: None, }, + Field { + id: 5, + name: "e".to_string(), + required: true, + field_type: Any::List(List { + element_id: 1, + element_required: true, + element_type: Box::new(Any::Primitive(Primitive::Long)), + }), + comment: Some("comment_e".to_string()), + initial_default: None, + write_default: None, + }, + Field { + id: 6, + name: "f".to_string(), + required: false, + field_type: Any::Map(Map { + key_id: 2, + key_type: Box::new(Any::Primitive(Primitive::String)), + value_id: 3, + value_required: false, + value_type: Box::new(Any::Primitive(Primitive::Binary)), + }), + comment: Some("comment_f".to_string()), + initial_default: None, + write_default: None, + }, ], }), comment: Some("comment_b".to_string()), @@ -176,6 +229,18 @@ mod tests { "type": "boolean", "doc": "comment_d", "order": "ignore" + }, + { + "name": "e", + "type": { "type": "array", "items": "long" }, + "doc": "comment_e", + "order": "ignore" + }, + { + "name": "f", + "type": { "type": "map", "values": "bytes" }, + "doc": "comment_f", + "order": "ignore" } ], "order": "ignore"