-
Notifications
You must be signed in to change notification settings - Fork 704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: handle partitions with empty table in read_parquet with dataset=True #2983
base: main
Are you sure you want to change the base?
Conversation
…mpty. When reading a set of parquet files with dataset=True, if the first partition is empty the current logic for dtype inference will fail. It ill raise exceptions as follows: ``` pyarrow.lib.ArrowTypeError: Unable to merge: Field col0 has incompatible types: dictionary<values=null, indices=int32, ordered=0> vs dictionary<values=string, indices=int32, ordered=0 ``` To fix this, we filter out empty table(s) before merging them into one parquet file.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
While that corner case was caughed in the full test suite, we add a mock test for this corner case for quick turnaround.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
I am bit confused about the way you are generating your partitioned dataset. Specifically this bit: for i, df in enumerate(dataframes):
wr.s3.to_parquet(
df=df,
path=f"{s3_key}/part{i}.parquet",
) You are artificially creating the partitions instead of relying on awswrangler or pandas to do it. When I rewrite your test with a proper partitioning call, the exception is not raised: def test_s3_dataset_empty_table(moto_s3_client: "S3Client") -> None:
"""Test that a dataset split into multiple parquet files whose first
partition is an empty table still loads properly.
"""
s3_key = f"s3://bucket/"
dtypes = {"id": "string[python]"}
df1 = pd.DataFrame({"id": []}).astype(dtypes)
df2 = pd.DataFrame({"id": ["1"] * 2}).astype(dtypes)
df3 = pd.DataFrame({"id": ["1"] * 3}).astype(dtypes)
dataframes = [df1, df2, df3]
r_df = pd.concat(dataframes, ignore_index=True)
r_df = r_df.assign(col0=pd.Categorical(["1"] * len(r_df)))
wr.s3.to_parquet(r_df, path=s3_key, dataset=True, partition_cols=["col0"])
result_df = wr.s3.read_parquet(path=s3_key, dataset=True)
pd.testing.assert_frame_equal(result_df, r_df, check_dtype=True) The difference being that in the code above, the files are written as a single dataset and the metadata for it is preserved |
In that test example, there is no empty table written to s3 since you are concatenating the dataframes before writing to s3. To trigger the error, the execution needs to go through aws-sdk-pandas/awswrangler/_arrow.py Line 33 in 635f6d5
That exception I am trying to fix is triggered by some real parquet datasets created from spark. If that helps, happy to give details through our internal slack at amazon, including the dataset. |
To trigger the bug, you need all of the following to be true
When that happens, the underlying error is the type inference happening here: https://github.com/aws/aws-sdk-pandas/blob/main/awswrangler/_arrow.py#L41. If a table is not empty, part_value will contain the right "type" for that added column partition. But if table is empty, you get a type that is independent of the value, e.g. >>> pa.array(["1"] * 0).dictionary_encode()
<pyarrow.lib.DictionaryArray object at 0x7f63e97d9cb0>
-- dictionary:
0 nulls
-- indices:
[] vs (no empty table) >>> pa.array(["1"] * 1).dictionary_encode()
<pyarrow.lib.DictionaryArray object at 0x7f63bda8eb20>
-- dictionary:
[
"1"
]
-- indices:
[
0
] When both cases happen in the tables list, you get an exception when merging them because of incompatible column types. |
@jaidisido anything else I could provide to move this PR forward ? Happy to add more tests if needed |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
When reading a set of parquet files with dataset=True, if the first partition is empty the current logic for dtype inference will fail. It ill raise exceptions as follows:
To fix this, we filter out empty table(s) before merging them into one parquet file.
Note: I have only run the mock test suite, I can't easily run the suite against actual AWS services.