Description
ExecutorClassLoader does not ensure proper cleanup of the network connections that it opens. If it fails to load a class, it may leak partially-consumed InputStreams that are connected to the REPL's HTTP class server; this can eventually exhaust that server's thread pool and hang the entire job.
Here is a simple reproduction:
With
./bin/spark-shell --master local-cluster[8,8,512]
run the following command:
sc.parallelize(1 to 1000, 1000).map { x =>
  try {
    Class.forName("some.class.that.does.not.Exist")
  } catch {
    case e: Exception => // do nothing
  }
  x
}.count()
This job will run 253 tasks, then freeze completely without any errors or failed tasks.
It looks like the driver has 253 threads blocked in socketRead0() calls:
[joshrosen ~]$ jstack 16765 | grep socketRead0 | wc
     253     759   14674
e.g.
"qtp1287429402-13" daemon prio=5 tid=0x00007f868a1c0000 nid=0x5b03 runnable [0x00000001159bd000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.read(SocketInputStream.java:152) at java.net.SocketInputStream.read(SocketInputStream.java:122) at org.eclipse.jetty.io.ByteArrayBuffer.readFrom(ByteArrayBuffer.java:391) at org.eclipse.jetty.io.bio.StreamEndPoint.fill(StreamEndPoint.java:141) at org.eclipse.jetty.server.bio.SocketConnector$ConnectorEndPoint.fill(SocketConnector.java:227) at org.eclipse.jetty.http.HttpParser.fill(HttpParser.java:1044) at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:280) at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235) at org.eclipse.jetty.server.BlockingHttpConnection.handle(BlockingHttpConnection.java:72) at org.eclipse.jetty.server.bio.SocketConnector$ConnectorEndPoint.run(SocketConnector.java:264) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543) at java.lang.Thread.run(Thread.java:745)
Jstack on the executors shows blocking in loadClass / findClass: a single thread is RUNNABLE, waiting to hear back from the driver, while the other executor threads are BLOCKED on object monitor synchronization at Class.forName0().
Remotely triggering a GC on a hanging executor allows the job to progress and complete more tasks before hanging again. If I repeatedly trigger GC on all of the executors, then the job runs to completion:
jps | grep CoarseGra | cut -d ' ' -f 1 | xargs -I {} -n 1 -P100 jcmd {} GC.run
The culprit is a catch block that ignores all exceptions and performs no cleanup: https://github.com/apache/spark/blob/v1.2.0/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala#L94
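Paraphrasing that code (a rough, simplified sketch of the linked file, not verbatim), the findClassLocally method has this shape: a stream is opened against the class server, any exception thrown before the close() call skips the cleanup, and the catch block silently discards the failure:

// Rough paraphrase of ExecutorClassLoader.findClassLocally in Spark 1.2.0;
// classUri and readAndTransformClass are members of ExecutorClassLoader.
def findClassLocally(name: String): Option[Class[_]] = {
  try {
    val pathInDirectory = name.replace('.', '/') + ".class"
    val inputStream =
      new java.net.URL(classUri + "/" + pathInDirectory).openStream()
    val bytes = readAndTransformClass(name, inputStream)
    inputStream.close()  // never reached if readAndTransformClass throws
    Some(defineClass(name, bytes, 0, bytes.length))
  } catch {
    case e: Exception => None  // swallows the failure and leaks inputStream
  }
}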
This bug has been present since Spark 1.0.0, but I suspect that we haven't seen it before because it's pretty hard to reproduce. Triggering it requires a job whose tasks trigger ClassNotFoundExceptions yet still run to completion. It also requires that executors leak enough open connections to exhaust the class server's Jetty thread pool, which means there must be a large number of tasks (253+) and either a large number of executors or very little GC pressure on those executors (since GC causes the leaked connections to be closed).
The fix here is pretty simple: add proper resource cleanup to this class.
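For illustration, here is a minimal sketch of one way to do that (not an actual patch, and the real fix may differ): close the stream on every path via a finally block, so a failed load can no longer pin a connection to the class server:

// Sketch only: same simplified shape as the paraphrase above, with cleanup
// on every exit path.
def findClassLocally(name: String): Option[Class[_]] = {
  val pathInDirectory = name.replace('.', '/') + ".class"
  var inputStream: java.io.InputStream = null
  try {
    inputStream =
      new java.net.URL(classUri + "/" + pathInDirectory).openStream()
    val bytes = readAndTransformClass(name, inputStream)
    Some(defineClass(name, bytes, 0, bytes.length))
  } catch {
    case e: Exception => None  // still worth logging rather than ignoring
  } finally {
    if (inputStream != null) {
      try {
        inputStream.close()
      } catch {
        case e: Exception => // ignore errors on close
      }
    }
  }
}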