SPARK-16550: Caching data with replication doesn't replicate data


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: Block Manager, Spark Core
    • Labels: None

    Description

      Caching multiple replicas of blocks is currently broken. The following examples show that replication does not happen in various use cases:

      These examples were run with Spark 2.0.0-preview in local-cluster[2,1,1024] mode.
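      For reference, the snippets below assume a spark-shell session started against that master, with the storage level constants in scope (a minimal setup sketch; the exact shell invocation is an assumption, not part of the original report):

      // Start a shell backed by a local cluster of 2 executors,
      // each with 1 core and 1024 MB of memory:
      //   ./bin/spark-shell --master local-cluster[2,1,1024]

      // Bring MEMORY_ONLY_2, MEMORY_ONLY_SER_2, etc. into scope
      import org.apache.spark.storage.StorageLevel._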

      // Cache a custom case class with 2x replication: each of the
      // 10 blocks should be stored on two executors (20 replicas total)
      case class TestInteger(i: Int)
      val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_2)
      data.count
      

      Running sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum shows only 10 blocks, as opposed to the expected 20.
      Block replication fails on the executors with a java.lang.RuntimeException: java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger
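      Since the same replica count check is repeated for each example below, it can be wrapped in a small helper (a sketch only; replicaCount is a hypothetical convenience function, not part of the Spark API):

      // Hypothetical helper: total number of cached blocks for an RDD
      // across all executors, using the same developer API as above
      def replicaCount(rddId: Int): Int =
        sc.getExecutorStorageStatus.map(_.rddBlocksById(rddId).size).sum

      replicaCount(data.id)  // returns 10; MEMORY_ONLY_2 should give 20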

      // Repeat with primitive Ints instead of the custom case class
      val data1 = sc.parallelize(1 to 1000, 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
      data1.count
      
      Block replication again fails, with the following error on the executors:
      16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8567643992794608648
      com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
      	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
      	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
      	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
      	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
      	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
      	at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
      	at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)
      

      sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum again shows only 10 blocks instead of the expected 20.

      Caching serialized data works for native types, but not for custom classes:

      // Serialized caching of native Ints with 2x replication
      val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
      data3.count
      

      This works as intended.
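      Applying the same check (via the hypothetical replicaCount helper sketched above) confirms the expected replica count:

      replicaCount(data3.id)  // 20 blocks, as expected for MEMORY_ONLY_SER_2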

      But

      // Serialized caching of the custom case class with 2x replication
      val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_SER_2)
      data4.count
      

      This again fails to replicate the data, and the executors show the same ClassNotFoundException.
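      The same check (again using the hypothetical replicaCount helper) confirms the missing copies:

      replicaCount(data4.id)  // 10 blocks again, instead of the expected 20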

      All of these examples worked and showed the expected results with Spark 1.6.2.


          People

            ekhliang Eric Liang
            shubhamc Shubham Chopra
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue
