Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 2.1.1, 2.2.0
- None
Description
Since SPARK-13990 and SPARK-13926, Spark's SerializerManager has its own instance of a KryoSerializer, which does not have the defaultClassLoader set on it. For normal task execution that doesn't cause problems, because the serializer falls back to the class loader of the current task thread, which is set anyway.
However, Netty maintains its own thread pool, and those threads don't change their classloader to include the extra user jars needed for the custom Kryo registrator. That only matters when blocks sent across the network force serde in a Netty thread. That won't happen often, because (a) Spark tries to execute tasks where the RDDs are already cached and (b) broadcast blocks generally don't require any serde in the Netty threads (it occurs in the task thread that reads the broadcast value). However, it can come up with remote cache reads, or if fetching a broadcast block forces another block to disk, which requires serialization.
This doesn't affect the shuffle path, because the serde is never done in the threads created by Netty.
I think a fix for this should be fairly straightforward: we just need to set the classloader on that extra Kryo instance.
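The mechanism described above can be reproduced with the JDK alone. The following is only a minimal sketch, not Spark's code: `LoaderAwareSerializer` and `ClassLoaderFallbackDemo` are hypothetical stand-ins, an empty `URLClassLoader` plays the role of the loader that can see the user jars, and a single-thread executor plays the role of a Netty thread that was started before that loader was installed.

```scala
import java.net.{URL, URLClassLoader}
import java.util.concurrent.{Callable, ExecutorService, Executors}

// Hypothetical stand-in for a serializer: it resolves classes with an explicitly
// set default loader if there is one, else the calling thread's context loader.
final class LoaderAwareSerializer {
  @volatile private var defaultClassLoader: Option[ClassLoader] = None

  def setDefaultClassLoader(cl: ClassLoader): Unit = {
    defaultClassLoader = Some(cl)
  }

  def resolveLoader(): ClassLoader =
    defaultClassLoader.getOrElse(Thread.currentThread().getContextClassLoader)
}

object ClassLoaderFallbackDemo {
  // Runs resolveLoader() on the pool's thread and returns the loader it picked.
  private def resolveOn(pool: ExecutorService, s: LoaderAwareSerializer): ClassLoader =
    pool.submit(new Callable[ClassLoader] {
      override def call(): ClassLoader = s.resolveLoader()
    }).get()

  // Returns (task thread sees user loader,
  //          pool thread sees it before the fix,
  //          pool thread sees it after the fix).
  def run(): (Boolean, Boolean, Boolean) = {
    val current = Thread.currentThread()
    val original = current.getContextClassLoader
    val pool = Executors.newSingleThreadExecutor()
    try {
      val serializer = new LoaderAwareSerializer
      // Start the pool thread now, so it inherits the original context loader.
      resolveOn(pool, serializer)

      // Stand-in for the loader that can see the user jars (assumption: empty here).
      val userLoader = new URLClassLoader(Array.empty[URL], original)
      current.setContextClassLoader(userLoader)

      val taskThreadOk = serializer.resolveLoader() eq userLoader
      val poolBeforeFix = resolveOn(pool, serializer) eq userLoader
      // The proposed fix: set the loader on the serializer itself.
      serializer.setDefaultClassLoader(userLoader)
      val poolAfterFix = resolveOn(pool, serializer) eq userLoader
      (taskThreadOk, poolBeforeFix, poolAfterFix)
    } finally {
      current.setContextClassLoader(original)
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The fallback to the context class loader works on the thread that installed the user loader, fails on the pre-existing pool thread, and works everywhere once the loader is set on the serializer, which is the shape of the fix proposed here.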
(original problem description below)
I unfortunately can't reliably reproduce this bug; it happens only occasionally, when training a logistic regression model with very large datasets. The training will often proceed through several treeAggregate calls without any problems, and then suddenly workers will start running into this java.lang.ClassNotFoundException.
After doing some debugging, it seems that whenever this error happens, Spark is trying to use the sun.misc.Launcher$AppClassLoader ClassLoader instance instead of the usual org.apache.spark.util.MutableURLClassLoader. MutableURLClassLoader can see my custom Kryo registrator, but the AppClassLoader instance can't.
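For anyone repeating this kind of debugging, a small JDK-only helper can show which loader a thread would use and whether that loader can see the registrator. This is a sketch under assumptions: `LoaderChain`, `describe` and `canSee` are hypothetical names, not part of Spark.

```scala
import java.net.{URL, URLClassLoader}

object LoaderChain {
  // Walks from the given loader up through its parents and returns the class
  // name of each loader. A null loader denotes the bootstrap loader.
  def describe(start: ClassLoader): List[String] = {
    @annotation.tailrec
    def loop(cl: ClassLoader, acc: List[String]): List[String] =
      if (cl == null) ("<bootstrap>" :: acc).reverse
      else loop(cl.getParent, cl.getClass.getName :: acc)
    loop(start, Nil)
  }

  // True if the loader can resolve the class, without initializing it.
  def canSee(loader: ClassLoader, className: String): Boolean =
    try {
      Class.forName(className, false, loader)
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  def main(args: Array[String]): Unit = {
    val cl = Thread.currentThread().getContextClassLoader
    println(describe(cl).mkString(" -> "))
    println(canSee(cl, "com.foo.bar.MyKryoRegistrator"))
  }
}
```

Calling `describe(Thread.currentThread().getContextClassLoader)` from the failing thread and from a task thread should make the difference between the two loaders visible.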
When this error does pop up, it's usually accompanied by the task seeming to hang, and I need to kill Spark manually.
I'm running a Spark application in cluster mode via spark-submit, and I have a custom Kryo registrator. The JAR is built with sbt assembly.
Exception message:
17/08/29 22:39:04 ERROR TransportRequestHandler: Error opening block StreamChunkId{streamId=542074019336, chunkIndex=0} for request from /10.0.29.65:34332
org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:139)
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:292)
    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:277)
    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:186)
    at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:169)
    at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1382)
    at org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1377)
    at org.apache.spark.storage.DiskStore.put(DiskStore.scala:69)
    at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1377)
    at org.apache.spark.storage.memory.MemoryStore.org$apache$spark$storage$memory$MemoryStore$$dropBlock$1(MemoryStore.scala:524)
    at org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$2.apply(MemoryStore.scala:545)
    at org.apache.spark.storage.memory.MemoryStore$$anonfun$evictBlocksToFreeSpace$2.apply(MemoryStore.scala:539)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.storage.memory.MemoryStore.evictBlocksToFreeSpace(MemoryStore.scala:539)
    at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:92)
    at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:73)
    at org.apache.spark.memory.StaticMemoryManager.acquireStorageMemory(StaticMemoryManager.scala:72)
    at org.apache.spark.storage.memory.MemoryStore.putBytes(MemoryStore.scala:147)
    at org.apache.spark.storage.BlockManager.maybeCacheDiskBytesInMemory(BlockManager.scala:1143)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:594)
    at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:559)
    at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:559)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:559)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:353)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:61)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$1.apply(NettyBlockRpcServer.scala:60)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)
    at org.apache.spark.network.server.OneForOneStreamManager.getChunk(OneForOneStreamManager.java:89)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:125)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:103)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.foo.bar.MyKryoRegistrator
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:134)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:134)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:134)
    ... 60 more
My Spark session is created like so:
val spark = SparkSession.builder()
  .appName("FooBar")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.kryoserializer.buffer.max", "2047m")
  .config("spark.kryo.registrator", "com.foo.bar.MyKryoRegistrator")
  .config("spark.kryo.registrationRequired", "true")
  .config("spark.network.timeout", "3600s")
  .config("spark.driver.maxResultSize", "0")
  .config("spark.rdd.compress", "true")
  .config("spark.shuffle.spill", "true")
  .getOrCreate()
Here are the config options I'm passing to spark-submit:
--conf "spark.executor.heartbeatInterval=400s" \
--conf "spark.speculation=true" \
--conf "spark.speculation.multiplier=30" \
--conf "spark.speculation.quantile=0.95" \
--conf "spark.memory.useLegacyMode=true" \
--conf "spark.shuffle.memoryFraction=0.8" \
--conf "spark.storage.memoryFraction=0.2" \
--driver-java-options "-XX:+UseG1GC"
I was able to find a workaround: copy your application JAR to each of the machines in your cluster, and pass the JAR's path to spark-submit with:
--conf "spark.driver.extraClassPath=/path/to/sparklogisticregression.jar" \
--conf "spark.executor.extraClassPath=/path/to/sparklogisticregression.jar"
Issue Links
- relates to SPARK-22083 When dropping multiple blocks to disk, Spark should release all locks on a failure (Resolved)