Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 2.0.0
- Fix Version/s: None
Description
Caching multiple replicas of blocks is currently broken. The following examples show that replication does not happen for several use cases. These examples were run using Spark 2.0.0-preview, in local-cluster[2,1,1024] mode.
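(For reproducibility: a shell against such a master can presumably be launched with the standard master flag, e.g. ./bin/spark-shell --master "local-cluster[2,1,1024]"; the exact launch command is an assumption, not part of the original report.)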
// StorageLevel members such as MEMORY_ONLY_2 need this import in the shell
import org.apache.spark.storage.StorageLevel._

case class TestInteger(i: Int)
val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_2)
data.count
sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data.id).size).sum

shows only 10 blocks, as opposed to the expected 20.
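Where the missing replicas are can be narrowed down with a per-executor breakdown using the same StorageStatus API (a diagnostic sketch, assuming the shell session above; it was not part of the original report):

sc.getExecutorStorageStatus.foreach { s =>
  // blockManagerId identifies the executor; rddBlocksById returns its cached blocks for this RDD
  println(s"${s.blockManagerId}: ${s.rddBlocksById(data.id).size} blocks")
}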
Block replication fails on the executors with a java.lang.RuntimeException: java.lang.ClassNotFoundException: $line14.$read$$iw$$iw$TestInteger (the $line14.$read$$iw$$iw$ prefix is the wrapper the REPL generates around classes defined in the shell).
val data1 = sc.parallelize(1 to 1000, 10).persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_2)
data1.count

Block replication again fails, with the following errors:

16/07/14 14:50:40 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 8567643992794608648
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:213)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:775)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutBytes$1.apply(BlockManager.scala:753)

Note that data1 contains only Ints, so this failure is not caused by an unregistered user-defined class.
sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data1.id).size).sum

again shows only 10 blocks.
Caching serialized data works for native types, but not for custom classes:
val data3 = sc.parallelize(1 to 1000, 10).persist(MEMORY_ONLY_SER_2)
data3.count
works as intended.
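Applying the same block count check to data3 should, per the behaviour described here, show the expected 20 blocks:

sc.getExecutorStorageStatus.map(s => s.rddBlocksById(data3.id).size).sum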
But
val data4 = sc.parallelize((1 to 1000).map(TestInteger(_)), 10).persist(MEMORY_ONLY_SER_2)
data4.count
again fails to replicate the data, and the executors show the same ClassNotFoundException.
These examples worked fine and showed the expected results with Spark 1.6.2.
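Because the failing name $line14.$read$$iw$$iw$TestInteger is the spark-shell wrapper for TestInteger, a useful cross-check is the same repro as a compiled standalone application, where TestInteger is a plain top-level class. Below is a sketch of such a cross-check (the object name is arbitrary; this was not run for this report):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

case class TestInteger(i: Int)

object ReplicationRepro {
  def main(args: Array[String]): Unit = {
    // Same two-executor local cluster as the shell examples
    val conf = new SparkConf()
      .setAppName("ReplicationRepro")
      .setMaster("local-cluster[2,1,1024]")
    val sc = new SparkContext(conf)

    val data = sc.parallelize((1 to 1000).map(TestInteger(_)), 10)
      .persist(StorageLevel.MEMORY_ONLY_2)
    data.count()

    // 10 partitions cached with 2x replication should yield 20 blocks
    val blocks = sc.getExecutorStorageStatus.map(_.rddBlocksById(data.id).size).sum
    println(s"cached blocks for rdd ${data.id}: $blocks (expected 20)")

    sc.stop()
  }
}

If the compiled version replicates correctly while the shell version does not, that would point at how replicated blocks are deserialized without the REPL class loader, rather than at replication in general.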
Issue Links
- links to: Repl-defined classes cannot be replicated (Resolved; assignee: Eric Liang)