Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
feat: Ser/de partition spec
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Jul 18, 2023
1 parent 99d8d92 commit ef71e86
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 32 deletions.
15 changes: 15 additions & 0 deletions src/types/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,21 @@ impl Transform {
}
}

/// Renders a [`Transform`] in its textual form.
///
/// The parameterless variants map to a fixed lowercase name (`identity`,
/// `year`, `month`, `day`, `hour`, `void`); `Bucket` and `Truncate` append
/// their parameter in square brackets, e.g. `bucket[16]` or `truncate[4]`.
///
/// This is implemented as `Display` on the value type rather than as a
/// hand-written `ToString` on a reference. The standard library's blanket
/// `ToString` impl still provides `to_string()` for both `Transform` and
/// `&Transform`, so existing `(&transform).to_string()` call sites keep
/// working, and the type additionally becomes usable with `format!` and
/// `write!`.
impl std::fmt::Display for Transform {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Transform::Identity => f.write_str("identity"),
            Transform::Year => f.write_str("year"),
            Transform::Month => f.write_str("month"),
            Transform::Day => f.write_str("day"),
            Transform::Hour => f.write_str("hour"),
            Transform::Void => f.write_str("void"),
            Transform::Bucket(length) => write!(f, "bucket[{}]", length),
            Transform::Truncate(width) => write!(f, "truncate[{}]", width),
        }
    }
}

/// Data files are stored in manifests with a tuple of partition values
/// that are used in scans to filter out files that cannot contain records
/// that match the scan’s filter predicate.
Expand Down
100 changes: 68 additions & 32 deletions src/types/on_disk/partition_spec.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
use serde::{Deserialize, Serialize};

use super::transform::parse_transform;
use crate::types;
use crate::Error;
use crate::Result;
use crate::{types, ErrorKind};

/// Parses a partition spec from its JSON representation.
///
/// The bytes are first deserialized into this module's serde-facing
/// `PartitionSpec` and then converted into `types::PartitionSpec`.
///
/// # Errors
///
/// Returns an error if `bs` cannot be deserialized, or if the conversion
/// into the in-memory type fails.
pub fn parse_partition_spec(bs: &[u8]) -> Result<types::PartitionSpec> {
    let on_disk: PartitionSpec = serde_json::from_slice(bs)?;
    types::PartitionSpec::try_from(on_disk)
}

/// Serializes a partition spec to a JSON string.
///
/// The in-memory spec is converted into this module's serde-facing
/// `PartitionSpec`, which is then written out with `serde_json`.
///
/// # Errors
///
/// Returns an error if the conversion or the JSON serialization fails.
pub fn serialize_partition_spec(spec: &types::PartitionSpec) -> Result<String> {
    let on_disk = PartitionSpec::try_from(spec)?;
    let json = serde_json::to_string(&on_disk)?;
    Ok(json)
}

pub fn serialize_partition_spec_fields(spec: &types::PartitionSpec) -> Result<String> {
let t = PartitionSpec::try_from(spec)?;
Ok(serde_json::to_string(&t.fields)?)
}

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct PartitionSpec {
Expand All @@ -36,11 +47,15 @@ impl TryFrom<PartitionSpec> for types::PartitionSpec {

/// Conversion from a borrowed in-memory `types::PartitionSpec` into this
/// module's serde-facing `PartitionSpec`.
impl<'a> TryFrom<&'a types::PartitionSpec> for PartitionSpec {
type Error = Error;
// NOTE(review): this span is rendered from a commit diff without +/- markers.
// The `_value` stub directly below (always returning an
// `IcebergFeatureUnsupported` error) appears to be the removed version, and
// the `v` implementation after it the replacement — confirm against the file.
fn try_from(_value: &'a types::PartitionSpec) -> Result<Self> {
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Serializing partition spec!",
))
/// Copies `spec_id` and converts every field with `PartitionField::try_from`.
/// The first field whose conversion fails aborts the whole conversion with
/// that error.
fn try_from(v: &'a types::PartitionSpec) -> Result<Self> {
Ok(Self {
spec_id: v.spec_id,
fields: v
.fields
.iter()
.map(PartitionField::try_from)
// Collecting into `Result<Vec<_>>` stops at the first `Err`.
.collect::<Result<Vec<PartitionField>>>()?,
})
}
}

Expand Down Expand Up @@ -71,18 +86,41 @@ impl TryFrom<PartitionField> for types::PartitionField {
/// Conversion from a borrowed in-memory `types::PartitionField` into this
/// module's serde-facing `PartitionField`.
impl<'a> TryFrom<&'a types::PartitionField> for PartitionField {
type Error = Error;

// NOTE(review): this span is rendered from a commit diff without +/- markers.
// The `_v` stub directly below (always returning an
// `IcebergFeatureUnsupported` error) appears to be the removed version, and
// the `v` implementation after it the replacement — confirm against the file.
fn try_from(_v: &'a types::PartitionField) -> Result<Self> {
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Serializing partition field!",
))
/// Maps `source_column_id` to `source_id` and `partition_field_id` to
/// `field_id`, clones the name, and stores the transform in its string form.
/// As written, this body always returns `Ok`.
fn try_from(v: &'a types::PartitionField) -> Result<Self> {
Ok(Self {
source_id: v.source_column_id,
field_id: v.partition_field_id,
name: v.name.clone(),
// The transform is kept as text here, produced by `to_string()`.
transform: (&v.transform).to_string(),
})
}
}

#[cfg(test)]
mod tests {
use super::*;

/// Test helper that checks a partition spec survives every conversion path.
///
/// Three things are asserted against `expected_partition_spec`:
/// 1. parsing `json` yields it;
/// 2. serializing it and parsing the result yields it again;
/// 3. serializing only its fields, deserializing them and converting each one
///    back yields its `fields`.
///
/// Panics (via `unwrap`/`assert_eq!`) on the first failure.
fn check_partition_spec_conversion(json: &str, expected_partition_spec: types::PartitionSpec) {
    // JSON -> in-memory.
    let from_json = parse_partition_spec(json.as_bytes()).unwrap();
    assert_eq!(expected_partition_spec, from_json);

    // In-memory -> JSON -> in-memory.
    let spec_json = serialize_partition_spec(&expected_partition_spec).unwrap();
    let round_tripped = parse_partition_spec(spec_json.as_bytes()).unwrap();
    assert_eq!(expected_partition_spec, round_tripped);

    // Fields only: in-memory -> JSON -> on-disk fields -> in-memory fields.
    let fields_json = serialize_partition_spec_fields(&expected_partition_spec).unwrap();
    let on_disk_fields: Vec<PartitionField> =
        serde_json::from_slice(fields_json.as_bytes()).unwrap();
    let mut in_memory_fields: Vec<types::PartitionField> =
        Vec::with_capacity(on_disk_fields.len());
    for field in on_disk_fields {
        in_memory_fields.push(types::PartitionField::try_from(field).unwrap());
    }

    assert_eq!(expected_partition_spec.fields, in_memory_fields);
}

#[test]
fn test_parse_partition_spec() {
let content = r#"
Expand All @@ -102,27 +140,25 @@ mod tests {
}
"#;

let v = parse_partition_spec(content.as_bytes()).unwrap();

assert_eq!(v.spec_id, 1);
assert_eq!(v.fields.len(), 2);
assert_eq!(
v.fields[0],
types::PartitionField {
source_column_id: 4,
partition_field_id: 1000,
transform: types::Transform::Day,
name: "ts_day".to_string(),
}
check_partition_spec_conversion(
content,
types::PartitionSpec {
spec_id: 1,
fields: vec![
types::PartitionField {
source_column_id: 4,
partition_field_id: 1000,
transform: types::Transform::Day,
name: "ts_day".to_string(),
},
types::PartitionField {
source_column_id: 1,
partition_field_id: 1001,
transform: types::Transform::Bucket(16),
name: "id_bucket".to_string(),
},
],
},
);
assert_eq!(
v.fields[1],
types::PartitionField {
source_column_id: 1,
partition_field_id: 1001,
transform: types::Transform::Bucket(16),
name: "id_bucket".to_string(),
}
)
}
}

0 comments on commit ef71e86

Please sign in to comment.