Description
Repro steps:
$ spark-shell --conf spark.driver.memory=1g val df = spark.range(5000000).withColumn("str", lit("abcdabcdabcdabcdabasgasdfsadfasdfasdfasfasfsadfasdfsadfasdf")) val df2 = spark.range(10).join(broadcast(df), Seq("id"), "left_outer") df2.collect
This will cause the driver to hang indefinitely. Heres a thread dump of the main thread when its stuck
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:285)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$2819/629294880.apply(Unknown Source)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:236) => holding Monitor(java.lang.Object@1932537396})
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:381)
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4179)
org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3420)
org.apache.spark.sql.Dataset$$Lambda$2390/1803372144.apply(Unknown Source)
org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4169)
org.apache.spark.sql.Dataset$$Lambda$2791/1357377136.apply(Unknown Source)
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4167)
org.apache.spark.sql.Dataset$$Lambda$2391/1172042998.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2402/721269425.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2392/11632488.apply(Unknown Source)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:4167)
org.apache.spark.sql.Dataset.collect(Dataset.scala:3420)
When we disable AQE though we get the following exception instead of driver hang.
Caused by: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value. ... 7 more Caused by: java.lang.OutOfMemoryError: Java heap space at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:834) at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:777) at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:1086) at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:157) at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1163) at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1151) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:148) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$Lambda$2999/145945436.apply(Unknown Source) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217) at org.apache.spark.sql.execution.SQLExecution$$$Lambda$3001/1900142693.call(Unknown Source) ... 4 more
I expect to see the same exception even when AQE is enabled.
Attachments
Issue Links
- relates to
-
SPARK-40663 Migrate execution errors onto error classes
- Resolved