Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

feat: Ser/de partition spec #101

Merged
merged 1 commit into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/types/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,21 @@ impl Transform {
}
}

/// Textual form of a [`Transform`].
///
/// Unit-like transforms render as their lowercase name (`identity`, `year`,
/// `month`, `day`, `hour`, `void`); parameterized transforms render the
/// parameter in square brackets, e.g. `bucket[16]` or `truncate[8]`.
///
/// `Display` is implemented instead of a hand-written `ToString`: the
/// standard library's blanket impl then provides `to_string()` for both
/// `Transform` and `&Transform`, and the value also becomes usable with
/// `format!`/`write!`.
impl std::fmt::Display for Transform {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Transform::Identity => f.write_str("identity"),
            Transform::Year => f.write_str("year"),
            Transform::Month => f.write_str("month"),
            Transform::Day => f.write_str("day"),
            Transform::Hour => f.write_str("hour"),
            Transform::Void => f.write_str("void"),
            Transform::Bucket(length) => write!(f, "bucket[{}]", length),
            Transform::Truncate(width) => write!(f, "truncate[{}]", width),
        }
    }
}

/// Data files are stored in manifests with a tuple of partition values
/// that are used in scans to filter out files that cannot contain records
/// that match the scan’s filter predicate.
Expand Down
100 changes: 68 additions & 32 deletions src/types/on_disk/partition_spec.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
use serde::{Deserialize, Serialize};

use super::transform::parse_transform;
use crate::types;
use crate::Error;
use crate::Result;
use crate::{types, ErrorKind};

/// Parse a partition spec from JSON bytes.
///
/// The bytes are deserialized into this module's serde-facing
/// [`PartitionSpec`] and then converted into the in-memory
/// [`types::PartitionSpec`].
///
/// # Errors
///
/// Returns an error if `bs` cannot be deserialized, or if converting the
/// deserialized value into the in-memory type fails.
pub fn parse_partition_spec(bs: &[u8]) -> Result<types::PartitionSpec> {
    let on_disk = serde_json::from_slice::<PartitionSpec>(bs)?;
    types::PartitionSpec::try_from(on_disk)
}

/// Serialize a partition spec into a JSON string.
///
/// The in-memory spec is first converted into this module's serde-facing
/// [`PartitionSpec`], which is then written out with `serde_json`.
///
/// # Errors
///
/// Returns an error if the conversion or the JSON serialization fails.
pub fn serialize_partition_spec(spec: &types::PartitionSpec) -> Result<String> {
    let on_disk = PartitionSpec::try_from(spec)?;
    let json = serde_json::to_string(&on_disk)?;
    Ok(json)
}

pub fn serialize_partition_spec_fields(spec: &types::PartitionSpec) -> Result<String> {
let t = PartitionSpec::try_from(spec)?;
Ok(serde_json::to_string(&t.fields)?)
}

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionSpec {
Expand All @@ -36,11 +47,15 @@ impl TryFrom<PartitionSpec> for types::PartitionSpec {

impl<'a> TryFrom<&'a types::PartitionSpec> for PartitionSpec {
type Error = Error;
fn try_from(_value: &'a types::PartitionSpec) -> Result<Self> {
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Serializing partition spec!",
))
/// Build the serde-facing spec from the in-memory one.
///
/// Each field is converted in order; the first field conversion that
/// fails aborts the whole conversion and its error is returned.
fn try_from(v: &'a types::PartitionSpec) -> Result<Self> {
    let mut fields = Vec::with_capacity(v.fields.len());
    for field in v.fields.iter() {
        fields.push(PartitionField::try_from(field)?);
    }

    Ok(Self {
        spec_id: v.spec_id,
        fields,
    })
}
}

Expand Down Expand Up @@ -71,18 +86,41 @@ impl TryFrom<PartitionField> for types::PartitionField {
impl<'a> TryFrom<&'a types::PartitionField> for PartitionField {
type Error = Error;

fn try_from(_v: &'a types::PartitionField) -> Result<Self> {
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Serializing partition field!",
))
/// Build the serde-facing partition field from the in-memory one.
///
/// Ids are copied, the name is cloned, and the transform is stored in its
/// string form.
fn try_from(v: &'a types::PartitionField) -> Result<Self> {
    let name = v.name.clone();
    // The explicit borrow is kept so the call resolves to the `to_string`
    // available on `&Transform`.
    let transform = (&v.transform).to_string();

    Ok(Self {
        source_id: v.source_column_id,
        field_id: v.partition_field_id,
        name,
        transform,
    })
}
}

#[cfg(test)]
mod tests {
use super::*;

/// Assert that `json` and `expected_partition_spec` agree in three ways:
///
/// 1. parsing `json` yields `expected_partition_spec`;
/// 2. serializing `expected_partition_spec` and parsing the result yields
///    it again;
/// 3. serializing only the fields and converting them back yields
///    `expected_partition_spec.fields`.
fn check_partition_spec_conversion(json: &str, expected_partition_spec: types::PartitionSpec) {
    // 1. JSON -> in-memory spec.
    let from_input = parse_partition_spec(json.as_bytes()).unwrap();
    assert_eq!(expected_partition_spec, from_input);

    // 2. in-memory spec -> JSON -> in-memory spec.
    let spec_json = serialize_partition_spec(&expected_partition_spec).unwrap();
    let round_tripped = parse_partition_spec(spec_json.as_bytes()).unwrap();
    assert_eq!(expected_partition_spec, round_tripped);

    // 3. fields only -> JSON -> on-disk fields -> in-memory fields.
    let fields_json = serialize_partition_spec_fields(&expected_partition_spec).unwrap();
    let on_disk_fields: Vec<PartitionField> =
        serde_json::from_slice(fields_json.as_bytes()).unwrap();
    let round_tripped_fields: Vec<types::PartitionField> = on_disk_fields
        .into_iter()
        .map(|field| types::PartitionField::try_from(field).unwrap())
        .collect();

    assert_eq!(expected_partition_spec.fields, round_tripped_fields);
}

#[test]
fn test_parse_partition_spec() {
let content = r#"
Expand All @@ -102,27 +140,25 @@ mod tests {
}
"#;

let v = parse_partition_spec(content.as_bytes()).unwrap();

assert_eq!(v.spec_id, 1);
assert_eq!(v.fields.len(), 2);
assert_eq!(
v.fields[0],
types::PartitionField {
source_column_id: 4,
partition_field_id: 1000,
transform: types::Transform::Day,
name: "ts_day".to_string(),
}
check_partition_spec_conversion(
content,
types::PartitionSpec {
spec_id: 1,
fields: vec![
types::PartitionField {
source_column_id: 4,
partition_field_id: 1000,
transform: types::Transform::Day,
name: "ts_day".to_string(),
},
types::PartitionField {
source_column_id: 1,
partition_field_id: 1001,
transform: types::Transform::Bucket(16),
name: "id_bucket".to_string(),
},
],
},
);
assert_eq!(
v.fields[1],
types::PartitionField {
source_column_id: 1,
partition_field_id: 1001,
transform: types::Transform::Bucket(16),
name: "id_bucket".to_string(),
}
)
}
}