Spark / SPARK-42290

Spark Driver hangs on OOM during Broadcast when AQE is enabled


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 3.4.0
    • Fix Version/s: 3.4.1, 3.5.0
    • Component/s: SQL
    • Labels: None

    Description

      Repro steps:

      $ spark-shell --conf spark.driver.memory=1g
      
      val df = spark.range(5000000).withColumn("str", lit("abcdabcdabcdabcdabasgasdfsadfasdfasdfasfasfsadfasdfsadfasdf"))
      val df2 = spark.range(10).join(broadcast(df), Seq("id"), "left_outer")
      
      df2.collect
      

      This causes the driver to hang indefinitely. Here's a thread dump of the main thread while it is stuck:

      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
      java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:285)
      org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$2819/629294880.apply(Unknown Source)
      org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809)
      org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:236) => holding Monitor(java.lang.Object@1932537396})
      org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:381)
      org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
      org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4179)
      org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3420)
      org.apache.spark.sql.Dataset$$Lambda$2390/1803372144.apply(Unknown Source)
      org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4169)
      org.apache.spark.sql.Dataset$$Lambda$2791/1357377136.apply(Unknown Source)
      org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
      org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4167)
      org.apache.spark.sql.Dataset$$Lambda$2391/1172042998.apply(Unknown Source)
      org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
      org.apache.spark.sql.execution.SQLExecution$$$Lambda$2402/721269425.apply(Unknown Source)
      org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
      org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
      org.apache.spark.sql.execution.SQLExecution$$$Lambda$2392/11632488.apply(Unknown Source)
      org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:809)
      org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
      org.apache.spark.sql.Dataset.withAction(Dataset.scala:4167)
      org.apache.spark.sql.Dataset.collect(Dataset.scala:3420)
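      The non-hang behavior shown next was observed with AQE disabled. The report does not say how AQE was toggled; `spark.sql.adaptive.enabled` is the standard configuration flag, shown here only as a sketch:

      // With AQE off, the broadcast OOM surfaces as a SparkException
      // on the calling thread instead of hanging the driver.
      spark.conf.set("spark.sql.adaptive.enabled", false)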
      

      When AQE is disabled, however, we get the following exception instead of a driver hang:

      Caused by: org.apache.spark.SparkException: Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value.
        ... 7 more
      Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.grow(HashedRelation.scala:834)
        at org.apache.spark.sql.execution.joins.LongToUnsafeRowMap.append(HashedRelation.scala:777)
        at org.apache.spark.sql.execution.joins.LongHashedRelation$.apply(HashedRelation.scala:1086)
        at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:157)
        at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1163)
        at org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode.transform(HashedRelation.scala:1151)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:148)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$Lambda$2999/145945436.apply(Unknown Source)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:217)
        at org.apache.spark.sql.execution.SQLExecution$$$Lambda$3001/1900142693.call(Unknown Source)
        ... 4 more
      

      I expect to see the same exception even when AQE is enabled.
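      Until a fixed version is available, the workarounds suggested by the non-AQE error message can be applied. Both configuration keys below are standard Spark settings; the exact values are illustrative, not part of this report:

      // Option 1: disable broadcast joins entirely, so the large side is
      // shuffled instead of built into an in-memory hash relation on the driver.
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

      // Option 2: give the driver more heap at launch time, e.g.
      //   spark-shell --conf spark.driver.memory=4g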


People

    Assignee: Jia Fan (fanjia)
    Reporter: Shardul Mahadik (shardulm)
    Votes: 0
    Watchers: 5