SPARK-7703

Task failure caused by block fetch failure in BlockManager.doGetRemote() when using TorrentBroadcast


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.2.1, 1.3.1
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None
    • Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo), Spark 1.3.1 Release

    Description

      I am from the IBM Platform Symphony team, and we are working to integrate Spark with our EGO resource manager to provide fine-grained dynamic allocation.

      We found a defect in the current implementation of BlockManager.doGetRemote():

        private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
          require(blockId != null, "BlockId is null")
          val locations = Random.shuffle(master.getLocations(blockId))  // <-- Issue 2: locations may be out of date
          for (loc <- locations) {
            logDebug(s"Getting remote block $blockId from $loc")
            val data = blockTransferService.fetchBlockSync(
              loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()  // <-- Issue 1: this call is not wrapped in try/catch
      
            if (data != null) {
              if (asBlockResult) {
                return Some(new BlockResult(
                  dataDeserialize(blockId, data),
                  DataReadMethod.Network,
                  data.limit()))
              } else {
                return Some(data)
              }
            }
            logDebug(s"The value of block $blockId is null")
          }
          logDebug(s"Block $blockId not found")
          None
        }
      
      • Issue 1: Although the block fetch uses a "for" loop to try all available locations, the fetch call is not guarded by a Try block. When an exception occurs, the method throws it directly instead of moving on to the other block locations, and the uncaught exception causes the task to fail.
      • Issue 2: The locations list is obtained once, before fetching; in a dynamic allocation environment the block locations may change while the loop is still running.

      We hit both issues in our use case, where executors exit after all of their assigned tasks are done. We occasionally get the following error (issue 1):

      15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
      15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
      15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
      15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
      15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
      15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
      15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
      15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of broadcast_0
      15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0 as bytes
      15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered locally
      15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 as bytes
      15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
      15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from sparkbj01:37599 (executor id c390c311-bd97-4a99-bcb9-b32fd3dede17)
      15/05/13 10:28:35 DEBUG TransportClientFactory: Creating new connection to sparkbj01/9.111.254.195:37599
      15/05/13 10:28:35 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks 
      java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
      	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
      	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
      	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
      	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
      	at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
      	at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
      	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
      	at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:599)
      	at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:597)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
      	at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:597)
      	at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:591)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:126)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
      	at scala.Option.orElse(Option.scala:257)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
      	at scala.collection.immutable.List.foreach(List.scala:318)
      	at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
      	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
      	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1149)
      	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
      	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
      	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
      	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
      	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
      	at org.apache.spark.scheduler.Task.run(Task.scala:56)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:662)
      Caused by: java.net.ConnectException: Connection refused: sparkbj01/9.111.254.195:37599
      	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
      	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
      	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
      	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
      	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
      	... 1 more
      15/05/13 10:28:35 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
      15/05/13 10:28:40 DEBUG TransportClientFactory: Creating new connection to sparkbj01/9.111.254.195:37599
      15/05/13 10:28:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
      java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
      	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
      	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
      	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
      	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
      	at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
      	at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
      	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      	at java.lang.Thread.run(Thread.java:662)
      Caused by: java.net.ConnectException: Connection refused: sparkbj01/9.111.254.195:37599
      	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
      	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
      	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
      	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
      	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
      	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
      	... 1 more
      

      We do send "ExecutorLost" messages so that BlockManagerMaster can remove the executor from its block location map, but due to network latency the "getLocations" call may happen before the removal.

      In our heavy-workload environment, some tasks keep failing, which eventually causes the whole job to fail.

      Using HttpBroadcast instead of the default TorrentBroadcast did resolve the problem, but we want better performance. So we added a Try block, only to find that the "for" loop tries dozens of dead executors before finally fetching the block from the driver's BlockManager; this process takes several minutes.
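
      For reference, that Try-only change looks roughly like the following sketch (same BlockManager context as the code above; exact logging and error handling may differ):

        // Sketch of the Try-only change: keep the original "for" loop, but guard the
        // fetch so that a failure on one location falls through to the next one.
        // Requires: import scala.util.{Try, Success, Failure}
        private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
          require(blockId != null, "BlockId is null")
          // Locations are still queried only once, so Issue 2 (stale locations) remains.
          val locations = Random.shuffle(master.getLocations(blockId))
          for (loc <- locations) {
            logDebug(s"Getting remote block $blockId from $loc")
            Try(blockTransferService.fetchBlockSync(
                loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()) match {
              case Success(data) if data != null =>
                if (asBlockResult) {
                  return Some(new BlockResult(
                    dataDeserialize(blockId, data), DataReadMethod.Network, data.limit()))
                } else {
                  return Some(data)
                }
              case Success(_) =>
                logDebug(s"The value of block $blockId is null")
              case Failure(e) =>
                // With many dead executors in the (stale) location list, the loop can
                // spend minutes timing out on each of them before reaching a live one.
                logWarning(s"Failed to fetch block $blockId from $loc, trying next location", e)
            }
          }
          logDebug(s"Block $blockId not found")
          None
        }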

      We are currently working around the problem with the following fix:

        // Requires: import scala.util.{Try, Success, Failure}
        private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
          require(blockId != null, "BlockId is null")
          var blockFetched = false
          while (!blockFetched) {
            // Re-query the master on every attempt so that stale locations are refreshed.
            val locations = Random.shuffle(master.getLocations(blockId))
            if (locations.isEmpty) {
              logDebug(s"Block $blockId has no known locations")
              return None
            }
            val loc = locations.head
            logDebug(s"Getting remote block $blockId from $loc")
            val dataTry = Try(blockTransferService.fetchBlockSync(
              loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer())
            
            dataTry match {
              case Success(data) =>
                if (data != null) {
                  if (asBlockResult) {
                    return Some(new BlockResult(
                      dataDeserialize(blockId, data),
                      DataReadMethod.Network,
                      data.limit()))
                  } else {
                    return Some(data)
                  }
                }
                logDebug(s"The value of block $blockId is null")
              case Failure(e) =>
                logWarning(s"Failed to fetch block ${blockId.toString} from ${loc.host}:"
                           + s"${loc.port} executorId:${loc.executorId}. "
                           + {
                             if (locations.size <= 1) "" else "Will update locations and retry."
                           })
            }
            // If we have no more than 1 location to get from (the driver), we may stop retrying and just exit.
            blockFetched = (locations.size <= 1)
          }
          logDebug(s"Block $blockId not found")
          None
        }
      

      This fix suppresses the exception when a fetch fails and refreshes the block locations to reduce future failures.

      We hope to get help from experts in the community toward a more thorough solution (e.g., can we try all available block locations in a random rolling manner, instead of retrying the same location 4 times consecutively?).
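
      To make the "random rolling" idea concrete, here is a rough sketch (not a tested patch) of one possible shape: try each currently-known location at most once per round, and re-query the master between rounds so that dead executors drop out. The maxRounds bound is hypothetical and would need tuning or a configuration property.

        // Sketch only: one fetch attempt per known location per round, refreshing the
        // location list from the master between rounds so dead executors drop out.
        // Requires: import scala.util.{Try, Success, Failure}
        private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
          require(blockId != null, "BlockId is null")
          val maxRounds = 3  // hypothetical bound, not an existing Spark setting
          for (round <- 1 to maxRounds) {
            // Re-query and re-shuffle on every round so stale locations are refreshed.
            val locations = Random.shuffle(master.getLocations(blockId))
            if (locations.isEmpty) {
              logDebug(s"Block $blockId has no known locations")
              return None
            }
            for (loc <- locations) {
              logDebug(s"Getting remote block $blockId from $loc (round $round)")
              Try(blockTransferService.fetchBlockSync(
                  loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()) match {
                case Success(data) if data != null =>
                  if (asBlockResult) {
                    return Some(new BlockResult(
                      dataDeserialize(blockId, data), DataReadMethod.Network, data.limit()))
                  } else {
                    return Some(data)
                  }
                case Success(_) =>
                  logDebug(s"The value of block $blockId is null")
                case Failure(e) =>
                  logWarning(s"Failed to fetch block $blockId from $loc (round $round)", e)
              }
            }
          }
          logDebug(s"Block $blockId not found")
          None
        }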

            People

              Assignee: Unassigned
              Reporter: Hailong Wen (wenhailong1988)
              Votes: 0
              Watchers: 6
