Skip to content

Commit

Permalink
feat: Implement nested Parquet writing for High-Precision Decimals (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Oct 27, 2024
1 parent 687811d commit 40c7072
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 64 deletions.
2 changes: 1 addition & 1 deletion crates/polars-parquet/src/arrow/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use polars_error::{polars_bail, PolarsResult};
use super::binary::{
build_statistics as binary_build_statistics, encode_plain as binary_encode_plain,
};
use super::fixed_len_bytes::{
use super::fixed_size_binary::{
build_statistics as fixed_binary_build_statistics, encode_plain as fixed_binary_encode_plain,
};
use super::pages::PrimitiveNested;
Expand Down
47 changes: 47 additions & 0 deletions crates/polars-parquet/src/arrow/write/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use arrow::array::{Array, FixedSizeBinaryArray};
use polars_error::PolarsResult;

use super::encode_plain;
use crate::parquet::page::DataPage;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::FixedLenStatistics;
use crate::read::schema::is_nullable;
use crate::write::{utils, EncodeNullability, Encoding, WriteOptions};

pub fn array_to_page(
array: &FixedSizeBinaryArray,
options: WriteOptions,
type_: PrimitiveType,
statistics: Option<FixedLenStatistics>,
) -> PolarsResult<DataPage> {
let is_optional = is_nullable(&type_.field_info);
let encode_options = EncodeNullability::new(is_optional);

let validity = array.validity();

let mut buffer = vec![];
utils::write_def_levels(
&mut buffer,
is_optional,
validity,
array.len(),
options.version,
)?;

let definition_levels_byte_length = buffer.len();

encode_plain(array, encode_options, &mut buffer);

utils::build_plain_page(
buffer,
array.len(),
array.len(),
array.null_count(),
0,
definition_levels_byte_length,
statistics.map(|x| x.serialize()),
type_,
options,
Encoding::Plain,
)
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
mod basic;
mod nested;

use arrow::array::{Array, FixedSizeBinaryArray, PrimitiveArray};
use arrow::types::i256;
use polars_error::PolarsResult;
pub use basic::array_to_page;
pub use nested::array_to_page as nested_array_to_page;

use super::binary::ord_binary;
use super::{utils, EncodeNullability, StatisticsOptions, WriteOptions};
use crate::arrow::read::schema::is_nullable;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::DataPage;
use super::{EncodeNullability, StatisticsOptions};
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::FixedLenStatistics;

Expand All @@ -27,44 +28,6 @@ pub(crate) fn encode_plain(
}
}

pub fn array_to_page(
array: &FixedSizeBinaryArray,
options: WriteOptions,
type_: PrimitiveType,
statistics: Option<FixedLenStatistics>,
) -> PolarsResult<DataPage> {
let is_optional = is_nullable(&type_.field_info);
let encode_options = EncodeNullability::new(is_optional);

let validity = array.validity();

let mut buffer = vec![];
utils::write_def_levels(
&mut buffer,
is_optional,
validity,
array.len(),
options.version,
)?;

let definition_levels_byte_length = buffer.len();

encode_plain(array, encode_options, &mut buffer);

utils::build_plain_page(
buffer,
array.len(),
array.len(),
array.null_count(),
0,
definition_levels_byte_length,
statistics.map(|x| x.serialize()),
type_,
options,
Encoding::Plain,
)
}

pub(super) fn build_statistics(
array: &FixedSizeBinaryArray,
primitive_type: PrimitiveType,
Expand Down
39 changes: 39 additions & 0 deletions crates/polars-parquet/src/arrow/write/fixed_size_binary/nested.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use arrow::array::{Array, FixedSizeBinaryArray};
use polars_error::PolarsResult;

use super::encode_plain;
use crate::parquet::page::DataPage;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::FixedLenStatistics;
use crate::read::schema::is_nullable;
use crate::write::{nested, utils, EncodeNullability, Encoding, Nested, WriteOptions};

pub fn array_to_page(
array: &FixedSizeBinaryArray,
options: WriteOptions,
type_: PrimitiveType,
nested: &[Nested],
statistics: Option<FixedLenStatistics>,
) -> PolarsResult<DataPage> {
let is_optional = is_nullable(&type_.field_info);
let encode_options = EncodeNullability::new(is_optional);

let mut buffer = vec![];
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

encode_plain(array, encode_options, &mut buffer);

utils::build_plain_page(
buffer,
nested::num_values(nested),
nested[0].len(),
array.null_count(),
repetition_levels_byte_length,
definition_levels_byte_length,
statistics.map(|x| x.serialize()),
type_,
options,
Encoding::Plain,
)
}
38 changes: 19 additions & 19 deletions crates/polars-parquet/src/arrow/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod binview;
mod boolean;
mod dictionary;
mod file;
mod fixed_len_bytes;
mod fixed_size_binary;
mod nested;
mod pages;
mod primitive;
Expand Down Expand Up @@ -528,15 +528,15 @@ pub fn array_to_page_simple(
array.validity().cloned(),
);
let statistics = if options.has_statistics() {
Some(fixed_len_bytes::build_statistics(
Some(fixed_size_binary::build_statistics(
&array,
type_.clone(),
&options.statistics,
))
} else {
None
};
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
},
ArrowDataType::Interval(IntervalUnit::DayTime) => {
let array = array
Expand All @@ -555,20 +555,20 @@ pub fn array_to_page_simple(
array.validity().cloned(),
);
let statistics = if options.has_statistics() {
Some(fixed_len_bytes::build_statistics(
Some(fixed_size_binary::build_statistics(
&array,
type_.clone(),
&options.statistics,
))
} else {
None
};
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
},
ArrowDataType::FixedSizeBinary(_) => {
let array = array.as_any().downcast_ref().unwrap();
let statistics = if options.has_statistics() {
Some(fixed_len_bytes::build_statistics(
Some(fixed_size_binary::build_statistics(
array,
type_.clone(),
&options.statistics,
Expand All @@ -577,7 +577,7 @@ pub fn array_to_page_simple(
None
};

fixed_len_bytes::array_to_page(array, options, type_, statistics)
fixed_size_binary::array_to_page(array, options, type_, statistics)
},
ArrowDataType::Decimal256(precision, _) => {
let precision = *precision;
Expand Down Expand Up @@ -620,7 +620,7 @@ pub fn array_to_page_simple(
} else if precision <= 38 {
let size = decimal_length_from_precision(precision);
let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
array,
type_.clone(),
size,
Expand All @@ -641,15 +641,15 @@ pub fn array_to_page_simple(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
} else {
let size = 32;
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i256>>()
.unwrap();
let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal256(
let stats = fixed_size_binary::build_statistics_decimal256(
array,
type_.clone(),
size,
Expand All @@ -670,7 +670,7 @@ pub fn array_to_page_simple(
array.validity().cloned(),
);

fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
}
},
ArrowDataType::Decimal(precision, _) => {
Expand Down Expand Up @@ -715,7 +715,7 @@ pub fn array_to_page_simple(
let size = decimal_length_from_precision(precision);

let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal(
let stats = fixed_size_binary::build_statistics_decimal(
array,
type_.clone(),
size,
Expand All @@ -736,7 +736,7 @@ pub fn array_to_page_simple(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::array_to_page(&array, options, type_, statistics)
}
},
other => polars_bail!(nyi = "Writing parquet pages for data type {other:?}"),
Expand Down Expand Up @@ -858,7 +858,7 @@ fn array_to_page_nested(
let size = decimal_length_from_precision(precision);

let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal(
let stats = fixed_size_binary::build_statistics_decimal(
array,
type_.clone(),
size,
Expand All @@ -879,7 +879,7 @@ fn array_to_page_nested(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
}
},
Decimal256(precision, _) => {
Expand Down Expand Up @@ -919,7 +919,7 @@ fn array_to_page_nested(
} else if precision <= 38 {
let size = decimal_length_from_precision(precision);
let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal256_with_i128(
let stats = fixed_size_binary::build_statistics_decimal256_with_i128(
array,
type_.clone(),
size,
Expand All @@ -940,15 +940,15 @@ fn array_to_page_nested(
values.into(),
array.validity().cloned(),
);
fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
} else {
let size = 32;
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<i256>>()
.unwrap();
let statistics = if options.has_statistics() {
let stats = fixed_len_bytes::build_statistics_decimal256(
let stats = fixed_size_binary::build_statistics_decimal256(
array,
type_.clone(),
size,
Expand All @@ -969,7 +969,7 @@ fn array_to_page_nested(
array.validity().cloned(),
);

fixed_len_bytes::array_to_page(&array, options, type_, statistics)
fixed_size_binary::nested_array_to_page(&array, options, type_, nested, statistics)
}
},
other => polars_bail!(nyi = "Writing nested parquet pages for data type {other:?}"),
Expand Down
46 changes: 45 additions & 1 deletion py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import decimal
import io
from datetime import datetime, time, timezone
from decimal import Decimal
from typing import IO, TYPE_CHECKING, Any, Literal, cast
from typing import IO, TYPE_CHECKING, Any, Callable, Literal, cast

import fsspec
import numpy as np
Expand Down Expand Up @@ -1995,6 +1996,49 @@ def test_nested_nonnullable_19158() -> None:
assert_frame_equal(pl.read_parquet(f), pl.DataFrame(tbl))


D = Decimal


@pytest.mark.parametrize("precision", range(1, 37, 2))
@pytest.mark.parametrize(
"nesting",
[
# Struct
lambda t: ([{"x": None}, None], pl.Struct({"x": t})),
lambda t: ([None, {"x": None}], pl.Struct({"x": t})),
lambda t: ([{"x": D("1.5")}, None], pl.Struct({"x": t})),
lambda t: ([{"x": D("1.5")}, {"x": D("4.8")}], pl.Struct({"x": t})),
# Array
lambda t: ([[None, None, D("8.2")], None], pl.Array(t, 3)),
lambda t: ([None, [None, D("8.9"), None]], pl.Array(t, 3)),
lambda t: ([[D("1.5"), D("3.7"), D("4.1")], None], pl.Array(t, 3)),
lambda t: (
[[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("5.2"), D("8.9")]],
pl.Array(t, 3),
),
# List
lambda t: ([[None, D("8.2")], None], pl.List(t)),
lambda t: ([None, [D("8.9"), None]], pl.List(t)),
lambda t: ([[D("1.5"), D("4.1")], None], pl.List(t)),
lambda t: ([[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("8.9")]], pl.List(t)),
],
)
def test_decimal_precision_nested_roundtrip(
nesting: Callable[[pl.DataType], tuple[list[Any], pl.DataType]],
precision: int,
) -> None:
# Limit the context as to not disturb any other tests
with decimal.localcontext() as ctx:
ctx.prec = precision

decimal_dtype = pl.Decimal(precision=precision)
values, dtype = nesting(decimal_dtype)

df = pl.Series("a", values, dtype).to_frame()

test_round_trip(df)


@pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"])
def test_conserve_sortedness(
monkeypatch: Any, capfd: Any, parallel: pl.ParallelStrategy
Expand Down

0 comments on commit 40c7072

Please sign in to comment.