
[SUPPORT] Spark reader fails to read hudi table created using 1.0.0-beta2 version #12068

Closed
dataproblems opened this issue Oct 7, 2024 · 15 comments
Labels
configs, priority:critical (production down; pipelines stalled; need help asap), release-1.0.0-beta2, spark (Issues related to spark)

Comments

@dataproblems

Describe the problem you faced

I'm creating a hudi table using the bulk insert operation, and reading the table back fails with an IllegalStateException.

To Reproduce

Steps to reproduce the behavior:

  1. Read the data using spark.read.parquet("...")
  2. Write the table using data.write.format("hudi").options("...").save("...")
  3. Read the data again using spark.read.format("hudi").load("...")
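
For reference, a minimal sketch of these three steps (the S3 paths are placeholders, and BulkWriteOptions refers to the option map listed under Additional context below):

// Minimal repro sketch; paths are placeholders and BulkWriteOptions is the
// option map listed under Additional context.
val data = spark.read.parquet("s3://<source-bucket>/<source-prefix>")

data.write.format("hudi").
  options(BulkWriteOptions).
  mode(org.apache.spark.sql.SaveMode.Overwrite).
  save("s3://<table-bucket>/<table-path>")

// This read is the step that fails with java.lang.IllegalStateException.
val readBack = spark.read.format("hudi").load("s3://<table-bucket>/<table-path>")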

Expected behavior

I should be able to read the data back into a dataframe with no exceptions.

Environment Description

  • Hudi version : 1.0.0-beta2

  • Spark version : 3.3.2

  • Hive version :

  • Hadoop version : 3.3.3

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

Here are the hudi options I'm using for the bulk insert:

val BulkWriteOptions: Map[String, String] = Map(
    DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
    DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
    HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
    HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
    "hoodie.parquet.small.file.limit" -> "1073741824",
    HoodieTableConfig.POPULATE_META_FIELDS.key() -> "false",
    HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.GLOBAL_SORT.name(),
    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
    HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
    DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
    "hoodie.metadata.record.index.enable" -> "true",
    "hoodie.metadata.enable" -> "true",
    "hoodie.datasource.write.hive_style_partitioning" -> "true",
    "hoodie.clustering.inline" -> "true",
    "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
    "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824"
    // Also using precombine key, partition path, table name, and key gen class.
  )

Here's the hoodie.properties from the table that was generated using 1.0.0-beta2

#Updated at 2024-10-05T03:50:57.280Z
#Sat Oct 05 03:50:57 UTC 2024
hoodie.table.timeline.timezone=LOCAL
hoodie.table.precombine.field=<some timestamp field>
hoodie.table.version=8
hoodie.database.name=
hoodie.table.initial.version=8
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.metadata.partitions.inflight=
hoodie.table.checksum=4125900621
hoodie.table.keygenerator.type=COMPLEX
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.cdc.enabled=false
hoodie.table.name=<some table name>
hoodie.populate.meta.fields=false
hoodie.table.type=COPY_ON_WRITE
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=column_stats,files,record_index
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=<some number of fields from the row>
hoodie.table.partition.fields=<a single field>

Stacktrace

24/10/07 19:40:46 INFO S3NativeFileSystem: Opening 's3://some-bucket-path/.hoodie/hoodie.properties' for reading
Exception in thread "main" java.lang.IllegalStateException
	at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:62)
	at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField$lzycompute(HoodieHadoopFsRelationFactory.scala:146)
	at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField(HoodieHadoopFsRelationFactory.scala:141)
	at org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:224)
	at org.apache.hudi.HoodieCopyOnWriteSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:300)
	at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:295)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:135)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
	at <Some source code line>
	at <Some source code line>
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
@danny0405 (Contributor)

There is a check when populateMetaFields is disabled, and I see your table config has hoodie.populate.meta.fields=false.

if (tableConfig.populateMetaFields()) {
  HoodieRecord.RECORD_KEY_METADATA_FIELD
} else {
  val keyFields = tableConfig.getRecordKeyFields.get()
  checkState(keyFields.length == 1)
  keyFields.head
}
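
In other words, with hoodie.populate.meta.fields=false the reader takes the else branch, and checkState(keyFields.length == 1) throws the IllegalStateException above whenever the record key has more than one field. A rough sketch of the two write-side setups that satisfy this check, using option keys and field names that appear later in this thread:

// Option A: keep the multi-field (complex) record key, and let Hudi populate
// the meta fields so the reader does not depend on a single key column.
val withMetaFields = Map(
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
  "hoodie.datasource.write.recordkey.field" -> "driver,rider"
)

// Option B: keep virtual keys (meta fields disabled), but use a single record
// key field so that keyFields.length == 1 holds on read.
val withVirtualKeys = Map(
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "false",
  "hoodie.datasource.write.recordkey.field" -> "uuid"
)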

danny0405 added the configs and spark labels on Oct 8, 2024
@dataproblems (Author)

@danny0405 - are you saying that I need to set hoodie.populate.meta.fields=true during the bulk insert operation?

@danny0405 (Contributor)

@danny0405 - are you saying that I need to set hoodie.populate.meta.fields=true during the bulk insert operation?

Yes, if your primary key consists of multiple fields.

@dataproblems (Author)

dataproblems commented Oct 8, 2024

@danny0405 - When I enabled that for the table with multiple fields in the record key, I noticed that the bulk insert operation takes an unreasonably long time. Something that took roughly 10 minutes before this change ran for over 2 hours and failed with the following exception:

java.io.EOFException: Unexpected EOF while trying to read response from server
	at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:538) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1137) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
24/10/08 19:19:15 WARN DataStreamer: DataStreamer Exception
java.io.IOException: Broken pipe
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:1.8.0_422]
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[?:1.8.0_422]
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[?:1.8.0_422]
	at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[?:1.8.0_422]
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470) ~[?:1.8.0_422]
	at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:62) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:141) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:158) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:116) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_422]
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_422]
	at java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[?:1.8.0_422]
	at org.apache.hadoop.hdfs.DataStreamer.sendPacket(DataStreamer.java:858) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.hdfs.DataStreamer.sendHeartbeat(DataStreamer.java:876) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:675) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
24/10/08 19:23:19 ERROR TransportRequestHandler: Error sending result RpcResponse[requestId=8906630334403057573,body=NioManagedBuffer[buf=java.nio.HeapByteBuffer[pos=0 lim=81 cap=156]]] to /10.0.26.16:52206; closing connection
java.io.IOException: Broken pipe

Was this expected? I'm not operating with a lot of data for this test, but if the bulk insert operation takes dramatically more time with this config, it is not something we can use.

@danny0405 (Contributor)

It looks like a Hadoop error; are there any clues related to Hudi specifically? The write without meta fields should be faster, but the difference should not be that large; a 2~3x performance gap is expected there.

@dataproblems (Author)

No. Given that it runs for over 2 hours, I would assume it stems from something within Hudi. I see this: ERROR AppendDataExec: Data source write support org.apache.hudi.spark3.internal.HoodieDataSourceInternalBatchWrite@16d3b8c5 aborted.

@rangareddy

Hi @dataproblems

Could you please share your spark-submit/pyspark command here? I can see you have mentioned the Spark version is 3.3, and the above error points to spark3.

@rangareddy

I have tested the following sample code and it worked without any issues.

Cluster Details:

  • Spark 3.5
  • Hudi 1.0.0-beta2

spark-shell \
	--jars packaging/hudi-spark-bundle/target/hudi-spark3.5-bundle_2.12-1.0.0-beta2.jar \
	--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
	--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
	--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
	--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
	--conf spark.ui.port=14040

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import spark.implicits._

val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
    (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
    (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),
    (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));

var inserts = spark.createDataFrame(data).toDF(columns:_*)

val tableName = "trips_table"
val basePath = "file:///tmp/trips_table"

val bulkWriteOptions: Map[String, String] = Map(
    DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, 
    DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, 
    HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy", 
    HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648", 
    "hoodie.parquet.small.file.limit" -> "1073741824",
    HoodieTableConfig.POPULATE_META_FIELDS.key() -> "false", 
    HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.GLOBAL_SORT.name(), 
    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
    HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX", 
    DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
    "hoodie.metadata.record.index.enable" -> "true", 
    "hoodie.metadata.enable" -> "true", 
    "hoodie.datasource.write.hive_style_partitioning" -> "true", 
    "hoodie.clustering.inline" -> "true", 
    "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
    "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824",
	"hoodie.datasource.write.partitionpath.field" -> "city",
	"hoodie.datasource.write.recordkey.field" -> "uuid",
	"hoodie.datasource.write.precombine.field" -> "ts",
	"hoodie.table.name" -> tableName
)

inserts.write.format("hudi").
  options(bulkWriteOptions).
  mode(Overwrite).
  save(basePath)

val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.show(false)

@dataproblems (Author)

dataproblems commented Oct 10, 2024

Hi @rangareddy. The packages I use are --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:1.0.0-beta1,org.apache.hudi:hudi-aws:1.0.0-beta1. Another thing to note is that my recordKey is complex. I'm not sure if that impacts the change.

Are you suggesting that 1.0.0-beta2 should only be used with Spark 3.5? (For managing other dependencies, we're using EMR 6.x, which does not have Spark 3.5 and supports up to Spark 3.4.)

I also tried the Spark 3.4 bundle and got the exact same error while trying to read the table, so it's reproducible on my end.

@rangareddy

Hi @dataproblems

I have tested once again with Spark 3.4 and Hudi 1.0.0-beta, and it worked. Could you please try the above example code (downloading/building Spark 3.4) and see if you are able to replicate the issue?

I have attached the logs for your reference.

spark_3.4_hudi_1_0_0_beta1.log

@dataproblems (Author)

Hi @rangareddy - Sure. I will also try that with the code you attached. However, I made a change to it since I'm using ComplexKeyGenerator with multiple fields in the record key.

Here's how I start my spark-shell

For EMR 6.11.0:

sudo spark-shell --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:1.0.0-beta2,org.apache.hudi:hudi-aws:1.0.0-beta2 --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar

For EMR 6.15.0:

sudo spark-shell --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:1.0.0-beta2,org.apache.hudi:hudi-aws:1.0.0-beta2 --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar

Here's the full code I used:


import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import spark.implicits._
import org.apache.hudi.keygen.ComplexKeyGenerator


val columns = Seq("ts","uuid","rider","driver","fare","city")
val data =
  Seq((1695159649087L,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
    (1695091554788L,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70 ,"san_francisco"),
    (1695046462179L,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90 ,"san_francisco"),
    (1695516137016L,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"    ),
    (1695115999911L,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai"));

var inserts = spark.createDataFrame(data).toDF(columns:_*)

val tableName = "trips_table"
val basePath = "s3://somebucket/path/trips_with_3.3"

val bulkWriteOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
  "hoodie.parquet.small.file.limit" -> "1073741824",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "false",
  HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.GLOBAL_SORT.name(),
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.clustering.inline" -> "true",
  "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
  "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824",
  "hoodie.datasource.write.partitionpath.field" -> "city",
  "hoodie.datasource.write.recordkey.field" -> "driver,rider",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.name" -> tableName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getName
)

inserts.write.format("hudi").
  options(bulkWriteOptions).
  mode(Overwrite).
  save(basePath)

val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.show(false)

Here are the logs with Spark 3.3 on EMR 6.11.0:

24/10/15 18:14:49 WARN SparkConf: The configuration key 'spark.yarn.driver.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.driver.memoryOverhead' instead.
java.lang.IllegalStateException
  at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:62)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField$lzycompute(HoodieHadoopFsRelationFactory.scala:146)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField(HoodieHadoopFsRelationFactory.scala:141)
  at org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:224)
  at org.apache.hudi.HoodieCopyOnWriteSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:300)
  at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:295)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:135)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
  ... 55 elided

Then I used EMR 6.15.0 with spark 3.4 and got the same exception:

java.lang.IllegalStateException
  at org.apache.hudi.common.util.ValidationUtils.checkState(ValidationUtils.java:62)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField$lzycompute(HoodieHadoopFsRelationFactory.scala:146)
  at org.apache.hudi.HoodieBaseHadoopFsRelationFactory.recordKeyField(HoodieHadoopFsRelationFactory.scala:141)
  at org.apache.hudi.HoodieMergeOnReadSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:224)
  at org.apache.hudi.HoodieCopyOnWriteSnapshotHadoopFsRelationFactory.<init>(HoodieHadoopFsRelationFactory.scala:300)
  at org.apache.hudi.DefaultSource$.createRelation(DefaultSource.scala:295)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:135)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:80)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
  ... 47 elided

I hope this helps clarify the issue I'm facing in my environment. However, I would also like to add that if I switch to SimpleKeyGenerator and make the record key uuid, as you had, that does work.

Is your recommendation that we should use SimpleKeyGenerator and a single field in the record key since there is a bug in the ComplexKeyGenerator / record key with multiple fields?

ad1happy2go added the priority:critical and release-1.0.0-beta2 labels on Oct 16, 2024
@rangareddy

Hi @dataproblems

After investigating the issue, I discovered that the problem lies in the configuration setting HoodieTableConfig.POPULATE_META_FIELDS.key() being set to "false". This setting is only compatible with the SimpleKeyGenerator.

Solution:

HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true"

The following article will help you understand what virtual keys are and when to set the HoodieTableConfig.POPULATE_META_FIELDS.key() parameter value to false or true.

https://medium.com/@simpsons/virtual-keys-with-apache-hudi-848020ee377d

Note: Go through the "Why Virtual keys?" section.
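
Applied to the bulk-insert example earlier in this thread, the fix amounts to flipping that single setting while keeping the ComplexKeyGenerator. A sketch, reusing the bulkWriteOptions, inserts, and basePath values defined in the example above:

// Sketch only: same options as the earlier example, with meta fields enabled
// so a multi-field (ComplexKeyGenerator) record key can be read back.
val fixedOptions = bulkWriteOptions ++ Map(
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true"
)

inserts.write.format("hudi").
  options(fixedOptions).
  mode(Overwrite).
  save(basePath)

// The read that previously threw IllegalStateException should now succeed.
val tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.show(false)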

@ad1happy2go (Collaborator)

@dataproblems Are we all good on this? Can this be closed?

@ad1happy2go (Collaborator)

@dataproblems As discussed on Slack, closing this. We are already discussing the other issue here: #12116

@dataproblems (Author)

@ad1happy2go - yes! Since this was an issue with enabling the meta fields, it can be closed.
