SPARK-24355

Improve Spark shuffle server responsiveness to non-ChunkFetch requests

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0
    • Fix Version/s: 3.0.0
    • Component/s: Shuffle
    • Labels:
      None
    • Environment:

      Hadoop-2.7.4

      Spark-2.3.0

      Description

      We run Spark on YARN and deploy the Spark external shuffle service as a YARN NodeManager (NM) auxiliary service.

      One issue we have seen with the Spark external shuffle service is that clients experience various timeouts, either when registering an executor with the local shuffle server or when establishing a connection to a remote shuffle server.

      Example of a timeout when establishing a connection with a remote shuffle server:

      java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
      	at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
      	at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:288)
      	at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
      	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:248)
      	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
      	at org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:106)
      	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
      	at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
      	at org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:115)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:182)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.org$apache$spark$storage$ShuffleBlockFetcherIterator$$send$1(ShuffleBlockFetcherIterator.scala:396)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:391)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:345)
      	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:57)
      ....

      Example of a timeout when registering an executor with the local shuffle server:

      java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
      	at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
      	at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
      	at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
      	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
      	at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
      	at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
      	at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
      

      While patches such as SPARK-20640 and configuration parameters such as spark.shuffle.registration.timeout and spark.shuffle.sasl.timeout (when spark.authenticate is set to true) can help alleviate this type of problem, they do not solve the fundamental issue.
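
For reference, the timeout settings named above could be passed to spark-submit as `--conf` flags. The sketch below only renders such flags; the class name, helper names, and the values are illustrative assumptions, not recommended settings, and units should be checked against the configuration docs for the Spark version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class ShuffleTimeoutConf {
    /** Property names come from the description; the values are made up for illustration. */
    static Map<String, String> settings() {
        Map<String, String> conf = new LinkedHashMap<>(); // keeps insertion order
        conf.put("spark.authenticate", "true");
        conf.put("spark.shuffle.registration.timeout", "60000"); // hypothetical value
        conf.put("spark.shuffle.sasl.timeout", "120s");          // hypothetical value
        return conf;
    }

    /** Renders each entry as a "--conf key=value" pair, separated by spaces. */
    static String toSubmitArgs(Map<String, String> conf) {
        StringJoiner joined = new StringJoiner(" ");
        conf.forEach((key, value) -> joined.add("--conf").add(key + "=" + value));
        return joined.toString();
    }

    public static void main(String[] args) {
        System.out.println(toSubmitArgs(settings()));
    }
}
```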

      We have observed that when the shuffle workload gets very busy during peak hours, client requests can still time out even after these parameters are set to very high values. Further investigation revealed the following:

      Right now, the default number of server-side Netty handler threads is 2 * # cores, and it can be further configured with the parameter spark.shuffle.io.serverThreads.
      Processing a client request requires one available server-side Netty handler thread.
      However, when the server-side Netty handler threads start to process ChunkFetchRequests, they block on disk I/O, mostly due to disk contention from the random reads initiated by all the ChunkFetchRequests received from clients.
      As a result, when the shuffle server is serving many concurrent ChunkFetchRequests, the server-side Netty handler threads can all be blocked reading shuffle files, leaving no handler thread available to process other types of requests, all of which should be very quick to process.
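
The starvation described above can be reproduced in miniature with a plain thread pool. This is only an analogy under stated assumptions: a `java.util.concurrent` fixed pool stands in for the Netty handler threads, a latch stands in for slow disk reads, and the class and method names are hypothetical. It is not the shuffle server's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SharedPoolStarvation {
    /** Returns true if a fast request queued behind blocked "fetches" times out. */
    static boolean fastRequestTimesOut(int handlerThreads, long timeoutMs) {
        ExecutorService handlers = Executors.newFixedThreadPool(handlerThreads);
        CountDownLatch diskReadDone = new CountDownLatch(1); // stand-in for slow disk I/O
        try {
            // Occupy every handler thread with a "chunk fetch" that blocks on the disk.
            for (int i = 0; i < handlerThreads; i++) {
                handlers.submit(() -> {
                    diskReadDone.await();
                    return null;
                });
            }
            // A cheap request (for example, a registration) now has no thread to run on.
            Future<String> registration = handlers.submit(() -> "registered");
            try {
                registration.get(timeoutMs, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return true;
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            diskReadDone.countDown(); // release the blocked "reads"
            handlers.shutdown();
        }
    }

    public static void main(String[] args) {
        // Mirrors the default sizing mentioned above: 2 * number of cores.
        int threads = 2 * Runtime.getRuntime().availableProcessors();
        System.out.println("fast request timed out: " + fastRequestTimesOut(threads, 200));
    }
}
```

Raising the client timeout does not help here: the fast request only runs once a blocked thread is released, which matches the observation that very high timeout values still failed at peak load.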

      This issue could potentially be fixed by limiting the number of Netty handler threads that can be blocked while processing ChunkFetchRequests. We have a patch that does this by using a separate EventLoopGroup with a dedicated ChannelHandler to process ChunkFetchRequests. This lets the shuffle server reserve Netty handler threads for non-ChunkFetch requests, giving consistent processing times for these requests, which are fast to process. After deploying the patch in our infrastructure, we no longer see timeouts in either executor registration with the local shuffle server or shuffle clients establishing connections with remote shuffle servers.
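
The idea of reserving threads can be sketched with two plain thread pools: one bounded pool that is allowed to block on "disk reads" and one kept free for fast requests. This is an analogy, not the patch itself; the pool names and method are hypothetical. In Netty, one API for running a handler on its own executor group is `ChannelPipeline.addLast(EventExecutorGroup, String, ChannelHandler)`, though whether the patch uses that exact call is not stated here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DedicatedFetchPool {
    /** Returns true if a fast request completes while every fetch worker is blocked. */
    static boolean fastRequestCompletes(int fetchThreads, long timeoutMs) {
        // Bounded pool that may block on disk; stands in for the dedicated fetch group.
        ExecutorService fetchWorkers = Executors.newFixedThreadPool(fetchThreads);
        // Reserved pool that never runs fetches; stands in for the remaining handlers.
        ExecutorService rpcHandlers = Executors.newFixedThreadPool(1);
        CountDownLatch diskReadDone = new CountDownLatch(1); // stand-in for slow disk I/O
        try {
            // Submit more "fetches" than fetch threads; the extras wait in the queue.
            for (int i = 0; i < fetchThreads * 4; i++) {
                fetchWorkers.submit(() -> {
                    diskReadDone.await();
                    return null;
                });
            }
            // The fast request runs on the reserved pool, unaffected by the blocked fetches.
            Future<String> registration = rpcHandlers.submit(() -> "registered");
            try {
                return "registered".equals(registration.get(timeoutMs, TimeUnit.MILLISECONDS));
            } catch (TimeoutException e) {
                return false;
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            diskReadDone.countDown(); // release the blocked "reads"
            fetchWorkers.shutdown();
            rpcHandlers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast request completed: " + fastRequestCompletes(2, 2000));
    }
}
```

The trade-off in this sketch is that fetches queue up behind a smaller number of threads, so the size of the fetch pool caps how much disk-bound work runs at once.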

      We will post the patch soon and would like to gather feedback from the community.

              People

              • Assignee:
                redsanket Sanket Chintapalli
                Reporter:
                mshen Min Shen
              • Votes:
                0
                Watchers:
                10