diff --git a/awswrangler/s3/_read_parquet.py b/awswrangler/s3/_read_parquet.py
index 5116b3937..f9ee13376 100644
--- a/awswrangler/s3/_read_parquet.py
+++ b/awswrangler/s3/_read_parquet.py
@@ -311,6 +311,14 @@ def _read_parquet(
         itertools.repeat(schema),
         itertools.repeat(decryption_properties),
     )
+    # When the first table in a dataset is empty, the schema inferred from it
+    # may not be compatible with the other tables, which raises an exception
+    # when they are concatenated further down the line. As a workaround, filter
+    # out empty tables, unless every table is empty; in that case all schemas
+    # are compatible and no filtering is needed.
+    should_filter_out = any(len(table) > 0 for table in tables)
+    if should_filter_out:
+        tables = [table for table in tables if len(table) > 0]
 
     return _utils.table_refs_to_df(tables, kwargs=arrow_kwargs)
 
diff --git a/tests/unit/test_moto.py b/tests/unit/test_moto.py
index ac728135f..7ed84afe4 100644
--- a/tests/unit/test_moto.py
+++ b/tests/unit/test_moto.py
@@ -485,6 +485,47 @@ def test_s3_delete_object_success(moto_s3_client: "S3Client") -> None:
     wr.s3.read_parquet(path=path, dataset=True)
 
 
+@pytest.mark.parametrize("chunked", [True, False])
+def test_s3_parquet_empty_table(moto_s3_client: "S3Client", chunked: bool) -> None:
+    path = "s3://bucket/file.parquet"
+
+    r_df = pd.DataFrame({"id": []}, dtype=pd.Int64Dtype())
+    wr.s3.to_parquet(df=r_df, path=path)
+
+    df = wr.s3.read_parquet(path, chunked=chunked)
+    if chunked:
+        df = pd.concat(list(df))
+
+    pd.testing.assert_frame_equal(r_df, df, check_dtype=True)
+
+
+def test_s3_dataset_empty_table(moto_s3_client: "S3Client") -> None:
+    """Test that a dataset split into multiple parquet files whose first
+    partition is an empty table still loads properly.
+    """
+    partition_col, partition_val = "col0", "1"
+    dataset = f"{partition_col}={partition_val}"
+    s3_key = f"s3://bucket/{dataset}"
+
+    dtypes = {"id": "string[python]"}
+    df1 = pd.DataFrame({"id": []}).astype(dtypes)
+    df2 = pd.DataFrame({"id": ["1"] * 2}).astype(dtypes)
+    df3 = pd.DataFrame({"id": ["1"] * 3}).astype(dtypes)
+
+    dataframes = [df1, df2, df3]
+    r_df = pd.concat(dataframes, ignore_index=True)
+    r_df = r_df.assign(col0=pd.Categorical([partition_val] * len(r_df)))
+
+    for i, df in enumerate(dataframes):
+        wr.s3.to_parquet(
+            df=df,
+            path=f"{s3_key}/part{i}.parquet",
+        )
+
+    result_df = wr.s3.read_parquet(path=s3_key, dataset=True)
+    pd.testing.assert_frame_equal(result_df, r_df, check_dtype=True)
+
+
 def test_s3_raise_delete_object_exception_success(moto_s3_client: "S3Client") -> None:
     path = "s3://bucket/test.parquet"
     wr.s3.to_parquet(df=get_df_list(), path=path, index=False, dataset=True, partition_cols=["par0", "par1"])
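For context, here is a minimal standalone sketch (not part of the patch) of the pyarrow behaviour the new comment refers to: concatenating tables whose schemas were inferred differently fails, while dropping zero-length tables first avoids the mismatch. The column name and the null-typed stand-in below are illustrative assumptions, not code taken from the library.

import pyarrow as pa

# A zero-row column with no explicit type is inferred as null, while a
# populated column carries a concrete type such as int64.
empty = pa.table({"id": pa.array([], type=pa.null())})
full = pa.table({"id": pa.array([1, 2], type=pa.int64())})

try:
    # By default concat_tables requires identical schemas, so null vs. int64 fails.
    pa.concat_tables([empty, full])
except pa.ArrowInvalid as exc:
    print(f"concat failed: {exc}")

# Dropping zero-length tables first, as the patch does, sidesteps the mismatch.
non_empty = [t for t in (empty, full) if len(t) > 0]
print(pa.concat_tables(non_empty).num_rows)  # 2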