Spark / SPARK-9591

Job fails due to an exception while getting a broadcast variable


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.3.1, 1.4.0, 1.4.1
    • Fix Version/s: 1.6.0
    • Component/s: Spark Core
    • Labels: None

    Description

      A job may fail because of an exception thrown while getting a broadcast variable, especially when dynamic resource allocation is enabled.
      Driver log:

      2015-07-21 05:36:31 INFO 15/07/21 05:36:31 WARN TaskSetManager: Lost task 496.1 in stage 19.0 (TID 1715, XXXXXX): java.io.IOException: Failed to connect to XXXXX:27072
      at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
      at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
      at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
      at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
      at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
      at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:662)
      Caused by: java.net.ConnectException: Connection refused: xxxxxx:27072
      at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
      at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
      at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
      at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
      ... 1 more
      15/07/21 05:36:32 WARN TaskSetManager: Lost task 496.2 in stage 19.0 (TID 1744, xxxxx): java.io.IOException: Failed to connect to XXXX/XXXXXXXX:34070
      at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
      at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
      at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
      at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
      at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
      at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:662)
      Caused by: java.net.ConnectException: Connection refused: xxx:34070
      at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
      at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
      at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
      at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
      at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
      at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
      at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
      ... 1 more
      
      org.apache.spark.SparkException: Job aborted due to stage failure: Task 496 in stage 19.0 failed 4 times
      

      Executor log:

      15/07/21 05:36:17 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks
      java.io.IOException: Failed to connect to xxx
              at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
              at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
              at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
              at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
              at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
              at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
              at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
              at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:592)
              at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:590)
              at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
              at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
              at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:590)
              at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:584)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:127)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:137)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:137)
              at scala.Option.orElse(Option.scala:257)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
              at scala.collection.immutable.List.foreach(List.scala:318)
              at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
              at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
              at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1246)
              at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
              at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
              at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
              at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
              at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
              at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:132)
              at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:216)
              at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:212)
              at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:93)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
              at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
              at org.apache.spark.scheduler.Task.run(Task.scala:64)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:209)
              at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
              at java.lang.Thread.run(Thread.java:662)
      

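      When getting a broadcast variable, a task can fetch each block from several locations. Currently, however, if the task tries to connect to a block manager that has been lost (for example, one removed by the driver after its executor stayed idle long enough under dynamic resource allocation), the connection failure is thrown out of the fetch and the task fails instead of trying the remaining locations. In the worst case the same task fails repeatedly and the whole job is aborted, as in the driver log above ("failed 4 times").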
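      The behavior described above can be illustrated with a minimal sketch. It is loosely modeled on the per-location loop visible in the executor stack trace (BlockManager.doGetRemote iterating over locations and calling fetchBlockSync), but it is not Spark's code: the class, the BlockLocation interface, and both methods are hypothetical. fetchFailFast shows the reported problem (the first IOException from a lost block manager escapes the loop), while fetchFromAnyLocation shows the fallback the report asks for: skip a failing location, try the next one, and only throw once every location has failed.

```java
import java.io.IOException;
import java.util.List;
import java.util.Optional;

final class BroadcastBlockFetchSketch {

    /** Hypothetical stand-in for one remote block manager holding a copy of the block. */
    interface BlockLocation {
        byte[] fetch(String blockId) throws IOException;
    }

    /** Reported behavior: an exception from any location propagates and fails the task. */
    static Optional<byte[]> fetchFailFast(String blockId, List<BlockLocation> locations)
            throws IOException {
        for (BlockLocation loc : locations) {
            byte[] data = loc.fetch(blockId); // a "Connection refused" here escapes the loop
            if (data != null) {
                return Optional.of(data);
            }
        }
        return Optional.empty();
    }

    /** Desired behavior: skip failed locations; throw only if every location failed. */
    static Optional<byte[]> fetchFromAnyLocation(String blockId, List<BlockLocation> locations)
            throws IOException {
        IOException lastFailure = null;
        int failures = 0;
        for (BlockLocation loc : locations) {
            try {
                byte[] data = loc.fetch(blockId);
                if (data != null) {
                    return Optional.of(data);
                }
            } catch (IOException e) {
                failures++;        // e.g. the block manager was removed; try the next one
                lastFailure = e;
            }
        }
        if (lastFailure != null && failures == locations.size()) {
            throw new IOException("Failed to fetch " + blockId + " from all "
                    + locations.size() + " locations", lastFailure);
        }
        return Optional.empty(); // no location had the block
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical locations: one lost executor, one live executor.
        BlockLocation lost = id -> { throw new IOException("Connection refused: lost executor"); };
        BlockLocation alive = id -> new byte[] {42};
        List<BlockLocation> locs = List.of(lost, alive);

        try {
            fetchFailFast("broadcast_0_piece0", locs);
        } catch (IOException e) {
            System.out.println("fail-fast: " + e.getMessage());
        }
        System.out.println("fallback: "
                + fetchFromAnyLocation("broadcast_0_piece0", locs).get().length + " byte(s)");
    }
}
```

      With the lost location listed first, the fail-fast variant throws even though a live copy exists, while the fallback variant returns the block from the second location. Keeping the last failure as the cause preserves the original connection error for the case where no location is reachable.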


    People

      Assignee: jeanlyn
      Reporter: jeanlyn
      Votes: 1
      Watchers: 5
