SPARK-10466: UnsafeRow exception in Sort-Based Shuffle with data spill


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.5.1, 1.6.0
    • Component/s: SQL
    • Labels: None

    Description

      In sort-based shuffle, if the data spills to disk, the shuffle write fails with an assertion error. The following code reproduces the problem:

      withSparkConf(("spark.shuffle.sort.bypassMergeThreshold", "2")) {
        withSQLConf(("spark.sql.autoBroadcastJoinThreshold", "0")) {
          withTempTable("mytemp") {
            sparkContext.parallelize(1 to 10000000, 3).map(i => (i, i)).toDF("key", "value")
              .registerTempTable("mytemp")
            sql("select key, value as v1 from mytemp where key > 1").registerTempTable("l")
            sql("select key, value as v2 from mytemp where key > 3").registerTempTable("r")

            val df3 = sql("select v1, v2 from l left join r on l.key=r.key")
            df3.count()
          }
        }
      }
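
      Note that withSparkConf, withSQLConf, and withTempTable are helpers from Spark's SQL test harness, so the snippet above only runs inside the test suite. A roughly equivalent standalone sketch (assuming Spark 1.5 in local mode; the object name is illustrative, and whether the sort actually spills depends on available executor memory):

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.SQLContext

      object Spark10466Repro {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf()
            .setAppName("SPARK-10466 repro")
            .setMaster("local[4]")
            // Keep the shuffle above the bypass-merge threshold so the
            // sort-based path (ExternalSorter) is used.
            .set("spark.shuffle.sort.bypassMergeThreshold", "2")
          val sc = new SparkContext(conf)
          val sqlContext = new SQLContext(sc)
          import sqlContext.implicits._

          // Disable broadcast joins so the join shuffles UnsafeRows.
          sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")

          sc.parallelize(1 to 10000000, 3).map(i => (i, i)).toDF("key", "value")
            .registerTempTable("mytemp")
          sqlContext.sql("select key, value as v1 from mytemp where key > 1").registerTempTable("l")
          sqlContext.sql("select key, value as v2 from mytemp where key > 3").registerTempTable("r")

          val df3 = sqlContext.sql("select v1, v2 from l left join r on l.key=r.key")
          df3.count() // triggers the shuffle; with a spill, the AssertionError below is thrown
        }
      }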
      
      java.lang.AssertionError: assertion failed
      	at scala.Predef$.assert(Predef.scala:165)
      	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.writeKey(UnsafeRowSerializer.scala:75)
      	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:180)
      	at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:688)
      	at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2$$anonfun$apply$1.apply(ExternalSorter.scala:687)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      	at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:687)
      	at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$2.apply(ExternalSorter.scala:683)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
      	at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:683)
      	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
      	at org.apache.spark.scheduler.Task.run(Task.scala:88)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
      	at java.lang.Thread.run(Thread.java:722)
      17:32:06.172 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 1.0 (TID 4, localhost): java.lang.AssertionError: assertion failed
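
      The assertion fires in UnsafeRowSerializerInstance.writeKey (UnsafeRowSerializer.scala:75). A plausible reading of the trace, hedged since this paraphrases rather than quotes the 1.5.0 source: the serializer skips the key bytes on write because the key is only the Int partition id, and readKey hands back a dummy value for records read from a spill file; when ExternalSorter.writePartitionedFile merges spilled data, that dummy key flows back into writeKey and fails the isInstanceOf[Int] check. A minimal self-contained sketch of the pattern (names are illustrative, not Spark APIs):

      import scala.reflect.ClassTag

      // Sketch only: mimics the suspected writeKey/readKey contract, not Spark source.
      object SpillKeyAssertionSketch {
        def writeKey[T: ClassTag](key: T): Unit = {
          // Map-side contract: the key is the Int partition id and is never shuffled.
          assert(key.isInstanceOf[Int], "key must be the Int partition id")
        }

        def readKey[T: ClassTag](): T = {
          // The key bytes were never written, so reading a spilled record back
          // can only produce a dummy value.
          null.asInstanceOf[T]
        }

        def main(args: Array[String]): Unit = {
          writeKey(3)                     // in-memory record: partition id, passes
          val dummy: Any = readKey[Any]() // record re-read from a spill file
          writeKey(dummy)                 // java.lang.AssertionError: assertion failed
        }
      }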
      

          People

            Assignee: Cheng Hao
            Reporter: Cheng Hao
