
[SUPPORT] problem when inserting data to a non-partitioned table created by flink sql via spark sql cli #12013

Open · bithw1 opened this issue Sep 26, 2024 · 10 comments

bithw1 commented Sep 26, 2024

I am using Hudi 0.15.0 and Flink 1.17.1. Here are the steps to reproduce the problem.

From the Flink SQL CLI, run the following SQL statements:

CREATE CATALOG hudi_catalog WITH (
    'type' = 'hudi',
    'mode' = 'hms',
    'default-database' = 'default',
    'hive.conf.dir' = '/home/hadoop/software/hive-3.1.3/conf', 
    'table.external' = 'true'
);

create database if not exists hudi_catalog.`default`;

use hudi_catalog.`default`;

CREATE TABLE test_hudi_flink_mor_2 (
  a int PRIMARY KEY NOT ENFORCED,
  b int,
  c int
)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/test_hudi_flink_mor_2',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',  -- using ComplexAvroKeyGenerator
  'hoodie.datasource.write.recordkey.field' = 'a',
  'write.precombine.key'='b',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.conf.dir'='/home/hadoop/software/hive-3.1.3/conf'
);


insert into test_hudi_flink_mor_2 values (1,1,1),(2,2,2);

So far so good. Then I run the following statement from the spark-sql CLI:

insert into test_hudi_flink_mor_2 select 3, 3, 3;

An error occurs; the key part of the exception message is as follows:
org.apache.hudi.exception.HoodieException: Config conflict(key  current value   existing value):
KeyGenerator:   org.apache.hudi.keygen.ComplexAvroKeyGenerator  org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator
        at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:229)
        at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232)
        at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)

When I run show create table test_hudi_flink_mor_2 on the spark-sql CLI, it shows that the key generator is ComplexAvroKeyGenerator, but when I look at /tmp/test_hudi_flink_mor_2/.hoodie/hoodie.properties, it contains hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator. Per the exception, it looks to me like there is a bug here...

bithw1 (Author) commented Sep 26, 2024

By the way, I experimented a little more: when I create the Hudi table as a partitioned table, I am able to insert from the spark-sql CLI, so the problem seems to be specific to non-partitioned tables; see the sketch below.
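For reference, the partitioned variant that works for me looks roughly like this (a reconstructed sketch rather than the exact DDL I ran; the table name, path, and the PARTITIONED BY clause are the intended differences from the original):

CREATE TABLE test_hudi_flink_mor_part (
  a int PRIMARY KEY NOT ENFORCED,
  b int,
  c int
)
PARTITIONED BY (c)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/test_hudi_flink_mor_part',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'a',
  'write.precombine.key' = 'b',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.conf.dir' = '/home/hadoop/software/hive-3.1.3/conf'
);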

@bithw1 bithw1 changed the title [SUPPORT]Looks like a bug with keygenerator class when inserting data to a table (created by flink sql) via spark sql cli [SUPPORT]problem when inserting data to a non-partitioned table created by flink sql via spark sql cli Sep 26, 2024
@ad1happy2go ad1happy2go added priority:critical production down; pipelines stalled; Need help asap. writer-core Issues relating to core transactions/write actions flink Issues related to flink labels Sep 26, 2024
danny0405 (Contributor) commented:
The non-partitioned key generator is right. Did you specify the key generator on the Spark side?
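If it helps, one way to try that from the spark-sql CLI is to set the key generator at the session level so that it matches the table config (an untested sketch; as far as I know Hudi picks up session-level hoodie.* settings for SQL writes, but verify this on your version):

set hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
insert into test_hudi_flink_mor_2 select 3, 3, 3;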

bithw1 (Author) commented Sep 27, 2024

> The non-partitioned key generator is right. Did you specify the key generator on the Spark side?

@danny0405 I don't think the non-partitioned key generator is right here. When creating the Hudi table with Flink SQL, I explicitly specified the key generator as ComplexAvroKeyGenerator, but it is saved in hoodie.properties as hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator.

From the Spark side, I did nothing but insert one record: insert into test_hudi_flink_mor_2 select 3, 3, 3.

Basically, what I am doing is creating a Hudi table from Flink SQL and using it from Spark SQL.

danny0405 (Contributor) commented:
Yeah, we do have some setup logic in HoodieTableFactory and HoodieHiveCatalog. Can you dig a little into why the non-partitioned key generator is set up regardless of the explicit setting? I guess it is located in HoodieTableFactory.

rangareddy commented:
Hi @bithw1

I am able to replicate this issue from the Spark side as well.

CREATE DATABASE IF NOT EXISTS spark_catalog.`default`;
USE spark_catalog.`default`;
DROP TABLE IF EXISTS hudi_table;

CREATE TABLE hudi_table (a int, b int, c int)
USING HUDI
TBLPROPERTIES (
  type = 'mor',
  primaryKey = 'a',
  preCombineField = 'b',
  hoodie.datasource.write.keygenerator.class = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
)
PARTITIONED BY (c);

Exception:

spark-sql (hudi)> insert into hudi_table values (1,1,1),(2,2,2);
06:27:36.949 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver - Failed in [insert into hudi_table values (1,1,1),(2,2,2)]
org.apache.hudi.exception.HoodieException: Config conflict(key	current value	existing value):
KeyGenerator:	org.apache.hudi.keygen.ComplexAvroKeyGenerator	org.apache.hudi.keygen.SimpleKeyGenerator
	at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:229) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:232) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:100) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:61) ~[hudi-spark3.5-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) ~[spark-sql_2.12-3.5.1.jar:3.5.1]
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) ~[spark-sql_2.12-3.5.1.jar:3.5.1]
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) ~[spark-sql_2.12-3.5.1.jar:3.5.1]

rangareddy commented:
After checking the .hoodie/hoodie.properties file, we found that the hoodie.table.keygenerator.class property is set to org.apache.hudi.keygen.SimpleKeyGenerator by default and cannot be changed at runtime. To resolve this issue, set the key generator class to org.apache.hudi.keygen.ComplexAvroKeyGenerator explicitly when creating the table, so that hoodie.properties contains hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator:

DROP TABLE IF EXISTS hudi_table;

CREATE TABLE hudi_table (a int, b int, c int)
USING HUDI
TBLPROPERTIES (
  type = 'mor',
  primaryKey = 'a',
  preCombineField = 'b',
  hoodie.datasource.write.keygenerator.class = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  hoodie.table.keygenerator.class = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
)
PARTITIONED BY (c);

insert into hudi_table values (1,1,1),(2,2,2);

select * from hudi_table;
20241008063231261	20241008063231261_1_0	2	c=2	d685c02a-c97f-4147-a5b7-acb476e3e6c6-0_1-64-77_20241008063231261.parquet	2	2	2
20241008063231261	20241008063231261_0_0	1	c=1	85b3ae97-01a4-4b6c-840a-00a5f783f410-0_0-64-76_20241008063231261.parquet	1	1	1

bithw1 (Author) commented Oct 8, 2024

Thanks @rangareddy for the clarification and clue. Per your guidance, I modified my test case by adding hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexAvroKeyGenerator to the table creation DDL (run via Flink SQL), but when I look at hoodie.properties, Flink does not honor this configuration at all; it still writes hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator.
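For completeness, the modified Flink DDL looked roughly like this (a reconstructed sketch of what I ran; only the final hoodie.table.keygenerator.class option is new relative to the original DDL):

CREATE TABLE test_hudi_flink_mor_2 (
  a int PRIMARY KEY NOT ENFORCED,
  b int,
  c int
)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/test_hudi_flink_mor_2',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'a',
  'write.precombine.key' = 'b',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.conf.dir' = '/home/hadoop/software/hive-3.1.3/conf',
  'hoodie.table.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator'
);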

This problem is one more piece of evidence that Hudi configurations are confusing and redundant. It also looks to me like Spark and Flink use totally different configurations for almost the same thing; don't they belong to the same Hudi project?

bithw1 (Author) commented Oct 8, 2024

> After checking the .hoodie/hoodie.properties file, we found that the hoodie.table.keygenerator.class property is set to org.apache.hudi.keygen.SimpleKeyGenerator by default and cannot be changed at runtime. To resolve this issue, set the key generator class to org.apache.hudi.keygen.ComplexAvroKeyGenerator explicitly when creating the table.

I think there is still a bug here. I am not sure whether hoodie.datasource.write.keygenerator.class and hoodie.table.keygenerator.class configure the same thing. If hoodie.datasource.write.keygenerator.class has been specified explicitly by the end user, why doesn't hoodie.table.keygenerator.class (saved in hoodie.properties) honor this user-configured property?

rangareddy commented Oct 8, 2024

Hi @bithw1

There is an existing open issue HUDI-5262 and a corresponding pull request #7394 that has not been merged yet.

Until the PR is merged, you need to use hoodie.table.keygenerator.class to set the key generator when creating a table in spark-sql.

#7351 (comment)

Please let me know if anything else is required.

ad1happy2go (Collaborator) commented:
@jonvex Any insights on this?
