Skip to content

Commit

Permalink
Ignore nested repeated fields (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChewingGlass authored Feb 14, 2024
1 parent 33de6c6 commit 837a3b7
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
6 changes: 3 additions & 3 deletions protobuf-delta-lake-sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{anyhow, Context, Result};
use chrono::{NaiveDateTime, Utc};
use clap::Parser;
use datafusion::arrow::array::StringArray;
use datafusion::{arrow::array::StringArray, common::delta};
use deltalake::{
action::{self, Action, CommitInfo, SaveMode},
checkpoints, crate_version,
Expand Down Expand Up @@ -116,7 +116,7 @@ async fn main() -> Result<()> {
args.source_proto_name,
)
.await?;
let mut delta_fields = get_delta_schema(&descriptor);
let mut delta_fields = get_delta_schema(&descriptor, false);
if args.partition_timestamp_column.is_some() {
let date_field = SchemaField::new(
"date".to_string(),
Expand Down Expand Up @@ -303,7 +303,7 @@ async fn main() -> Result<()> {
.map(|m| (m.0.as_str(), m.1.as_ref()))
.collect(),
args.partition_timestamp_column.clone(),
args.partition_timestamp_date_divisor
args.partition_timestamp_date_divisor,
)?;

writer.write(batch).await?;
Expand Down
14 changes: 7 additions & 7 deletions protobuf-delta-lake-sink/src/proto/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ impl ReflectBuilder for StructReflectBuilder {
builder.append_value(message.and_then(|m| field.get_singular(m)))
}
protobuf::reflect::RuntimeFieldType::Repeated(_) => {
panic!("Repeated fields in a nested message are not supported")
// Do nothing
}
protobuf::reflect::RuntimeFieldType::Map(_, _) => {
panic!("Map fields are not supported")
Expand All @@ -382,7 +382,7 @@ fn runtime_type_to_data_type(value: &RuntimeType) -> DataType {
RuntimeType::VecU8 => DataType::Binary,
RuntimeType::Enum(_) => DataType::Binary,
RuntimeType::Message(m) => {
let fields = get_delta_schema(m);
let fields = get_delta_schema(m, true);
let schema = <deltalake::arrow::datatypes::Schema as TryFrom<&Schema>>::try_from(
&SchemaTypeStruct::new(fields),
)
Expand Down Expand Up @@ -428,19 +428,19 @@ fn get_builder(t: &RuntimeType, capacity: usize) -> Result<Box<dyn ReflectArrayB
enum_descriptor: enum_descriptor.clone(),
}),
RuntimeType::Message(m) => {
let schema = Schema::new(get_delta_schema(m));
let schema = Schema::new(get_delta_schema(m, true));
let arrow_schema =
<deltalake::arrow::datatypes::Schema as TryFrom<&Schema>>::try_from(&schema)?;
let builders = m
.clone()
.fields()
.map(|field| match field.runtime_field_type() {
protobuf::reflect::RuntimeFieldType::Singular(t) => get_builder(&t, capacity),
.flat_map(|field| match field.runtime_field_type() {
protobuf::reflect::RuntimeFieldType::Singular(t) => Some(get_builder(&t, capacity)),
protobuf::reflect::RuntimeFieldType::Repeated(_) => {
bail!("Repeated fields in a nested message are not supported")
None
}
protobuf::reflect::RuntimeFieldType::Map(_, _) => {
bail!("Map fields are not supported")
None
}
})
.collect::<Result<Vec<Box<dyn ReflectArrayBuilder>>>>()?;
Expand Down
18 changes: 9 additions & 9 deletions protobuf-delta-lake-sink/src/proto/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub fn get_single_delta_schema(field_name: &str, field_type: RuntimeType) -> Sch
protobuf::reflect::RuntimeType::Message(m) => {
return SchemaField::new(
field_name.to_string(),
SchemaDataType::r#struct(SchemaTypeStruct::new(get_delta_schema(&m))),
SchemaDataType::r#struct(SchemaTypeStruct::new(get_delta_schema(&m, true))),
true,
HashMap::new(),
);
Expand All @@ -88,27 +88,27 @@ pub fn get_single_delta_schema(field_name: &str, field_type: RuntimeType) -> Sch
)
}

pub fn get_delta_schema(descriptor: &MessageDescriptor) -> Vec<SchemaField> {
pub fn get_delta_schema(descriptor: &MessageDescriptor, nested: bool) -> Vec<SchemaField> {
descriptor
.fields()
.map(|f| {
.flat_map(|f| {
let field_name = f.name();
let field_type = match f.runtime_field_type() {
protobuf::reflect::RuntimeFieldType::Singular(t) => t,
protobuf::reflect::RuntimeFieldType::Repeated(t) => {
return SchemaField::new(
protobuf::reflect::RuntimeFieldType::Singular(t) => Some(t),
protobuf::reflect::RuntimeFieldType::Repeated(t) if !nested => {
return Some(SchemaField::new(
field_name.to_string(),
SchemaDataType::array(SchemaTypeArray::new(
Box::new(get_single_delta_schema(field_name, t).get_type().clone()),
true,
)),
true,
HashMap::new(),
);
));
}
_ => panic!("Map fields are not supported"),
_ => None
};
get_single_delta_schema(field_name, field_type)
field_type.map(|t| get_single_delta_schema(field_name, t))
})
.collect::<Vec<_>>()
}

0 comments on commit 837a3b7

Please sign in to comment.