Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-37793

Invalid LocalMergedBlockData causes task hang

    XMLWordPrintableJSON

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 3.3.0
    • Fix Version/s: 3.3.0, 3.2.2
    • Component/s: Shuffle
    • Labels: None

    Description

      When push-based shuffle is enabled, there is a chance that a task hangs.

       

      59	Executor task launch worker for task 424.0 in stage 753.0 (TID 106778)	WAITING	Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1660371198})
      sun.misc.Unsafe.park(Native Method)
      java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
      java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
      org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:753)
      org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:85)
      org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
      scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
      scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
      scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
      org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
      org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
      scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
      org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.sort_addToSorter_0$(Unknown Source)
      org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
      org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
      org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Unknown Source)
      org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_1$(Unknown Source)
      org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithKeys_0$(Unknown Source)
      org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
      org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
      org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
      scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
      org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
      org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
      org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
      org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
      org.apache.spark.scheduler.Task.run(Task.scala:136)
      org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:507)
      org.apache.spark.executor.Executor$TaskRunner$$Lambda$518/852390142.apply(Unknown Source)
      org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1470)
      org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:510)
      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      java.lang.Thread.run(Thread.java:748)
      

      ShuffleBlockFetcherIterator.scala:753

          while (result == null) {
            val startFetchWait = System.nanoTime()
      753>  result = results.take()
            val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
            shuffleMetrics.incFetchWaitTime(fetchWaitTime)
            ..
          }
      

      Attachments

        Activity

          People

            csingh Chandni Singh
            pan3793 Cheng Pan
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: