[SUPPORT] The default behavior of spark sql insert into has changed from upsert to insert? #12168

Open
bithw1 opened this issue Oct 26, 2024 · 3 comments
Labels: feature-enquiry, spark-sql

Comments

@bithw1

bithw1 commented Oct 26, 2024

I looked into the source code of Hudi 0.15 and found that the option hoodie.sql.insert.mode is deprecated:

  @Deprecated
  val SQL_INSERT_MODE: ConfigProperty[String] = ConfigProperty
    .key("hoodie.sql.insert.mode")
    .defaultValue("upsert")
    .markAdvanced()
    .deprecatedAfter("0.14.0")
    .withDocumentation("Insert mode when insert data to pk-table. The optional modes are: upsert, strict and non-strict." +
      "For upsert mode, insert statement do the upsert operation for the pk-table which will update the duplicate record." +
      "For strict mode, insert statement will keep the primary key uniqueness constraint which do not allow duplicate record." +
      "While for non-strict mode, hudi just do the insert operation for the pk-table. This config is deprecated as of 0.14.0. Please use " +
      "hoodie.spark.sql.insert.into.operation and hoodie.datasource.insert.dup.policy as you see fit.")

Prior to Hudi 0.14.0, the default behavior of the Spark SQL INSERT INTO statement was upsert, which does not introduce duplicates. The documentation says this option is replaced with two options: hoodie.spark.sql.insert.into.operation and hoodie.datasource.insert.dup.policy.

The definition of hoodie.spark.sql.insert.into.operation is as follows; the default operation for INSERT INTO has been changed to insert:

  val SPARK_SQL_INSERT_INTO_OPERATION: ConfigProperty[String] = ConfigProperty
    .key("hoodie.spark.sql.insert.into.operation")
    .defaultValue(WriteOperationType.INSERT.value())
    .withValidValues(WriteOperationType.BULK_INSERT.value(), WriteOperationType.INSERT.value(), WriteOperationType.UPSERT.value())
    .markAdvanced()
    .sinceVersion("0.14.0")
    .withDocumentation("Sql write operation to use with INSERT_INTO spark sql command. This comes with 3 possible values, bulk_insert, " +
      "insert and upsert. bulk_insert is generally meant for initial loads and is known to be performant compared to insert. But bulk_insert may not " +
      "do small file management. If you prefer hudi to automatically manage small files, then you can go with \"insert\". There is no precombine " +
      "(if there are duplicates within the same batch being ingested, same dups will be ingested) with bulk_insert and insert and there is no index " +
      "look up as well. If you may use INSERT_INTO for mutable dataset, then you may have to set this config value to \"upsert\". With upsert, you will " +
      "get both precombine and updates to existing records on storage is also honored. If not, you may see duplicates. ")

The definition of hoodie.datasource.insert.dup.policy is as follows; the default value is none:

  val INSERT_DUP_POLICY: ConfigProperty[String] = ConfigProperty
    .key("hoodie.datasource.insert.dup.policy")
    .defaultValue(NONE_INSERT_DUP_POLICY)
    .withValidValues(NONE_INSERT_DUP_POLICY, DROP_INSERT_DUP_POLICY, FAIL_INSERT_DUP_POLICY)
    .markAdvanced()
    .sinceVersion("0.14.0")
    .withDocumentation("**Note** This is only applicable to Spark SQL writing.<br />When operation type is set to \"insert\", users can optionally enforce a dedup policy. This policy will be employed "
      + " when records being ingested already exists in storage. Default policy is none and no action will be taken. Another option is to choose " +
      " \"drop\", on which matching records from incoming will be dropped and the rest will be ingested. Third option is \"fail\" which will " +
      "fail the write operation when same records are re-ingested. In other words, a given record as deduced by the key generation policy " +
      "can be ingested only once to the target table of interest.")

With the default values of the above two options, does this mean that the default behavior of the Spark SQL INSERT INTO statement has been changed from upsert to insert (which may introduce duplicates)?

I am not sure whether I have understood this correctly. If I have, then this is a breaking change: people on older versions use Spark SQL INSERT INTO to do upserts, which do not introduce duplicates, but after upgrading to 0.14.0+ the default behavior is insert, which may introduce duplicates.
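
To make the concern concrete, here is the scenario I have in mind (table name and schema are made up, and I have not verified this on 0.14.0+):

  // With the 0.14.0+ defaults (operation = insert, dup policy = none), replaying the same
  // record key may leave two rows for id = 1, whereas the old default upsert kept one.
  spark.sql("INSERT INTO hudi_pk_tbl VALUES (1, 'a', 1000)")
  spark.sql("INSERT INTO hudi_pk_tbl VALUES (1, 'a_updated', 2000)")
  spark.sql("SELECT * FROM hudi_pk_tbl WHERE id = 1").show() // may show both rows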

@danny0405
Contributor

Yeah, if there is no record key definition it falls back to pk-less, in which case the operation is INSERT.

@bithw1
Author

bithw1 commented Oct 28, 2024

Yeah, if there is no record key definition it falls back to pk-less, in which case the operation is INSERT.

Thanks @danny0405 for the reply, but I don't quite follow what you said.

With the two default options hoodie.datasource.insert.dup.policy=none and hoodie.spark.sql.insert.into.operation=insert, what is the behavior if record key definitions have been defined: insert or upsert?

@danny0405
Contributor

The operation takes the highest priority if it is set explicitly, so it is still INSERT instead of the default upsert.

@ad1happy2go added the feature-enquiry and spark-sql labels on Oct 30, 2024