Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-4105

FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • 1.2.0, 1.2.1, 1.3.0, 1.4.1, 1.5.1, 1.6.1, 2.0.0
    • 2.2.0
    • Shuffle, Spark Core
    • None

    Description

      We have seen non-deterministic FAILED_TO_UNCOMPRESS(5) errors during shuffle read. Here's a sample stacktrace from an executor:

      14/10/23 18:34:11 ERROR Executor: Exception in task 1747.3 in stage 11.0 (TID 33053)
      java.io.IOException: FAILED_TO_UNCOMPRESS(5)
      	at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
      	at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
      	at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:391)
      	at org.xerial.snappy.Snappy.uncompress(Snappy.java:427)
      	at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
      	at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
      	at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
      	at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
      	at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1090)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:116)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1$$anonfun$onBlockFetchSuccess$1.apply(ShuffleBlockFetcherIterator.scala:115)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:243)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:52)
      	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
      	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
      	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
      	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
      	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
      	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
      	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
      	at org.apache.spark.scheduler.Task.run(Task.scala:56)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
      	at java.lang.Thread.run(Thread.java:745)
      

      Here's another occurrence of a similar error:

      java.io.IOException: failed to read chunk
              org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:348)
              org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
              org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
              java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
              java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:2712)
              java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2742)
              java.io.ObjectInputStream.readArray(ObjectInputStream.java:1687)
              java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
              java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
              java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
              java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
              java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
              java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
              org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
              org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
              org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
              scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
              org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
              org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
              org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:129)
              org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
              org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
              org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
              org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
              org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
              org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
              org.apache.spark.scheduler.Task.run(Task.scala:56)
              org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
              java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              java.lang.Thread.run(Thread.java:745)
      

      The first stacktrace was reported by a Spark user. The second stacktrace occurred when running

      import java.util.Random
      
      
      val numKeyValPairs=1000
      val numberOfMappers=200
      val keySize=10000
      
      for (i <- 0 to 19) {
      val pairs1 = sc.parallelize(0 to numberOfMappers, numberOfMappers).flatMap(p=>{
        val randGen = new Random
        val arr1 = new Array[(Int, Array[Byte])](numKeyValPairs)
        for (i <- 0 until numKeyValPairs){
          val byteArr = new Array[Byte](keySize)
          randGen.nextBytes(byteArr)
          arr1(i) = (randGen.nextInt(Int.MaxValue),byteArr)
        }
        arr1
      })
        pairs1.groupByKey(numberOfMappers).count
      }
      

      This job frequently runs without any problems, but when it fails it seem that every post-shuffle task fails with either PARSING_ERROR(2), FAILED_TO_UNCOMPRESS(5), or some other decompression error. I've seen reports of similar problems when using LZF compression, so I think that this is caused by some sort of general stream corruption issue.

      This issue has been observed even when no spilling occurs, so I don't believe that this is due to a bug in spilling code.

      I was unable to reproduce this when running this code in a fresh Spark EC2 cluster and we've been having a hard time finding a deterministic reproduction.

      Attachments

        1. JavaObjectToSerialize.java
          0.3 kB
          Guillaume E.B.
        2. SparkFailedToUncompressGenerator.scala
          1 kB
          Guillaume E.B.

        Issue Links

          Activity

            People

              davies Davies Liu
              joshrosen Josh Rosen
              Votes:
              42 Vote for this issue
              Watchers:
              80 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: