
[SUPPORT] Issue with Hudi Parquet Compression Codec Not Working as Expected #12169

Open
soumilshah1995 opened this issue Oct 26, 2024 · 2 comments
Labels
writer-core Issues relating to core transactions/write actions

Comments

@soumilshah1995

I'm running into an issue with the Hudi Parquet compression codec configuration. Despite setting "hoodie.parquet.compression.codec": "GZIP" in my Hudi write options, the output files in my data lake do not appear to be compressed; I only see the standard Parquet files.
Configuration:
Hudi Version: 1.0.0-beta2
Spark Version: 3.4
Java Version: OpenJDK 11


hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    "hoodie.datasource.write.partitionpath.field": partition_fields,
    "hoodie.parquet.compression.codec": "GZIP",
    "parquet.compression.codec": "GZIP"
}
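One thing worth keeping in mind (general Parquet behavior, not specific to Hudi): the compression codec is applied to the column chunks inside each file, so GZIP output still shows up as ordinary .parquet files in the directory listing; only the file sizes and the footer metadata reveal the codec. A minimal sketch, using plain Spark Parquet writes to hypothetical /tmp paths rather than the Hudi writer, that illustrates this:

# Sketch only: plain Spark Parquet writes to hypothetical /tmp paths,
# just to show that the codec changes file sizes, not file names.
import os

df = spark.range(0, 100_000)

df.write.mode("overwrite").option("compression", "gzip").parquet("/tmp/codec_check/gzip")
df.write.mode("overwrite").option("compression", "snappy").parquet("/tmp/codec_check/snappy")

for codec in ("gzip", "snappy"):
    base = f"/tmp/codec_check/{codec}"
    size = sum(
        os.path.getsize(os.path.join(root, f))
        for root, _, files in os.walk(base)
        for f in files
        if f.endswith(".parquet")
    )
    print(codec, size, "bytes")  # same *.parquet names, different sizes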

Test Code

try:
    import os
    import sys
    import uuid
    import random
    import pyspark
    from datetime import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import pandas as pd  # Pandas library for pretty printing
    print("Imports loaded")

except Exception as e:
    print("error", e)
HUDI_VERSION = '1.0.0-beta2'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

# Spark session
spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()
def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM',
                  curr_region='us-east-1'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        'hoodie.datasource.write.partitionpath.field': partition_fields,
        'hoodie.clustering.plan.strategy.target.file.max.bytes': '1073741824',
        'hoodie.clustering.plan.strategy.small.file.limit': '629145600',
        'hoodie.clustering.execution.strategy.class': 'org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy',
        'hoodie.clean.automatic': 'true',
        'hoodie.parquet.max.file.size': 512 * 1024 * 1024,
        'hoodie.parquet.small.file.limit': 104857600,
        'hoodie.parquet.compression.codec': 'GZIP',
        'parquet.compression.codec': 'GZIP'
    }

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)

from pyspark.sql.types import StructType, StructField, StringType, LongType

    
schema = StructType([
    StructField("id", StringType(), True),
    StructField("message", StringType(), True)
])


# Loop to generate data and write to Hudi
for i in range(1, 5):  # Number of iterations
    print("Epoch ", str(i))
    # Generate epoch timestamp
    epoch_time = int(datetime.now().timestamp())

    # Create the data
    updated_data = [(str(i), "Batch : {} ".format(i))]

    # Create the DataFrame with the new data
    df = spark.createDataFrame(updated_data, schema)

    # Show the DataFrame with the updated "message" column
    df.show(truncate=False)

    # Write to Hudi
    write_to_hudi(
        spark_df=df,
        method="upsert",
        db_name="default",
        table_name="messages",
        recordkey="id",
        precombine="message"
    )
    

Output

[screenshot of the table directory listing showing standard .parquet data files]

@ad1happy2go
Collaborator

@soumilshah1995 Did you try checking the compression on these files?
They should be GZIP. You can use parquet-tools, or try something like below -

import pyarrow.parquet as pq

# Specify the path to your Parquet file
parquet_file = 'your_file.parquet'

# Read the metadata
metadata = pq.read_metadata(parquet_file)

# Print overall metadata
print(metadata)

# Iterate through the row groups and print compression information
for i in range(metadata.num_row_groups):
    row_group_metadata = metadata.row_group(i)
    print(f'Row Group {i}:')
    for j in range(row_group_metadata.num_columns):
        column_metadata = row_group_metadata.column(j)
        print(f'  Column {j}:')
        print(f'    Compression: {column_metadata.compression}')
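If it helps, here is a small extension of the same idea, a sketch assuming pyarrow is installed and using the local table path from the reproduction script above, that walks the whole Hudi table directory, skips the .hoodie metadata folder, and prints the codec of the first column chunk in each data file:

import os
import pyarrow.parquet as pq

# Table path from the reproduction script above (db_name=default, table_name=messages)
table_path = "/Users/soumilshah/IdeaProjects/SparkProject/tem/database=default/table_name=messages"

for root, dirs, files in os.walk(table_path):
    # Skip Hudi's internal metadata folder
    dirs[:] = [d for d in dirs if d != ".hoodie"]
    for name in files:
        if not name.endswith(".parquet"):
            continue
        metadata = pq.read_metadata(os.path.join(root, name))
        codec = metadata.row_group(0).column(0).compression
        print(f"{name}: {codec}")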

@soumilshah1995
Author

Let me try that and update the thread.

@ad1happy2go ad1happy2go added the writer-core Issues relating to core transactions/write actions label Oct 30, 2024
Projects
Status: 🏁 Triaged