Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Won't Fix
- Affects Version/s: 3.0.0
- Fix Version/s: None
- Component/s: None
Description
In SPARK-24355, we proposed using a separate chunk fetch handler thread pool to handle slow-to-process chunk fetch requests, in order to improve the responsiveness of the shuffle service for RPC requests.
Initially, we thought that by making the number of Netty server threads larger than the number of chunk fetch handler threads, some threads would be reserved for RPC requests, thus resolving the various RPC request timeout issues we had experienced previously. The solution worked in our cluster initially. However, as the number of Spark applications in our cluster continued to increase, we saw the RPC request (SASL authentication, specifically) timeout issue again:
```
java.lang.RuntimeException: java.util.concurrent.TimeoutException: Timeout waiting for task.
    at org.spark-project.guava.base.Throwables.propagate(Throwables.java:160)
    at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:278)
    at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:80)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
    at org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
    at org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
    at org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:218)
```
After further investigation, we realized that as the number of concurrent clients connecting to a shuffle service increases, it becomes VERY important to configure the number of Netty server threads and the number of chunk fetch handler threads correctly. Specifically, the number of Netty server threads needs to be a multiple of the number of chunk fetch handler threads. The reason is explained in detail below:
When a channel is established on the Netty server, it is registered with both the Netty server's default EventLoopGroup and the chunk fetch handler EventLoopGroup. Once registered, the channel sticks with a given thread in each EventLoopGroup, i.e., all requests from this channel are handled by the same thread. Right now, the Spark shuffle Netty server uses the default Netty strategy to select a thread from an EventLoopGroup to associate with a new channel, which is simply round-robin (Netty's DefaultEventExecutorChooserFactory).
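To make the pinning concrete, below is a minimal, hypothetical Java sketch (not Netty or Spark source) of this round-robin behaviour: the N-th registered channel is pinned to thread N % poolSize in each EventLoopGroup, and every request on that channel is then handled by that same thread.

```java
// Hypothetical sketch of round-robin channel-to-thread pinning, conceptually mirroring
// Netty's DefaultEventExecutorChooserFactory: the N-th registered channel is pinned to
// thread (N % poolSize), and all requests on that channel run on that thread.
public class RoundRobinPinningSketch {

    static final class Chooser {
        private final int poolSize;
        private int next = 0;

        Chooser(int poolSize) {
            this.poolSize = poolSize;
        }

        // Pick the thread index for the next registered channel.
        synchronized int nextThread() {
            return (next++) % poolSize;
        }
    }

    public static void main(String[] args) {
        Chooser serverGroup = new Chooser(40);      // Netty server default EventLoopGroup
        Chooser chunkFetchGroup = new Chooser(35);  // chunk fetch handler EventLoopGroup

        // Each channel is registered with BOTH groups and pinned independently in each.
        for (int channel = 0; channel <= 3; channel++) {
            System.out.printf("channel %d -> server thread %d, chunk fetch thread %d%n",
                channel, serverGroup.nextThread(), chunkFetchGroup.nextThread());
        }
    }
}
```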
In SPARK-24355, with the introduced chunk fetch handler thread pool, all chunk fetch requests from a given channel are first added to the task queue of the chunk fetch handler thread associated with that channel. When a request gets processed, the chunk fetch handler thread submits a task to the task queue of the Netty server thread that is also associated with this channel. If the number of Netty server threads is not a multiple of the number of chunk fetch handler threads, this becomes a problem when the server has a large number of concurrent connections, as sketched below.
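The hand-off described above can be approximated with two single-threaded executors standing in for the per-channel pinned threads; this is an illustrative sketch under those assumptions, not Spark's actual handler code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch (not Spark source) of the two-stage hand-off: a chunk fetch
// request is first queued on the channel's chunk fetch handler thread, and once
// processed, the handler submits the response-writing task to the Netty server
// thread that owns the same channel.
public class TwoStageHandOffSketch {
    public static void main(String[] args) throws InterruptedException {
        // Single-threaded executors stand in for the two threads pinned to one channel.
        ExecutorService chunkFetchHandlerThread = Executors.newSingleThreadExecutor();
        ExecutorService nettyServerThread = Executors.newSingleThreadExecutor();

        // Stage 1: the chunk fetch request lands in the handler thread's task queue.
        chunkFetchHandlerThread.submit(() -> {
            simulateSlowShuffleBlockRead();
            // Stage 2: the handler thread submits the response write to the Netty
            // server thread associated with this channel. If several handler threads
            // target the same server thread, slow-to-process tasks pile up in its
            // queue and RPC requests queued behind them can time out.
            nettyServerThread.submit(() -> System.out.println("response written on server thread"));
        });

        chunkFetchHandlerThread.shutdown();
        chunkFetchHandlerThread.awaitTermination(5, TimeUnit.SECONDS);
        nettyServerThread.shutdown();
        nettyServerThread.awaitTermination(5, TimeUnit.SECONDS);
    }

    private static void simulateSlowShuffleBlockRead() {
        try {
            Thread.sleep(100); // stand-in for reading shuffle blocks from disk
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```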
Assume we configure the number of Netty server threads as 40 and the chunk fetch handler thread percentage as 87, which leads to 35 chunk fetch handler threads (40 × 0.87 = 34.8, rounded up). Then, according to the round-robin policy, channels 0, 40, 80, 120, 160, 200, 240, and 280 will all be associated with the 1st Netty server thread in the default EventLoopGroup. However, since the chunk fetch handler thread pool only has 35 threads, out of these 8 channels only channels 0 and 280 will be associated with the same chunk fetch handler thread. Thus, channels 0, 40, 80, 120, 160, 200, and 240 will all be associated with different chunk fetch handler threads while being associated with the same Netty server thread. This means the 7 different chunk fetch handler threads associated with these channels could potentially submit tasks to the task queue of the same Netty server thread at the same time, leading to 7 slow-to-process requests sitting in that task queue. If an RPC request is put in the task queue behind these 7 requests, it is very likely to time out.
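The arithmetic in this example can be verified with a short snippet, assuming pure round-robin pinning (channel N maps to thread N % poolSize in each group):

```java
import java.util.HashSet;
import java.util.Set;

// Worked check of the 40-server-thread / 35-chunk-fetch-thread example above,
// assuming round-robin pinning (channel N -> thread N % poolSize).
public class ThreadPoolMismatchExample {
    public static void main(String[] args) {
        int serverThreads = 40;
        int chunkFetchThreads = (int) Math.ceil(serverThreads * 87 / 100.0); // 34.8 rounded up to 35

        // All of these channels are pinned to Netty server thread 0 (N % 40 == 0) ...
        int[] channels = {0, 40, 80, 120, 160, 200, 240, 280};

        Set<Integer> distinctChunkFetchThreads = new HashSet<>();
        for (int channel : channels) {
            int chunkFetchThread = channel % chunkFetchThreads;
            distinctChunkFetchThreads.add(chunkFetchThread);
            System.out.printf("channel %3d -> server thread %d, chunk fetch thread %d%n",
                channel, channel % serverThreads, chunkFetchThread);
        }
        // ... but they are spread over 7 distinct chunk fetch handler threads (only
        // channels 0 and 280 share one), so up to 7 handler threads can queue
        // slow-to-process tasks onto server thread 0 at the same time.
        System.out.println("distinct chunk fetch handler threads: " + distinctChunkFetchThreads.size());
    }
}
```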
In our cluster, the number of concurrent active connections to a shuffle service can go as high as 6K+ during peak. If the sizes of these thread pools are not configured correctly, our Spark applications are guaranteed to see SASL timeouts whenever a shuffle service is dealing with a large number of incoming chunk fetch requests from many distinct clients, which leads to stage failures and lengthy retries.
To resolve this issue, the number of Netty server threads needs to be a multiple of the number of chunk fetch handler threads. This way, the round-robin policy guarantees that channels associated with different chunk fetch handler threads are also associated with different Netty server threads, thus eliminating the potential burst of multiple slow-to-process requests being placed in one Netty server thread's task queue.
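This property can be sanity-checked with a small, hypothetical simulation of the round-robin assignment: when the handler thread count divides the server thread count, at most one chunk fetch handler thread ever feeds a given server thread's queue, whereas the 40/35 configuration above allows up to 7.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical check of the claim above: if the server thread count is a multiple of
// the chunk fetch handler thread count, all channels pinned to one server thread are
// also pinned to one handler thread, so at most one handler thread feeds each server
// thread's task queue.
public class MultipleOfCheck {
    static int maxHandlersPerServerThread(int serverThreads, int handlerThreads, int channels) {
        Map<Integer, Set<Integer>> handlersByServerThread = new HashMap<>();
        for (int channel = 0; channel < channels; channel++) {
            handlersByServerThread
                .computeIfAbsent(channel % serverThreads, k -> new HashSet<>())
                .add(channel % handlerThreads);
        }
        return handlersByServerThread.values().stream().mapToInt(Set::size).max().orElse(0);
    }

    public static void main(String[] args) {
        int channels = 6000; // roughly the peak concurrent connections mentioned above
        System.out.println("40 server / 35 handler threads: up to "
            + maxHandlersPerServerThread(40, 35, channels) + " handler thread(s) per server thread"); // 7
        System.out.println("40 server / 20 handler threads: up to "
            + maxHandlersPerServerThread(40, 20, channels) + " handler thread(s) per server thread"); // 1
    }
}
```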
Since the patch currently merged in Spark uses `spark.shuffle.server.chunkFetchHandlerThreadsPercent` to configure the number of chunk fetch handler threads, and it rounds the number up, it is very tricky to get the sizes of these thread pools configured correctly. In addition, people who are not aware of this issue will very likely fall into this trap and start seeing RPC request timeouts during shuffle fetch once the Spark workloads in their environment reach a certain scale. For these reasons, we propose changing how the sizes of both thread pools are configured, such that if people choose to use the dedicated chunk fetch handler, the number of Netty server threads is always a multiple of the number of chunk fetch handler threads.
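To illustrate why the percent-based setting is hard to tune, here is a hedged sketch of the configuration arithmetic: handlerThreadsFromPercent mirrors the rounding-up behaviour described above (an assumption about the computation, not code copied from Spark), and enforceMultiple is a purely hypothetical illustration of how the proposed change could keep the two counts aligned.

```java
// Hedged sketch of the configuration arithmetic discussed above. The ceil-based percent
// computation mirrors the rounding-up behaviour described for
// spark.shuffle.server.chunkFetchHandlerThreadsPercent (an assumption, not Spark source);
// enforceMultiple() is a purely hypothetical alternative that keeps the server thread
// count an exact multiple of the chunk fetch handler thread count.
public class ThreadConfigSketch {

    // Percent-based sizing with round-up: it is easy to end up with a handler count
    // that does not divide the server thread count.
    static int handlerThreadsFromPercent(int serverThreads, int percent) {
        return (int) Math.ceil(serverThreads * (percent / 100.0));
    }

    // Hypothetical alternative: snap the requested handler count down to the nearest
    // divisor of the server thread count, so the multiple relationship always holds.
    static int enforceMultiple(int serverThreads, int requestedHandlerThreads) {
        int handlerThreads = Math.min(Math.max(requestedHandlerThreads, 1), serverThreads);
        while (serverThreads % handlerThreads != 0) {
            handlerThreads--;
        }
        return handlerThreads;
    }

    public static void main(String[] args) {
        System.out.println(handlerThreadsFromPercent(40, 87)); // 35, which does not divide 40
        System.out.println(enforceMultiple(40, 35));           // 20, and 40 is a multiple of 20
    }
}
```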
Issue Links
- is related to: SPARK-24355 Improve Spark shuffle server responsiveness to non-ChunkFetch requests (Resolved)
- relates to: SPARK-30512 Use a dedicated boss event group loop in the netty pipeline for external shuffle service (Resolved)