Spark / SPARK-19529

TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.6.0, 2.0.0, 2.1.0
    • Fix Version/s: 1.6.4, 2.0.3, 2.1.1, 2.2.0
    • Component/s: Shuffle, Spark Core
    • Labels: None

    Description

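      In Spark's Netty RPC layer, TransportClientFactory.createClient() calls awaitUninterruptibly() on a Netty future while waiting for a connection to be established. This creates a problem when a Spark task is interrupted while blocked in this call, which can happen when a connection is slow and will eventually time out. awaitUninterruptibly() does not stop waiting when the thread is interrupted: it keeps blocking until the connection attempt completes or times out, and only then restores the thread's interrupt flag. As a result, task cancellation does not take effect promptly even when interruptOnCancel = true.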
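      To make the difference concrete, here is a minimal sketch that models the two waiting styles with java.util.concurrent instead of Netty, an assumption made so the example is self-contained. UninterruptibleWaitDemo, waitIgnoringInterrupts, waitInterruptibly, and msUntilTaskStops are hypothetical names. The first helper records an interrupt but keeps waiting and restores the interrupt flag afterwards, mimicking the awaitUninterruptibly() behavior described above; the second lets InterruptedException end the wait.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class UninterruptibleWaitDemo {

  // Hypothetical analog of an uninterruptible wait: an interrupt is recorded
  // but does not end the wait; the thread's interrupt flag is restored only
  // after the latch opens or the timeout elapses.
  static boolean waitIgnoringInterrupts(CountDownLatch latch, long timeoutMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    boolean interrupted = false;
    try {
      while (true) {
        long remaining = deadline - System.nanoTime();
        if (remaining <= 0) {
          return latch.getCount() == 0;
        }
        try {
          return latch.await(remaining, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
          interrupted = true; // remember the interrupt, then keep waiting
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  // Interruptible analog: an interrupt ends the wait with InterruptedException.
  static boolean waitInterruptibly(CountDownLatch latch, long timeoutMs)
      throws InterruptedException {
    return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
  }

  // Starts a "task" that waits on a latch that is never counted down (a dead
  // server), interrupts it after 50 ms as a cancellation would, and returns
  // how many milliseconds passed before the task thread actually finished.
  static long msUntilTaskStops(boolean interruptible, long timeoutMs) {
    CountDownLatch neverConnects = new CountDownLatch(1);
    Thread task = new Thread(() -> {
      try {
        if (interruptible) {
          waitInterruptibly(neverConnects, timeoutMs);
        } else {
          waitIgnoringInterrupts(neverConnects, timeoutMs);
        }
      } catch (InterruptedException e) {
        // The interruptible wait was cancelled promptly.
      }
    });
    long start = System.nanoTime();
    task.start();
    try {
      Thread.sleep(50);
      task.interrupt(); // simulate cancellation with interruptOnCancel = true
      task.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("demo thread interrupted", e);
    }
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  public static void main(String[] args) {
    System.out.println("ignoring interrupts: " + msUntilTaskStops(false, 1000) + " ms");
    System.out.println("interruptible:       " + msUntilTaskStops(true, 1000) + " ms");
  }
}
```

      With a 1000 ms timeout, the first task should take roughly the full second to stop despite being interrupted after 50 ms, while the second should stop shortly after the interrupt, which mirrors the zombie-task behavior in this report.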

      As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack:

      java.lang.Object.wait(Native Method)
      java.lang.Object.wait(Object.java:460)
      io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607) 
      io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301) 
      org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224) 
      org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object@1849476028}) 
      org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105) 
      org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) 
      org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) 
      org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114) 
      org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169) 
      org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
      org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286) 
      org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120) 
      org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45) 
      org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169) 
      org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
      org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
      [...]
      

      I believe we can easily fix this by calling await() instead, which throws InterruptedException when the waiting thread is interrupted.
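      The proposed change can be sketched without Netty as follows. This is a hedged illustration, not the actual patch: java.util.concurrent.CompletableFuture stands in for Netty's connection future, and InterruptibleConnectSketch, awaitConnection, the "dead-host" address, and the error messages are hypothetical. In Netty itself, the corresponding interruptible call is Future.await(long timeoutMillis), which returns false on timeout and throws InterruptedException if the current thread is interrupted. The key design choice is that the helper declares InterruptedException instead of swallowing it, so the interrupt reaches the cancelled task.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptibleConnectSketch {

  // Hypothetical helper: waits up to timeoutMs for a pending connection.
  // Timeouts and connection failures become IOExceptions, while an interrupt
  // (for example, task cancellation) is not caught here and propagates to the
  // caller as InterruptedException.
  static <T> T awaitConnection(CompletableFuture<T> connectFuture, String address, long timeoutMs)
      throws IOException, InterruptedException {
    try {
      return connectFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new IOException("Connecting to " + address + " timed out (" + timeoutMs + " ms)", e);
    } catch (ExecutionException e) {
      throw new IOException("Failed to connect to " + address, e.getCause());
    }
  }

  // Returns true if a timed-out connection attempt surfaces as an IOException.
  static boolean timeoutBecomesIOException() {
    try {
      awaitConnection(new CompletableFuture<String>(), "dead-host", 20);
      return false;
    } catch (IOException e) {
      return e.getMessage().contains("timed out");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Returns true if interrupting a thread blocked on a connection that never
  // completes ends its wait with InterruptedException well before the timeout.
  static boolean interruptAbortsWait() {
    CompletableFuture<String> neverConnects = new CompletableFuture<>();
    AtomicBoolean sawInterrupt = new AtomicBoolean(false);
    Thread task = new Thread(() -> {
      try {
        awaitConnection(neverConnects, "dead-host", 60_000);
      } catch (InterruptedException e) {
        sawInterrupt.set(true); // the task can now clean up and exit
      } catch (IOException e) {
        // Not expected here: the 60 s timeout is never reached.
      }
    });
    task.setDaemon(true); // never keep the JVM alive if the demo misbehaves
    task.start();
    try {
      Thread.sleep(50);
      task.interrupt(); // simulate task cancellation
      task.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("demo thread interrupted", e);
    }
    return sawInterrupt.get();
  }

  public static void main(String[] args) {
    System.out.println("timeout -> IOException: " + timeoutBecomesIOException());
    System.out.println("interrupt aborts wait:  " + interruptAbortsWait());
  }
}
```

      Because the helper declares InterruptedException, its callers must in turn propagate or handle that checked exception.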

    People

      Assignee: Josh Rosen (joshrosen)
      Reporter: Josh Rosen (joshrosen)
