Spark / SPARK-16550

Caching data with replication doesn't replicate data


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: Block Manager, Spark Core
    • Labels: None

    Description

      Caching multiple replicas of blocks is currently broken. The following examples show that replication doesn't happen in various use cases:

      These were run using Spark 2.0.0-preview in local-cluster[2,1,1024] mode.
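      For a standalone driver, the equivalent setup is sketched below (the app name is illustrative; the original report used the spark-shell REPL, which matters for the class-resolution failures shown later):

      // Sketch: programmatic setup for a 2-executor local cluster
      // (1 core and 1024 MB per executor), assuming a non-REPL driver.
      import org.apache.spark.{SparkConf, SparkContext}

      val conf = new SparkConf()
        .setMaster("local-cluster[2,1,1024]")
        .setAppName("replication-repro")
      val sc = new SparkContext(conf)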

      import org.apache.spark.storage.StorageLevel._

      case class TestInteger(i: Int)
      val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_2)
      data.count
      

      sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows only 10 blocks, as opposed to the expected 20 (10 partitions × 2 replicas).
      Block replication fails on the executors with a java.lang.RuntimeException: java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
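      A per-executor breakdown makes the missing replicas visible (a diagnostic sketch using the same StorageStatus API; the output format is illustrative):

      // Sketch: print how many blocks of the RDD each block manager holds.
      // With replication 2, each of the 10 blocks should appear on two executors.
      sc.getExecutorStorageStatus.foreach { s =>
        println(s"${s.blockManagerId.executorId}: ${s.rddBlocksById(data.id).size} blocks")
      }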

      val data1 = sc.parallelize(1 to 1000, 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
      data1.count
      
      Block replication again fails with the following error:
      16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8567643992794608648
      com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
      	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
      	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
      	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
      	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
      	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
      	at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
      	at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
      

      sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum again shows only 10 blocks instead of the expected 20.
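      The invariant can be checked directly (a sketch; the assertion message is illustrative):

      // Sketch: with 10 partitions at replication factor 2, the cluster-wide
      // block count for the RDD should be 20.
      val blocks = sc.getExecutorStorageStatus.map(_.rddBlocksById(data1.id).size).sum
      assert(blocks == 20, s"expected 20 replicated blocks, found $blocks")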

      Caching serialized data works for built-in types, but not for custom classes:

      val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
      data3.count
      

      works as intended.

      But

      val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_SER_2)
      data4.count
      

      This again doesn't replicate the data, and the executors show the same ClassNotFoundException.
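      The $line14.$read$$iw$$iw$ prefix indicates the class was generated by the REPL. As a hedged workaround sketch (an assumption, not verified in this report), defining the class in a compiled jar shipped via --jars keeps REPL-generated class names off the replication path:

      // Sketch: TestInteger defined in a regular compiled source file,
      // e.g. built into repro.jar and passed with --jars repro.jar.
      // The package name and jar name are illustrative.
      package repro

      case class TestInteger(i: Int)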

      These examples worked and showed the expected results with Spark 1.6.2.

          People

            Assignee: Eric Liang (ekhliang)
            Reporter: Shubham Chopra (shubhamc)
            Votes: 0
            Watchers: 6
