Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SUPPORT] OOM errors while creating a table using Bulk Insert operation #12116

Open
dataproblems opened this issue Oct 16, 2024 · 24 comments
Open

Comments

@dataproblems
Copy link

Describe the problem you faced

I am unable to create a hudi table using the data that I have with POPULATE_META_FIELDS being enabled. I can create the table with POPULATE_META_FIELDS set to false.

To Reproduce

Steps to reproduce the behavior:

  1. val data = spark.read.parquet("...")
  2. data.write.format("hudi").options(HudiOptions).save("")

There are a total of 68 billion unique record keys and my total dataset is around 5TB.

Expected behavior

I should be able to create the table without any exceptions

Environment Description

  • Hudi version : 0.15.0, 1.0.0-beta1, 1.0.0-beta2

  • Spark version : 3.3, 3.4

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

Spark submit command:

spark-submit --master yarn --deploy-mode client --conf "spark.driver.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" --conf "spark.executor.extraJavaOptions=-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof" --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.15.0,org.apache.hudi:hudi-aws:0.15.0 --class someClassName--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --conf spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar --conf spark.executor.heartbeatInterval=900s --conf spark.network.timeout=1000s --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.enabled=true --conf spark.driver.maxResultSize=0 someJarFile

Hudi Bulk Insert Options:

I've tried GLOBAL, PARTITION_SORT, and NONE => all result in the same error.

val BulkWriteOptions: Map[String, String] = Map(
    DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, // Configuration for bulk insert
    DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, // Table type
    HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy", 
    HoodieStorageConfig.PARQUET_MAX_FILE_SIZE
      .key() -> "2147483648", 
    "hoodie.parquet.small.file.limit" -> "1073741824",
    HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true", 
    HoodieWriteConfig.BULK_INSERT_SORT_MODE
      .key() -> BulkInsertSortMode.NONE
      .name(), 
    HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true", 
    HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX", /
    "hoodie.metadata.record.index.enable" -> "true", 
    "hoodie.metadata.enable" -> "true", 
    "hoodie.datasource.write.hive_style_partitioning" -> "true", 
    "hoodie.clustering.inline" -> "true", 
    "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
    "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824"
  )

Stacktrace

This is the piece / stage that fails:

[save at DatasetBulkInsertCommitActionExecutor.java:81](https://p-3bp2pob2ivree-shs.emrappui-prod.us-east-1.amazonaws.com/shs/history/application_1729032267977_0010/stages/stage/?id=19&attempt=0) +details

org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
org.apache.hudi.commit.DatasetBulkInsertCommitActionExecutor.doExecute(DatasetBulkInsertCommitActionExecutor.java:81)
org.apache.hudi.commit.BaseDatasetBulkInsertCommitActionExecutor.execute(BaseDatasetBulkInsertCommitActionExecutor.java:101)
org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow(HoodieSparkSqlWriter.scala:924)
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:466)
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:104)
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)

The executors have these errors:

24/10/16 20:13:38 ERROR BulkInsertDataInternalWriterHelper: Global error thrown while trying to write records in HoodieRowCreateHandle 
org.apache.hudi.exception.HoodieRemoteException: Failed to create marker file somePartition=PartitionName/some_parquet_file_name.parquet.marker.CREATE
Connect to ip-10-0-160-126.ec2.internal:45651 [ip-10-0-160-126.ec2.internal/10.0.160.126] failed: Connection timed out (Connection timed out)
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeCreateMarkerRequest(TimelineServerBasedWriteMarkers.java:201) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.create(TimelineServerBasedWriteMarkers.java:157) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.marker.WriteMarkers.create(WriteMarkers.java:67) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.io.storage.row.HoodieRowCreateHandle.createMarkerFile(HoodieRowCreateHandle.java:281) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.io.storage.row.HoodieRowCreateHandle.<init>(HoodieRowCreateHandle.java:144) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper.createHandle(BulkInsertDataInternalWriterHelper.java:217) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper.getRowCreateHandle(BulkInsertDataInternalWriterHelper.java:203) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper.write(BulkInsertDataInternalWriterHelper.java:125) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.spark3.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:62) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.spark3.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:38) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1550) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
Caused by: org.apache.hudi.org.apache.http.conn.HttpHostConnectException: Connect to ip-10-0-160-126.ec2.internal:45651 [ip-10-0-160-126.ec2.internal/10.0.160.126] failed: Connection timed out (Connection timed out)
	at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:151) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:353) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:380) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.client.fluent.Request.execute(Request.java:151) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeRequestToTimelineServer(TimelineServerBasedWriteMarkers.java:247) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeCreateMarkerRequest(TimelineServerBasedWriteMarkers.java:198) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	... 21 more
Caused by: java.net.ConnectException: Connection timed out (Connection timed out)
	at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_422]
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_422]
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_422]
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_422]
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_422]
	at java.net.Socket.connect(Socket.java:607) ~[?:1.8.0_422]
	at org.apache.hudi.org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:74) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:134) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:353) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:380) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:107) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.org.apache.http.client.fluent.Request.execute(Request.java:151) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeRequestToTimelineServer(TimelineServerBasedWriteMarkers.java:247) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.marker.TimelineServerBasedWriteMarkers.executeCreateMarkerRequest(TimelineServerBasedWriteMarkers.java:198) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	... 21 more

I see exit code 137 in the driver logs, OOM: Java Heap Space in the stdout logs.

@dataproblems
Copy link
Author

After reading this issue, I tried adding the following configuration to my hudi options:

"hoodie.write.markers.type" -> "DIRECT",
"hoodie.embed.timeline.server" -> "false"

But that failed as well, with this exception in the executors:

java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_422]
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_422]
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_422]
	at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_422]
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_422]
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
24/10/16 22:18:19 ERROR TransportResponseHandler: Still have 8 requests outstanding when connection from ip-10-0-175-8.ec2.internal/10.0.175.8:39085 is closed
24/10/16 22:18:19 WARN BlockManager: Putting block taskresult_3875 failed due to exception org.apache.spark.SparkException: Exception thrown in awaitResult: .
24/10/16 22:18:19 WARN BlockManager: Putting block taskresult_3876 failed due to exception org.apache.spark.SparkException: Exception thrown in awaitResult: .
24/10/16 22:18:19 ERROR Utils: Aborting task
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.scheduler.OutputCommitCoordinator.canCommit(OutputCommitCoordinator.scala:104) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:449) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1550) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
Caused by: java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_422]
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_422]
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_422]
	at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_422]
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_422]
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:258) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.74.Final.jar:4.1.74.Final]
	... 1 more

@ad1happy2go
Copy link
Collaborator

ad1happy2go commented Oct 17, 2024

@dataproblems Looks like the issue is with timeline server. Can you disable timeline server based markers -

Set hoodie.write.markers.type to DIRECT and try once.

If this also doesn't work then you can try disabling the timeline server itself.
hoodie.embed.timeline.server to false

Can you please give more information about your environment. Just wanted to understand the issue with timeline server.

@dataproblems
Copy link
Author

@ad1happy2go - Did you mean hoodie.write.markers.type to DIRECT? I see only two values for the hoodie.write.markers.type which is TIMELINE_SERVER_BASED and DIRECT. I also tried setting hoodie.embed.timeline.server to false. See this comment.

I'm running this on an EMR release 6.11.0. What other information ( other than the spark version / spark submit command ) would you need?

@ad1happy2go
Copy link
Collaborator

@dataproblems Thanks and sorry for overlooking that comment. Error in this comment #12116 (comment) is very generic. Do you see any other exception? Can you check the node health , are executor node going down due to OOM?

@ad1happy2go
Copy link
Collaborator

@dataproblems Can you share the entire driver logs what you ran without timeline server markers. By the exception its not clear which part of the code is getting broken.

@dataproblems
Copy link
Author

dataproblems commented Oct 17, 2024

@ad1happy2go - I see a couple of entries similar to:

ava.io.EOFException: Unexpected EOF while trying to read response from server
	at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:538) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1137) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
24/10/17 16:24:27 WARN DataStreamer: Error Recovery for BP-2036047854-10.0.175.8-1729115502972:blk_1073745219_6368 in pipeline [DatanodeInfoWithStorage[10.0.165.76:9866,DS-5bdd769f-3dcf-4c53-8660-6a8bac6b1d0b,DISK], DatanodeInfoWithStorage[10.0.173.83:9866,DS-f05105bd-790b-4c8c-8767-2b059159b484,DISK], DatanodeInfoWithStorage[10.0.161.195:9866,DS-f4f690e7-6369-42b8-9c84-566d7dbd29f3,DISK]]: datanode 0(DatanodeInfoWithStorage[10.0.165.76:9866,DS-5bdd769f-3dcf-4c53-8660-6a8bac6b1d0b,DISK]) is bad.

in the driver logs. Does that give you an idea? ( I'm not sure if I can share the entire log file. )

I also see:

org.apache.hudi.exception.HoodieIOException: Failed to create file s3://redacted_table_path/redacted_parition/.hoodie_partition_metadata
	at org.apache.hudi.storage.HoodieStorage.createImmutableFileInPath(HoodieStorage.java:326) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.common.model.HoodiePartitionMetadata.lambda$trySave$19fcee3a$1(HoodiePartitionMetadata.java:115) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:85) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.common.util.RetryHelper.start(RetryHelper.java:113) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.common.model.HoodiePartitionMetadata.trySave(HoodiePartitionMetadata.java:121) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.io.storage.row.HoodieRowCreateHandle.<init>(HoodieRowCreateHandle.java:142) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper.createHandle(BulkInsertDataInternalWriterHelper.java:217) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper.getRowCreateHandle(BulkInsertDataInternalWriterHelper.java:203) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper.write(BulkInsertDataInternalWriterHelper.java:125) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.spark3.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:62) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.spark3.internal.HoodieBulkInsertDataInternalWriter.write(HoodieBulkInsertDataInternalWriter.java:38) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1550) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381) ~[spark-sql_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.2-amzn-0.jar:3.3.2-amzn-0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://redacted_table_path/redacted_partition/.hoodie_partition_metadata
	at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.plan(RegularUploadPlanner.java:30) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:342) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1202) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1182) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1071) ~[hadoop-client-api-3.3.3-amzn-3.jar:?]
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:207) ~[emrfs-hadoop-assembly-2.56.0.jar:?]
	at org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem.lambda$create$2(HoodieWrapperFileSystem.java:242) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:118) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem.create(HoodieWrapperFileSystem.java:241) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.storage.hadoop.HoodieHadoopStorage.create(HoodieHadoopStorage.java:128) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	at org.apache.hudi.storage.HoodieStorage.createImmutableFileInPath(HoodieStorage.java:321) ~[org.apache.hudi_hudi-spark3.3-bundle_2.12-0.15.0.jar:0.15.0]
	... 22 more

this exception in one of the executor logs that I spot checked.

@rangareddy
Copy link

rangareddy commented Oct 18, 2024

Hi @dataproblems,

Additionally, you mentioned that an OOM error is occurring. After setting the hoodie.write.markers.type to DIRECT, are you still experiencing the OOM issue?

@dataproblems
Copy link
Author

dataproblems commented Oct 18, 2024

Hi @rangareddy - I still see the exit code 137 in the driver log, I only checked a few of the executor logs and pasted the exception in my previous comment. This is a single writer creating the base table.

I also tried creating the base table with less than 1% of my data ( something around 100 GB ) and the job just gets stuck here ( see attached screenshot ). I can see that the data files are there in S3 but the commit file isn't there yet. I'm also not sure what hoodie is doing in the stage.
ss.pdf

@ad1happy2go
Copy link
Collaborator

@dataproblems I noticed that only 400 tasks are getting created. This may be the main problem as tasks are taking more than 18 mins already. Can you find out the reason for this. Can you find out why its only created 400 tasks?

@ad1happy2go
Copy link
Collaborator

Whats the nature of your input data? somehow looks like your input dataframe only creating 400 partition. Can you try repartition before saving to see if it works

@dataproblems
Copy link
Author

dataproblems commented Oct 21, 2024

@ad1happy2go - That's likely because I'm setting the bulk insert shuffle parallelism to 400, I've tried with other values but all result in similar outcomes. Can you elaborate on the nature of the data question? Here's another screenshot where the partition / parallelism could was higher.

Spark UI with higher parallelism count

@ad1happy2go
Copy link
Collaborator

@dataproblems Can you remove this config and try, After Hudi 0.14.x we auto calculate the number of parallelism required.

On above screenshot can you expand Failure reason and send stacktrace?

@dataproblems
Copy link
Author

dataproblems commented Oct 21, 2024

@ad1happy2go - I've removed that config for the second screenshot. I couldn't find the executor with the exact error message as shown in the screen shot but here's something that I do see:

4/10/21 04:15:25 INFO ShuffleBlockFetcherIterator: Started 40 remote fetches in 1160 ms
24/10/21 04:15:25 INFO ShuffleBlockFetcherIterator: Started 42 remote fetches in 1160 ms
24/10/21 04:15:25 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException: null
	at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:267) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_doSort_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hasNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:959) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:464) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
24/10/21 04:15:25 ERROR DataWritingSparkTask: Aborting commit for partition 4117 (task 20594, attempt 0, stage 23.0)
24/10/21 04:15:25 ERROR DataWritingSparkTask: Aborted commit for partition 4117 (task 20594, attempt 0, stage 23.0)
24/10/21 04:15:25 ERROR Utils: Aborting task
org.apache.spark.TaskKilledException: null
	at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:267) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_addToSorter_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.sort_doSort_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.hasNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:959) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:464) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:509) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
24/10/21 04:15:25 ERROR DataWritingSparkTask: Aborting commit for partition 4116 (task 20593, attempt 0, stage 23.0)
24/10/21 04:15:25 ERROR DataWritingSparkTask: Aborted commit for partition 4116 (task 20593, attempt 0, stage 23.0)
24/10/21 04:15:25 INFO Executor: Executor killed task 4117.0 in stage 23.0 (TID 20594), reason: Stage cancelled
24/10/21 04:15:25 INFO Executor: Executor killed task 4116.0 in stage 23.0 (TID 20593), reason: Stage cancelled
24/10/21 04:15:26 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM

Also for some reason the driver claims that the application was succeeded but the Spark UI as well as the data sink in s3 show that the data was never written completely:

24/10/21 04:37:15 INFO ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 1190.
24/10/21 04:37:17 INFO ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. ip-10-0-164-85.ec2.internal:46343
24/10/21 04:37:17 INFO ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. ip-10-0-164-85.ec2.internal:46343
24/10/21 04:37:17 INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
24/10/21 04:37:17 INFO ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
24/10/21 04:37:17 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
24/10/21 04:37:18 INFO ApplicationMaster: Deleting staging directory hdfs://ip-10-0-164-85.ec2.internal:8020/user/hadoop/.sparkStaging/application_1729482915237_0001
24/10/21 04:37:18 INFO ShutdownHookManager: Shutdown hook called

The log entry driver terminated or disconnected is a little weird too.

@ad1happy2go
Copy link
Collaborator

@dataproblems This doesn't give much of insights. Possible to attach complete driver and one executor log?

@dataproblems
Copy link
Author

@ad1happy2go - Sure. Here you go. You will see the stack trace in the driver.log file.
driver.log
executor.log

@ad1happy2go
Copy link
Collaborator

@dataproblems Any reason why we are using such a high --conf spark.executor.heartbeatInterval=900s. It should be much lesser than spark.network.timeout. Can you try leaving these as defaults one. I see lot of issues with spark configs.

Shouldn't be using --conf spark.driver.maxResultSize=0 also, as then driver result collection will not have any limit. You may increase upto 4 gb if required but keep a check.

Also did you tried turning on the timeline server i.e. update only this "hoodie.embed.timeline.server" to "true" as this is used to build file system image. Keep markers as direct only.

@dataproblems
Copy link
Author

dataproblems commented Oct 22, 2024

@ad1happy2go - We were getting heartbeat timeout exceptions which resulted in the increased value for those configurations. Same with the driver maxResultSize, I got an exception about the driver result being larger than the maxResultSize of 5g that I was using previously - as such, I removed the limit to mitigate that error.

When I tried using DIRECT markers but set hoodie.embed.timeline.server to true. I still got a similar error ( see screenshot below ).

Error Screenshot

@ad1happy2go
Copy link
Collaborator

@dataproblems There is something wrong in your setup, if such a large data size is getting collected to driver. Is it possible to share the hudi timeline? whats the size of the commit files?

@dataproblems
Copy link
Author

dataproblems commented Oct 23, 2024

@ad1happy2go - Given that this is creating the table, there is only a single commit requested. Both the commit.requested and commit.inflight objects are 0 B in size. Since we never get to the .commit file as the job fails before writing all of the data.

On a separate note - when I do disable POPULATE_META_FIELDS ( and am using 0.15.0 ) the commit file for a small 100 GB sample of my complete dataset is around 40 MB. Do you think the process that's creating this commit file for my complete dataset is resulting in a failure? My complete dataset is several TB in size, as such the size of the .commit file will increase and maybe that's causing the issues that I'm facing?

The spark job is merely reading from S3 and writing the data back in hudi format on our end, there are no operations we perform which would result in the dataset being collected on the driver, so I would defer to you on that front - usually it's in the mapToPair operation in HoodieJavaRdd file or in the save operation as seen in the previous screenshots.

@ad1happy2go
Copy link
Collaborator

ad1happy2go commented Oct 24, 2024

@dataproblems There is some problem here. For a 100 GB data you should not have 40 MB commit file.
One reason that could be the possible is nature of your partition column.
How many partitions you have, i mean how many unique values are there in your dataset for your partition column

Another reason can be you have lot of small files in the input. if thats the case, Can you try repartition the dataframe before saving it?

@dataproblems
Copy link
Author

dataproblems commented Oct 24, 2024

@ad1happy2go, I have about 6 partitions for the sample dataset that I'm using.

Partition Number of unique values
One 12959311
Two 629845160
Three 458227144
Four 1107519580
Five 472111
Six 19391133

Experiment 1: Repartition before saving

Let me update you on how the repartition exercise goes and see if it results in a smaller size for the .commit file.
=> Update, for my sample data I tried an extreme case of repartition(..) but using numPartitions as the total number of unique partition cols and I was able to see that the commit file reduced from ~40 MB to ~50KB. Next I'm trying to enable the POPULATE_META_FIELDS flag along with the repartition to see if I can create the index with the sample data.

Experiment 2: Repartition with POPULATE_META_FIELDS set to true.

=> Update: I tried using .repartition with POPULATE_META_FIELDS set to true but haven't had luck with the completion of the job. I see that the data has been written to s3 but no .commit file.
Spark UI Populate Meta Fields true and repartition for Sample Data

Our main problem is that we're not able to use POPULATE_META_FIELDS to true and create the index. The job fails after writing partial data to s3 due to executor heartbeats / OOM issues. Do you think GC could be a culprit there?

Experiment 3: Create Hudi base tables using random data.

We also decided to give random data a try with hudi to see if there's a problem that we are running into due to the nature of our data or something else. Here's the script we used to generate the data and generate the hudi table:

How did we generate the random data?

Here's the code:

import java.util.UUID
import scala.util.Random
case class RandomData(id: Long, uuid: String, ts: Long = 28800000L, partition: String)

val partitions = List("One", "Two", "Three", "Four")

val randomData = spark.range(1, 10 * 10000000L).map(f => RandomData(id = f, uuid = UUID.randomUUID.toString, partition = Random.shuffle(partitions).head))

How did we generate the hudi table?

import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import spark.implicits._
import org.apache.hudi.keygen.SimpleKeyGenerator

val inserts = spark.read.parquet("$somePath")

val tableName = "random_table"
val basePath = "$someHudiPath"

val bulkWriteOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
  "hoodie.parquet.small.file.limit" -> "1073741824",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
  HoodieWriteConfig.BULK_INSERT_SORT_MODE.key() -> BulkInsertSortMode.GLOBAL_SORT.name(),
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  DataSourceWriteOptions.META_SYNC_ENABLED.key() -> "false",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.clustering.inline" -> "true",
  "hoodie.clustering.plan.strategy.target.file.max.bytes" -> "2147483648",
  "hoodie.clustering.plan.strategy.small.file.limit" -> "1073741824",
  "hoodie.datasource.write.partitionpath.field" -> "partition",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.name" -> tableName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
  "hoodie.write.markers.type" -> "DIRECT",
  "hoodie.embed.timeline.server" -> "false"
)

inserts.repartition(100).write.format("hudi").
  options(bulkWriteOptions).
  mode(Overwrite).
  save(basePath)

Results

Data Size Commit Requested Commit Total Time
47 MB 00:03 00:40 37 seconds
400 MB 00:24 02:13 1 minute 49 seconds
800 MB 00:08 03:02 2 minutes 54 seconds
1.9 GB 00:38 07:03 6 minutes 25 seconds
3.8 GB 00:28 16:55 16 minutes 27 seconds
37 GB Didn't create the table ( still in running / stuck state after 88 minutes)

From my observations, the table creation time increases somewhat linearly at the start but then with 1.9 GB -> 3.8 GB it's non linear. Also, we were not able to get the table created for the dataset of 37 GB in size. Can you see if you can reproduce it on your end? It would be useful to learn about what parameter and configuration worked for you. I tried the random data example with and without the repartition - both times, I saw that partial data was written to S3 and then the job would go into the long pause / running state. See attached screenshot

Random Data 37 GB Table Creation Spark UI

@dataproblems
Copy link
Author

@ad1happy2go - did you get a chance to take a look at the results / data I posted last week?

@ad1happy2go
Copy link
Collaborator

@dataproblems
For the Exepriment 2, you can try increasing executor memory overhead. You cam also check the GC time under stages if that is a problem. I see you are already tuning your GC mentioned on this doc - https://hudi.apache.org/docs/tuning-guide/

For the Experiment 3 - i can clearly see the problem is there with parallelim. Its just creating 100 tasks and they are running from 1.6h. Can you try to increase the parallelism in this case. To do this you have to increase the repartition factor along with the dataset which is 100 in your case.

@dataproblems
Copy link
Author

dataproblems commented Oct 30, 2024

@ad1happy2go -

Follow up questions:

  • Are you saying that we need to go beyond 20% for executor memory overhead in experiment 2?
  • Were you able to get experiment 3 to work on your end with the 37GB size?

Update on Experiment 3:

Based on your suggestion, I updated the numPartitions from 100 to 1000 which should be sufficient for a dataset of size 37 GB. However, I see that the save at DatasetBulkInsertCommitActionExecutor.java:81 stage failed with the following exception:

org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout

Stacktrace

Container id: container_1730319579539_0001_01_000355
Exit code: 56

[2024-10-30 20:51:25.811]Container exited with a non-zero exit code 56. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
uledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_422]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:314) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	... 13 more
24/10/30 20:51:15 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1116) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:244) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2122) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.Heartbeater$$anon$1.run(Heartbeater.scala:46) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_422]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_422]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_422]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_422]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:314) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
	... 13 more
24/10/30 20:51:15 ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times

see attached screenshot:
Experiment 3 with higher numPartitions

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: 👤 User Action
Development

No branches or pull requests

3 participants