Spark / SPARK-22618

RDD.unpersist can cause fatal exception when used with dynamic allocation


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 2.3.0
    • Component/s: Spark Core
    • Labels: None

    Description

      If you use rdd.unpersist() with dynamic allocation, an executor can be deallocated while the RDD's blocks are being removed, and the resulting uncaught exception kills your job.

      I looked into different ways of preventing this error, but couldn't find anything that wouldn't require a large change. I propose that the best fix is simply to catch and log IOExceptions in unpersist() so they don't kill the job. This matches the effective behavior in other parts of the code when executors are lost to dynamic allocation.
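      A minimal sketch of that idea (my own illustration, not the actual patch; the wrapper name is hypothetical, and removeRdd stands in for the BlockManagerMaster.removeRdd call visible in the stack trace below):

      import java.io.IOException

      // Sketch only: tolerate an IOException from the block-removal RPC
      // instead of letting it propagate and fail the job.
      def removeRddTolerantly(rddId: Int)(removeRdd: => Unit): Unit = {
        try {
          removeRdd
        } catch {
          case e: IOException =>
            // The executor holding these blocks most likely died (e.g. it
            // was deallocated by dynamic allocation); its cached partitions
            // are gone either way, so log and continue.
            System.err.println(s"Failed to remove RDD $rddId, continuing: ${e.getMessage}")
        }
      }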

      In the worst case I think this could leave RDD partitions on executors after they were unpersisted, but that is still better than the whole job failing. In most cases the IOException will be due to the executor dying for some reason, which is effectively the same result as unpersisting the RDD from that executor anyway.
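      Until something like that lands, a caller-side guard is possible. This is just a sketch (safeUnpersist is a hypothetical helper); note that the blocking wait on the removal RPC is what rethrows the exception here, so in 2.x calling rdd.unpersist(blocking = false) should also avoid it, at the cost of not waiting for the blocks to actually be removed.

      import scala.util.{Failure, Success, Try}
      import org.apache.spark.rdd.RDD

      // Hypothetical caller-side workaround: keep the blocking semantics but
      // don't let a dying executor's IOException kill the job.
      def safeUnpersist(rdd: RDD[_]): Unit =
        Try(rdd.unpersist(blocking = true)) match {
          case Success(_) => ()
          case Failure(e) =>
            System.err.println(s"unpersist of RDD ${rdd.id} failed, continuing: ${e.getMessage}")
        }

      Either way the trade-off is the same as above: a possibly-leaked cached partition in exchange for the job surviving.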

      I noticed this exception in a job that loads a 100GB dataset on a cluster where we use dynamic allocation heavily. Here is the relevant stack trace:

      java.io.IOException: Connection reset by peer
      at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
      at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
      at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
      at sun.nio.ch.IOUtil.read(IOUtil.java:192)
      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
      at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
      at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
      at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
      at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
      at java.lang.Thread.run(Thread.java:748)
      Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
      at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
      at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
      at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
      at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
      at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
      at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
      at com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
      at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
      at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
      at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      at scala.collection.immutable.List.map(List.scala:285)
      at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially(SuiteKickoff.scala:78)
      at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:52)
      at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:47)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      at scala.collection.immutable.Range.foreach(Range.scala:160)
      at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
      at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.run(SuiteKickoff.scala:47)
      at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially$1.apply(MultipleSuiteKickoff.scala:24)
      at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially$1.apply(MultipleSuiteKickoff.scala:24)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$.com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially(MultipleSuiteKickoff.scala:24)
      at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$run$1.apply(MultipleSuiteKickoff.scala:13)
      at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$run$1.apply(MultipleSuiteKickoff.scala:10)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$.run(MultipleSuiteKickoff.scala:10)
      at com.ibm.sparktc.sparkbench.cli.CLIKickoff$.main(CLIKickoff.scala:16)
      at com.ibm.sparktc.sparkbench.cli.CLIKickoff.main(CLIKickoff.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
      at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:843)
      at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:188)
      at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:218)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      Caused by: java.io.IOException: Connection reset by peer
      at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
      at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
      at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
      at sun.nio.ch.IOUtil.read(IOUtil.java:192)
      at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
      at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
      at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
      at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
      at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
      at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
      at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
      at java.lang.Thread.run(Thread.java:748)

    Attachments

    Issue Links

    Activity


    People

      Assignee: bradkaiser Brad
      Reporter: bradkaiser Brad
      Votes: 0
      Watchers: 4

    Dates

      Created:
      Updated:
      Resolved:
