
[SUPPORT] Flink Hudi Job Configuration and Parameter Conflict Issues #12024

Open
usberkeley opened this issue Sep 30, 2024 · 11 comments

@usberkeley
Contributor

usberkeley commented Sep 30, 2024

Describe the problem you faced

  1. Configuration Conflict in Flink Hudi Job: When modifying the configuration of an existing Flink Hudi Job, if there is a conflict with the Table Config (hoodie.properties), the job does not throw an error. Instead, it silently overrides the user configuration with the previous Table Config settings.
  2. Parameter Conflict Handling: When certain parameters conflict, the job does not check for these conflicts at startup. Errors are only thrown at runtime, which delays problem detection.
  3. Flink SQL Keywords Conflict: When the Flink SQL keywords PRIMARY KEY and PARTITIONED BY conflict with user configurations hoodie.datasource.write.recordkey.field and hoodie.datasource.write.partitionpath.field, the job does not throw an error. Instead, it prioritizes the Flink SQL keywords over the Hoodie configurations.

To Reproduce

Steps to reproduce the behavior:

  1. Configuration Conflict in Flink Hudi Job

    1. Start a Flink Hudi Job and create hoodie.properties.
    2. Modify the Flink Hudi Table configuration and restart the job.
    3. Observe that the modifications do not take effect and the job starts normally without errors.
  2. Parameter Conflict Handling

    1. Configure the record key field with two fields: hoodie.datasource.write.recordkey.field = 'uuid,name'.
    2. Configure a mismatched key generator: hoodie.datasource.write.keygenerator.class='org.apache.hudi.keygen.SimpleAvroKeyGenerator'.
    3. The job throws an error only at runtime, indicating that the Avro record cannot find the fields 'uuid,name'. (A sketch of this setup appears after these steps.)
  3. Flink SQL Keywords Conflict

    1. Create a Flink table:
      CREATE TABLE t_test (
        `uuid` VARCHAR(20),
        `name` VARCHAR(10),
        `age` INT,
        `ts` TIMESTAMP(3),
        `partition` VARCHAR(10),
        PRIMARY KEY (uuid, name) NOT ENFORCED
      )
      PARTITIONED BY (`partition`)
      WITH (
        'connector' = 'hudi',
        'path' = 'file:///tmp/t_test',  -- illustrative path
        'hoodie.datasource.write.recordkey.field' = 'age',
        'hoodie.datasource.write.partitionpath.field' = 'name'
      );
    2. Observe that the job does not throw an error and internally prioritizes PRIMARY KEY and PARTITIONED BY over the Hoodie Config settings.
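
For reference, here is a rough sketch of scenario 2 using the Flink Table API. The object name, table name, path, and schema below are made up; the two Hudi options are the ones from the steps above:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object KeyGeneratorConflictSketch extends App {
  val tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build())

  // Two record key fields combined with SimpleAvroKeyGenerator, as described in step 2 above.
  tEnv.executeSql(
    """
      |CREATE TABLE t_keygen_conflict (
      |  `uuid` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `ts` TIMESTAMP(3)
      |) WITH (
      |  'connector' = 'hudi',
      |  'path' = 'file:///tmp/t_keygen_conflict',
      |  'hoodie.datasource.write.recordkey.field' = 'uuid,name',
      |  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.SimpleAvroKeyGenerator'
      |)
    """.stripMargin)

  // The DDL and job submission are accepted; the mismatch only surfaces once the
  // first write runs and the key generator tries to resolve the fields 'uuid,name'.
  tEnv.executeSql(
    "INSERT INTO t_keygen_conflict VALUES ('id1', 'n1', TIMESTAMP '2024-09-30 00:00:00')"
  ).await()
}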

Expected behavior

Discussion item: I'd like to ask whether we should strictly check for configuration conflicts. Should we directly report an error in case of a conflict, rather than internally modifying user parameters?

I prefer directly reporting an error. I have a reason: if we don't report an error, users might mistakenly believe their configuration is valid, which could lead to confusion.

Environment Description

  • Hudi version : 0.15.0

  • Spark version : none

  • Hive version : none

  • Hadoop version : 3.3.5

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : no

@rangareddy

Hi @usberkeley

Thank you for reporting this issue. I was able to replicate it using Spark as well. Specifically, I created a table using Spark SQL, specifying the primary key columns as id and name. Then I wrote data with a DataFrame (df), specifying the record key fields as id and age instead of id and name.

Spark Code:

package com.ranga

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object Test12024 extends App {
  val name = this.getClass.getSimpleName.replace("$", "")
  val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")

  val spark = SparkSession.builder.appName(name).config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()

  spark.sql(
    """
      |CREATE TABLE IF NOT EXISTS t_test (
      |  `id` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `age` INT,
      |  `ts` Long
      |) USING HUDI TBLPROPERTIES (primaryKey = 'id,name', preCombineField = 'ts')
      | LOCATION '/tmp/warehouse/t_test'
    """.stripMargin)

  val input_schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("ts", LongType),
  ))

  val input_data = Seq(
    Row(1L, "hello", 42,  1695159649087L),
    Row(2L, "world", 13, 1695091554788L),
    Row(3L, "spark", 7, 1695115999911L),
    Row(1L, "hello", 43,  1695159649087L),
  )
  
  val tableName = name
  val basePath = f"file:///tmp/$tableName"
  val hoodieConf = scala.collection.mutable.Map[String, String]()
  hoodieConf.put("hoodie.datasource.write.recordkey.field", "id,age")
  hoodieConf.put("hoodie.table.precombine.field", "ts")
  hoodieConf.put("hoodie.table.name", tableName)

  val input_df = spark.createDataFrame(spark.sparkContext.parallelize(input_data), input_schema)
  input_df.write.format("hudi").
    options(hoodieConf).
    mode("overwrite").
    save(basePath)

  spark.read.format("hudi").load(basePath).show(false)

  spark.stop()
}

@rangareddy

Created an upstream Jira to track the issue:

https://issues.apache.org/jira/browse/HUDI-8278

@rangareddy

I later tested the append mode as well and encountered the same issue.

@danny0405
Contributor

PRIMARY KEY and PARTITIONED BY conflict with user configurations hoodie.datasource.write.recordkey.field and hoodie.datasource.write.partitionpath.field, the job does not throw an error. Instead, it prioritizes the Flink SQL keywords over the Hoodie configurations.

This is by design.

@usberkeley
Contributor Author

PRIMARY KEY and PARTITIONED BY conflict with user configurations hoodie.datasource.write.recordkey.field and hoodie.datasource.write.partitionpath.field, the job does not throw an error. Instead, it prioritizes the Flink SQL keywords over the Hoodie configurations.

This is by design.

@danny0405
Thank you. I understand that PRIMARY KEY takes priority over the Hoodie config. However, when two configurations conflict, it can confuse the user.

In this case, should we throw an error directly, informing the user of the conflict and asking them to correct the configuration?
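
For illustration, a lightweight check along these lines could either fail fast at startup or only log a warning. This is a hypothetical sketch, not existing Hudi code; the helper name and signature are made up:

import org.slf4j.LoggerFactory

// Purely illustrative: compare the record key fields declared via PRIMARY KEY
// with the ones supplied through hoodie.datasource.write.recordkey.field.
object RecordKeyConflictCheck {
  private val log = LoggerFactory.getLogger(getClass)

  def validate(sqlPrimaryKey: Seq[String],
               recordKeyOption: Option[String],
               failOnConflict: Boolean): Unit = {
    val configured = recordKeyOption.map(_.split(",").map(_.trim).toSeq)
    configured.filter(_ != sqlPrimaryKey).foreach { conflicting =>
      val msg = s"Record key conflict: PRIMARY KEY = ${sqlPrimaryKey.mkString(",")}, " +
        s"hoodie.datasource.write.recordkey.field = ${conflicting.mkString(",")}"
      if (failOnConflict) throw new IllegalArgumentException(msg) // fail fast at startup
      else log.warn(msg)                                          // or only log a warning
    }
  }
}

// Example: validate(Seq("uuid", "name"), Some("age"), failOnConflict = false)
// logs a warning instead of silently preferring the PRIMARY KEY definition.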

@danny0405
Contributor

In this case, should we throw an error directly, informing the user of the conflict and asking them to correct the configuration?

Maybe we just log some warnings there.

@rangareddy

Tested the same code using Hive Sync. No issue was reported while writing or reading.

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val name = "Hudi_Test"
val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")

val spark = SparkSession.builder.appName(name).config(sparkConf)
	.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
	.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
	.config("spark.sql.hive.convertMetastoreParquet", "false")
	.getOrCreate()

spark.sql(
    """
      |CREATE TABLE IF NOT EXISTS t_test (
      |  `id` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `age` INT,
      |  `ts` Long
      |) USING HUDI TBLPROPERTIES (primaryKey = 'id,name', preCombineField = 'ts')
      | LOCATION '/tmp/warehouse/t_test'
    """.stripMargin)

val input_schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("ts", LongType),
))

val input_data = Seq(
	Row(1L, "hello", 42,  1695159649087L),
	Row(2L, "world", 13, 1695091554788L),
	Row(3L, "spark", 7, 1695115999911L),
	Row(1L, "hello", 43,  1695159649087L),
)

val tableName = name
val basePath = f"file:///tmp/$tableName"
val databaseName = "test"

val hoodieConf = Map(
	"hoodie.datasource.write.recordkey.field" -> "id,age",
	"hoodie.table.precombine.field" -> "ts",
	"hoodie.table.name" -> tableName,
	"hoodie.database.name" -> databaseName,
	"hoodie.datasource.meta.sync.enable" -> "true",
	"hoodie.datasource.hive_sync.enable" -> "true",
	"hoodie.datasource.hive_sync.table" -> tableName,
	"hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
	"hoodie.datasource.hive_sync.use_jdbc" -> "false",
	"hoodie.datasource.hive_sync.mode" -> "hms",
	"hoodie.datasource.write.hive_style_partitioning" -> "true"
)

val input_df = spark.createDataFrame(spark.sparkContext.parallelize(input_data), input_schema)
input_df.write.format("hudi").options(hoodieConf).mode("overwrite").save(basePath)

spark.read.format("hudi").load(basePath).show(false)

@rangareddy

Hi @usberkeley

I got the expected exception when the same location is specified both when creating the table and when saving the data.

Exception:

24/10/17 12:10:12 INFO HoodieTableConfig: Loading table properties from file:/tmp/hudi/Test12114/.hoodie/hoodie.properties
24/10/17 12:10:12 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from file:///tmp/hudi/Test12114
Exception in thread "main" org.apache.hudi.exception.HoodieException: Config conflict(key	current value	existing value):
RecordKey:	id,age	id,name
	at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:229)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232)
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)

Code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

object Test12024 extends App {
  val name = this.getClass.getSimpleName.replace("$", "")
  val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")

  val spark = SparkSession.builder.appName(name).config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()

  val tableName = name
  val basePath = f"file:///tmp/hudi/$tableName"

  spark.sql(
    f"""
      |CREATE TABLE IF NOT EXISTS ${tableName} (
      |  `id` VARCHAR(20),
      |  `name` VARCHAR(10),
      |  `age` INT,
      |  `ts` Long
      |) USING HUDI TBLPROPERTIES (primaryKey = 'id,name', preCombineField = 'ts')
      | LOCATION '${basePath}'
    """.stripMargin)

  val input_schema = StructType(Seq(
    StructField("id", LongType),
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("ts", LongType),
  ))

  val input_data = Seq(
    Row(1L, "hello", 42,  1695159649087L),
    Row(2L, "world", 13, 1695091554788L),
    Row(3L, "spark", 7, 1695115999911L),
    Row(1L, "hello", 43,  1695159649087L),
  )

  val hoodieConf = scala.collection.mutable.Map[String, String]()
  hoodieConf.put("hoodie.datasource.write.recordkey.field", "id,age")
  hoodieConf.put("hoodie.table.precombine.field", "ts")
  hoodieConf.put("hoodie.table.name", tableName)

  val input_df = spark.createDataFrame(spark.sparkContext.parallelize(input_data), input_schema)

  input_df.write.format("hudi").
    options(hoodieConf).
    mode("append").
    save(basePath)

  spark.read.format("hudi").load(basePath).show(false)
  
  spark.stop()
}

@rangareddy

Hi @usberkeley

Please let me know if there is any update.

@usberkeley
Contributor Author

Hi @usberkeley

Please let me know if there is any update.

@rangareddy Wow, that's great! From the code, it seems Spark has checks in place. Could you please help take a look at Flink as well?

@rangareddy

Hi @usberkeley

Have you tried to replicate the issue with Flink?

#12024 (comment)
