Spark / SPARK-25070

BlockFetchingListener#onBlockFetchSuccess throws "java.util.NoSuchElementException: key not found: shuffle_8_68_113" in ShuffleBlockFetcherIterator, causing the stage to hang for a long time


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: Spark Core

    Description

      The task fetches the shuffle block successfully, but onBlockFetchSuccess fails, leaving the task hanging for a long time (and speculation does not resolve it).

      The log is below:

      18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches in 16 ms

      18/08/08 14:55:53 WARN TransportChannelHandler: Exception in connection from /xxx.xxx.xxx.xxx:7337
      java.util.NoSuchElementException: key not found: shuffle_8_68_113
        at scala.collection.MapLike$class.default(MapLike.scala:228)
        at scala.collection.AbstractMap.default(Map.scala:59)
        at scala.collection.MapLike$class.apply(MapLike.scala:141)
        at scala.collection.AbstractMap.apply(Map.scala:59)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
        at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
        at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
        at

      XXXXXX

      18/08/08 14:55:53 INFO Executor: Finished task 44.0 in stage 14.0 (TID 1483). 3458 bytes result sent to driver
      18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in stage 14.0 (TID 1552), reason: stage cancelled

      val blockFetchingListener = new BlockFetchingListener {
        override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
          // Only add the buffer to results queue if the iterator is not zombie,
          // i.e. cleanup() has not been called yet.
          ShuffleBlockFetcherIterator.this.synchronized {
            try {
              if (!isZombie) {
                // Increment the ref count because we need to pass this to a different thread.
                // This needs to be released after use.
                buf.retain()
                remainingBlocks -= blockId
                results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
                  remainingBlocks.isEmpty))
                logDebug("remainingBlocks: " + remainingBlocks)
              }
            } catch {
              case e : Throwable => onBlockFetchFailure(blockId, e)
            }
          }
          logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
        }
      
        override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
          logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
          results.put(new FailureFetchResult(BlockId(blockId), address, e))
        }
      }
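
      The reason a failed success callback turns into a hang: ShuffleBlockFetcherIterator.next() blocks on the results queue, so if onBlockFetchSuccess throws before results.put(...) runs, no result (success or failure) is ever enqueued for that block and the consumer waits forever. A minimal, self-contained sketch of that behaviour (FetchHangSketch and its names are hypothetical, not the real Spark classes):

      import java.util.concurrent.LinkedBlockingQueue

      // Hypothetical sketch of the hang, not the actual Spark implementation.
      object FetchHangSketch {
        sealed trait FetchResult
        case class SuccessFetchResult(blockId: String, size: Long) extends FetchResult

        private val results = new LinkedBlockingQueue[FetchResult]()
        // "shuffle_8_68_113" is deliberately missing, as in the reported log.
        private val sizeMap = Map("shuffle_8_68_112" -> 1024L)

        // Models onBlockFetchSuccess: sizeMap(blockId) throws NoSuchElementException
        // before results.put(...) is reached, so nothing is enqueued for this block.
        def onBlockFetchSuccess(blockId: String): Unit =
          results.put(SuccessFetchResult(blockId, sizeMap(blockId)))

        def main(args: Array[String]): Unit = {
          // The networking layer only logs the exception (the TransportChannelHandler
          // WARN above); the consumer side never receives a failure result either.
          try onBlockFetchSuccess("shuffle_8_68_113")
          catch { case e: NoSuchElementException => println(s"listener threw: $e") }

          // Models ShuffleBlockFetcherIterator.next(): with the queue still empty,
          // this take() blocks forever, so the task appears to hang.
          println("waiting on results.take() ...")
          results.take()
        }
      }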
      

       

       

      The stack trace above points at ShuffleBlockFetcherIterator.scala:217, i.e. the sizeMap(blockId) lookup in this call, which throws when blockId is not present in sizeMap:

      results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
                  remainingBlocks.isEmpty))
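
      For reference, Scala's Map#apply is what produces the "key not found" message in the WARN above, and a non-throwing lookup (get/getOrElse) would surface the missing key instead of throwing inside the callback. A small illustrative sketch (map contents are hypothetical; the issue was resolved as Incomplete, so this is not the adopted fix):

      object SizeMapLookupSketch {
        def main(args: Array[String]): Unit = {
          // Hypothetical contents; shuffle_8_68_113 is absent, as in the report.
          val sizeMap = Map("shuffle_8_68_112" -> 1024L)

          // Map#apply on a missing key throws exactly the exception in the log:
          // java.util.NoSuchElementException: key not found: shuffle_8_68_113
          try sizeMap("shuffle_8_68_113")
          catch { case e: NoSuchElementException => println(e) }

          // A guarded lookup makes the missing key explicit without throwing.
          println(sizeMap.get("shuffle_8_68_113"))           // None
          println(sizeMap.getOrElse("shuffle_8_68_113", 0L)) // 0
        }
      }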

          People

            Assignee: Unassigned
            Reporter: Deng FEI
            Votes: 1
            Watchers: 2
