  Spark / SPARK-30602 SPIP: Support push-based shuffle to improve shuffle efficiency / SPARK-38987

Handle fallback when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true


Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.2.0
    • Fix Version/s: 3.4.0
    • Component/s: Shuffle
    • Labels: None

    Description

      The current implementation does not handle corruption of merged shuffle blocks. If a fetched shuffle chunk is corrupted, the task fails directly with the error shown below instead of triggering the expected fallback, which re-fetches the original unmerged shuffle blocks (see the sketch after the stack trace):
      org.apache.spark.SparkException: Failed to get block shuffleChunk_30_0_1070_0, which is not a shuffle block
      at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1189)
      at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:1352)
      at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
      at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
      at java.io.DataInputStream.readInt(DataInputStream.java:387)
      at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
      at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
      at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
      at org.apache.spark.shuffle.BlockStoreShuffleReader.$anonfun$read$2(BlockStoreShuffleReader.scala:98)
      at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
      at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
      at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
      at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
      at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage138.sort_addToSorter_0$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage138.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
      at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
      at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:783)
      at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:742)
      at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:908)
      at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:944)
      at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage140.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
      at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
      at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoinExec.scala:783)
      at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoinExec.scala:742)
      at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:908)
      at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:944)
      at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage142.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
      at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
      at
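
      The expected behavior, with push-based shuffle enabled (spark.shuffle.push.enabled=true on top of the external shuffle service) and spark.shuffle.detectCorrupt left at its default of true, is that a corrupted shuffleChunk_* block should not fail the task: the fetcher should fall back to fetching the original unmerged shuffle blocks that were merged into that chunk. Below is a minimal, self-contained Scala sketch of that fallback pattern; the block-id case classes and the fetchStream/originalBlocksFor helpers are hypothetical stand-ins for illustration only, not the actual Spark internals touched by this fix (ShuffleBlockFetcherIterator and the push-based fetch helper).

      import java.io.{ByteArrayInputStream, IOException, InputStream}

      // Hypothetical block identifiers used only in this sketch.
      sealed trait BlockId
      case class ShuffleBlockChunkId(shuffleId: Int, mergeId: Int, reduceId: Int, chunkId: Int) extends BlockId
      case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId

      object MergedChunkFallback {

        // Simulated remote fetch: the merged chunk is corrupted, while the
        // original per-map blocks are readable.
        def fetchStream(id: BlockId): InputStream = id match {
          case _: ShuffleBlockChunkId => throw new IOException("corrupted merged chunk")
          case _: ShuffleBlockId      => new ByteArrayInputStream("row".getBytes("UTF-8"))
        }

        // Simulated lookup of the unmerged blocks that were merged into a chunk.
        def originalBlocksFor(c: ShuffleBlockChunkId): Seq[ShuffleBlockId] =
          Seq(ShuffleBlockId(c.shuffleId, 0L, c.reduceId),
              ShuffleBlockId(c.shuffleId, 1L, c.reduceId))

        // Desired behavior with spark.shuffle.detectCorrupt=true: a corrupted
        // merged chunk falls back to its original unmerged blocks instead of
        // failing the task with a FetchFailedException.
        def readWithFallback(id: BlockId): Seq[InputStream] = id match {
          case chunk: ShuffleBlockChunkId =>
            try Seq(fetchStream(chunk))
            catch { case _: IOException => originalBlocksFor(chunk).map(fetchStream) }
          case other => Seq(fetchStream(other))
        }

        def main(args: Array[String]): Unit = {
          // Mirrors the failing block in the trace above: shuffleChunk_30_0_1070_0.
          val streams = readWithFallback(ShuffleBlockChunkId(30, 0, 1070, 0))
          println(s"fetched ${streams.size} streams via fallback") // prints: fetched 2 streams via fallback
        }
      }

      The trade-off is that the fallback re-fetches several smaller original blocks instead of one merged chunk, which costs extra round trips but keeps the task alive rather than failing the stage.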


          People

            Assignee: Aravind Patnam (apatnam)
            Reporter: Ye Zhou (zhouyejoe)
            Votes: 0
            Watchers: 3
