Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-17204

Spark 2.0 off heap RDD persistence with replication factor 2 leads to in-memory data corruption

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.0.0
    • 2.0.3, 2.1.1, 2.2.0
    • Spark Core
    • None

    Description

      We use the OFF_HEAP storage level extensively with great success. We've tried off-heap storage with replication factor 2 and have always received exceptions on the executor side very shortly after starting the job. For example:

      com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9086
      	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
      	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
      	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
      	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
      	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
      	at org.apache.spark.scheduler.Task.run(Task.scala:85)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      or

      java.lang.IndexOutOfBoundsException: Index: 6, Size: 0
      	at java.util.ArrayList.rangeCheck(ArrayList.java:653)
      	at java.util.ArrayList.get(ArrayList.java:429)
      	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
      	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:788)
      	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
      	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
      	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
      	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
      	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
      	at org.apache.spark.scheduler.Task.run(Task.scala:85)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      or

      java.lang.NullPointerException
      	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:141)
      	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:140)
      	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
      	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
      	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
      	at org.apache.spark.scheduler.Task.run(Task.scala:85)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      We've tried switching to Java serialization and get a different exception:

      java.io.StreamCorruptedException: invalid stream header: 780000D0
      	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:808)
      	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301)
      	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
      	at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
      	at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
      	at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
      	at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:433)
      	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
      	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
      	at org.apache.spark.scheduler.Task.run(Task.scala:85)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      This suggest some kind of memory corruption to me.

      I've been able to consistently reproduce this problem in local-cluster mode with a very simple code snippet. First, start a spark shell like this:

      MASTER=local-cluster[2,1,1024] ./spark-2.1/bin/spark-shell --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1024
      

      Then run the following:

      import org.apache.spark.storage.StorageLevel
      val OFF_HEAP_2 = StorageLevel(useDisk = true, useMemory = true, useOffHeap = true, deserialized = false, replication = 2)
      sc.range(0, 100).persist(OFF_HEAP_2).count
      

      Attachments

        Activity

          People

            michael Michael MacFadden
            michael Michael MacFadden
            Votes:
            0 Vote for this issue
            Watchers:
            9 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: