For large-scale jobs, there are too many TCP connections among TaskManagers. Consider an example.
Take a streaming job with 20 JobVertices, each with a parallelism of 500. We divide the vertices into 5 slot sharing groups, and each TaskManager has 5 slots. Thus there will be 400 TaskManagers in this job. Let's assume the job runs on a cluster with 20 machines.
If all the job edges are all-to-all edges, each machine will have 19 * 20 * 399 * 2 = 303,240 TCP connections. If we run several jobs on this cluster, the number of TCP connections may exceed the Linux maximum of 1,048,576. This prevents the TaskManagers from creating new TCP connections and causes task failovers.
When we run our production jobs on a K8s cluster, the jobs keep failing over due to network-related exceptions, such as "Sending the partition request to 'null' failed".
We think we can decrease the number of connections by letting tasks reuse the same connection. We implemented a POC that makes all tasks on the same TaskManager reuse one TCP connection. For the example job above, the number of connections per machine decreases from 303,240 to 15,960. With the POC, network-related exceptions occur far less frequently in our production jobs.
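The arithmetic behind these two figures can be reproduced with a short sketch. The reading of the factors is an assumption, not something the description spells out: 19 is taken as the number of job edges in a linear chain of 20 vertices, 20 as the TaskManagers per machine (400 / 20), 399 as the remote TaskManagers each one talks to, and 2 as one connection per direction. The function name and its parameters are hypothetical and exist only for this illustration.

```python
# Figure quoted in the description as the Linux maximum.
LINUX_LIMIT = 1_048_576


def connections_per_machine(task_managers, machines, job_edges, reuse):
    """Estimate TCP connections per machine when every job edge is all-to-all.

    Assumed model (not taken from Flink itself): each TaskManager connects to
    every other TaskManager, once per direction. Without reuse, every job edge
    opens its own connection; with reuse, all edges share one connection.
    """
    tms_per_machine = task_managers // machines  # 400 // 20 = 20
    remote_tms = task_managers - 1               # 399 peers per TaskManager
    directions = 2                               # input and output
    per_peer = 1 if reuse else job_edges
    return per_peer * tms_per_machine * remote_tms * directions


before = connections_per_machine(400, 20, job_edges=19, reuse=False)
after = connections_per_machine(400, 20, job_edges=19, reuse=True)

print(before)                 # 303240
print(after)                  # 15960
print(LINUX_LIMIT // before)  # 3: a fourth job of this size would exceed the limit
print(LINUX_LIMIT // after)   # 65
```

Under this model the saving equals the number of job edges (19x here), since the per-edge connections collapse into one per TaskManager pair and direction.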
The POC is illustrated in: https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc