[SUPPORT] HUDI upsert fails at distinct at MapPartitionRDD #12186

Open
IamJusKyrat opened this issue Oct 30, 2024 · 2 comments
IamJusKyrat commented Oct 30, 2024

Describe the problem you faced
We are running an ETL job on a table, "Aggregate X Details". On a large dataset (~30M rows), we reduce the data to 50 partitions using RDD.coalesce(), but the executors fail repeatedly at the "distinct at MapPartitionsRDD" step.

HUDI Configuration:

  1. hoodie.datasource.write.operation: upsert
  2. hoodie.datasource.write.keygenerator.class: org.apache.hudi.keygen.TimestampBasedKeyGenerator
  3. hoodie.upsert.shuffle.parallelism: 2
  4. hoodie.datasource.write.partitionpath.field: timestamp (which is year truncated)
  5. hoodie.datasource.write.table.type: COPY_ON_WRITE
  6. hoodie.datasource.write.precombine.field: timestamp

Indexing: using the default index behavior
hoodie.index.type: simple
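
For readers who want to reproduce the setup, the settings above might be passed to the Spark DataFrame writer roughly as follows. This is only a sketch: the issue does not show the actual write call, `write_with_hudi` is a hypothetical helper, and the table name and record key field are not given in the report, so they are left out and would have to be supplied.

```python
# Hudi write options as reported in this issue (values copied from the lists above).
hudi_options = {
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator",
    "hoodie.upsert.shuffle.parallelism": "2",
    "hoodie.datasource.write.partitionpath.field": "timestamp",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.precombine.field": "timestamp",
    "hoodie.index.type": "simple",
}


def write_with_hudi(df, base_path, options=hudi_options):
    """Upsert a Spark DataFrame into the Hudi table at base_path.

    Hypothetical helper: `df` is assumed to be a pyspark.sql.DataFrame, and the
    options are assumed to also carry the table name and record key field,
    which the issue does not list.
    """
    (df.write.format("hudi")
       .options(**options)
       .mode("append")
       .save(base_path))
```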

Spark Config:

spark.executor.instances=0
spark.executor.memory=32g
spark.driver.memory=4g
spark.driver.cores=4
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=5

Environment Description

  • Amazon EMR Version:

  • Hudi version : 0.14.0-amzn-0

  • Spark version : 3.4.1-amzn-2

  • Hive version : 3.1.3

  • Hadoop version : 3.3.6-amzn-1

  • Storage (HDFS/S3/GCS..) : S3 (but Spark uses HDFS for processing tasks in EMR Serverless)

  • Running on Docker? (yes/no) : No

Stacktrace

On the large dataset (~30M rows, coalesced to 50 partitions), the executors fail repeatedly at:

org.apache.spark.api.java.JavaRDD.distinct(JavaRDD.scala:85)
org.apache.hudi.data.HoodieJavaRDD.distinct(HoodieJavaRDD.java:157)
org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:147)
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:118)
org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:59)
org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:41)
org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:63)
org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor.execute(SparkUpsertCommitActionExecutor.java:44)
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:114)
org.apache.hudi.table.HoodieSparkCopyOnWriteTable.upsert(HoodieSparkCopyOnWriteTable.java:103)
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:142)
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:431)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)

The executors lose the shuffle map output locations:

`MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to 2600:1f13:d89:d701:c1b3:f3b3:161c:a99e:41804
24/10/23 10:31:07 WARN TaskSetManager: Lost task 1.1 in stage 4.0 (TID 7) ([2600:1f13:d89:d701:c1b3:f3b3:161c:a99e] executor 2): FetchFailed(null, shuffleId=1, mapIndex=-1, mapId=-1, reduceId=1, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1 partition 1
at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1732)
at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$11(MapOutputTracker.scala:1679)
at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$11$adapted(MapOutputTracker.scala:1678)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1678)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1320)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:1282)
at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:140)
at org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
at org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:377)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1552)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:563)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:566)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

)`

Expected behavior

The distinct operation should be parallelizable so that the upsert can de-duplicate records for a dataset of this size.

@danny0405
Contributor

Thanks for the feedback. Parallelizing the distinct in the simple index may be a straightforward solution: we could execute the distinct in two or more stages (introducing some reshuffle there). Did you try other index types, though?
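
The idea of a distinct split into stages with a reshuffle in between can be illustrated outside Spark. The following is a minimal pure-Python sketch of that general technique, not Hudi's implementation and not a proposed patch; the function name `two_stage_distinct` and the bucket count are hypothetical.

```python
from typing import Hashable, Iterable, List


def two_stage_distinct(partitions: Iterable[Iterable[Hashable]],
                       num_buckets: int) -> List[List[Hashable]]:
    """De-duplicate values spread over several partitions in two stages."""
    if num_buckets < 1:
        raise ValueError("num_buckets must be at least 1")

    # Stage 1: de-duplicate inside each input partition, with no data movement.
    local = [set(part) for part in partitions]

    # Reshuffle: route each surviving value to a bucket chosen by its hash, so
    # equal values coming from different partitions land in the same bucket.
    buckets = [set() for _ in range(num_buckets)]
    for part in local:
        for value in part:
            # Stage 2: adding to a set de-duplicates within the bucket.
            buckets[hash(value) % num_buckets].add(value)

    return [list(bucket) for bucket in buckets]


result = two_stage_distinct([["a", "b", "a"], ["b", "c"]], num_buckets=4)
distinct_values = sorted(v for bucket in result for v in bucket)
```

Because stage 1 shrinks each partition before any data moves, the reshuffle only carries values that are already locally unique, and the work of stage 2 is spread over `num_buckets` independent buckets instead of a single reducer.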

@KnightChess
Contributor

If you use dynamic executor allocation, which shuffle service do you use: ESS or RSS? You can check whether the load on these nodes is too high, and whether the corresponding NodeManager (NM) or RemoteShuffleService (RSS) is functioning properly. Consider increasing hoodie.upsert.shuffle.parallelism if the incoming data is large.
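
A sketch of what raising the parallelism could look like on the option side. The value 50 is an assumption picked only to match the 50 input partitions mentioned in the report, not a tuned recommendation. `hoodie.simple.index.parallelism` is a further Hudi setting for the simple index lookup that this thread does not mention, and `with_higher_parallelism` is a hypothetical helper.

```python
def with_higher_parallelism(options: dict, parallelism: int) -> dict:
    """Return a copy of the Hudi options with the shuffle parallelism raised."""
    if parallelism < 1:
        raise ValueError("parallelism must be at least 1")
    updated = dict(options)  # leave the caller's dictionary untouched
    # Setting named in the comment above.
    updated["hoodie.upsert.shuffle.parallelism"] = str(parallelism)
    # Assumption: also raise the simple index lookup parallelism, since the
    # failing distinct runs inside HoodieSimpleIndex.
    updated["hoodie.simple.index.parallelism"] = str(parallelism)
    return updated


# Subset of the reported configuration, repeated here to keep the sketch self-contained.
reported = {
    "hoodie.upsert.shuffle.parallelism": "2",
    "hoodie.index.type": "simple",
}
tuned = with_higher_parallelism(reported, 50)
```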
