The performance of the scheduler has been improved to reduce the time spent on execution graph creation, task deployment, and task failover. This improvement matters most for large-scale jobs, which currently may spend minutes on these processes. It also helps avoid cases where the JobManager main thread is blocked for so long that heartbeats time out.
In phase 1, we mainly focus on optimizing job initialization, failover, and the release of partitions.
The main idea is to replace ExecutionEdges with EdgeManager. In EdgeManager, we put all the vertices that consume the same result partitions into one consumer vertex group, and put all the result partitions that have the same consumer vertices into one consumed partition group. This effectively reduces the computational complexity of operations that iterate over ExecutionEdges.
Phase 1 covers subtasks 1-5.
According to the results of the scheduler benchmarks we implemented in FLINK-20612, the bottlenecks in deploying and running a large-scale job in Flink lie mainly in the following procedures:
||Procedure||Time complexity||
|Scheduling downstream tasks when a task finishes|O(N^2)|
|Calculating tasks to restart when a failover occurs|O(N^2)|
|Releasing result partitions|O(N^3)|
These procedures are all related to the complexity of the topology in the ExecutionGraph. Between two vertices connected with all-to-all edges, every upstream IntermediateResultPartition is connected to every downstream ExecutionVertex. The computational complexity of building and traversing all these edges is therefore O(N^2).
As for memory usage, we currently use ExecutionEdges to store the connection information. For the all-to-all distribution type, there are O(N^2) ExecutionEdges. We tested a simple job with only two vertices, both with a parallelism of 10k and connected with all-to-all edges. It takes 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges.
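As a rough sanity check on the figures above, the following sketch recomputes the edge count for the test job and the average memory per ExecutionEdge implied by the 4.175 GiB estimate. The class and method names are hypothetical and exist only for this illustration; the result is an average derived from the reported total, not a measurement of the actual object layout.

```java
/** Back-of-the-envelope check of the ExecutionEdge numbers (illustrative only). */
final class EdgeMemoryEstimate {

    /** Number of ExecutionEdges between two vertices connected all-to-all. */
    static long allToAllEdges(long upstreamParallelism, long downstreamParallelism) {
        return upstreamParallelism * downstreamParallelism;
    }

    /** Average bytes per edge, given a total heap estimate in GiB. */
    static double bytesPerEdge(double totalGiB, long edges) {
        return totalGiB * (1L << 30) / edges;
    }

    public static void main(String[] args) {
        long edges = allToAllEdges(10_000, 10_000);
        System.out.println(edges + " edges");
        System.out.printf("%.1f bytes per edge on average%n", bytesPerEdge(4.175, edges));
    }
}
```

Under these assumptions the average comes out at roughly 45 bytes per edge, so the total grows quadratically with parallelism.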
In most large-scale jobs, there are more than two vertices with large parallelisms, so deploying the job costs even more time and memory.
As we can see, for two JobVertices connected with the all-to-all distribution type, all IntermediateResultPartitions produced by the upstream ExecutionVertices are isomorphic, which means that the downstream ExecutionVertices they are connected to are exactly the same. The downstream ExecutionVertices belonging to the same JobVertex are also isomorphic, as the upstream result partitions they are connected to are the same, too.
Since every JobEdge has exactly one distribution type, we can divide the vertices and result partitions into groups according to the distribution type of the JobEdge.
For the all-to-all distribution type, since all downstream vertices are isomorphic, they belong to a single group, and all the upstream result partitions are connected to this group. Conversely, all the upstream result partitions also belong to a single group, and all the downstream vertices are connected to this group. In the past, when we wanted to iterate over the downstream vertices of all result partitions, we needed to loop over the N downstream vertices once for each of the N result partitions, which leads to a complexity of O(N^2). Now, since all upstream result partitions are connected to one downstream group, we only need to loop over that group once, with a complexity of O(N).
For the pointwise distribution type, because each result partition is connected to different downstream vertices, the result partitions belong to different groups. Conversely, the vertices also belong to different groups. Since each result partition group is connected to exactly one vertex group in a pointwise manner, the computational complexity of looping over them is still O(N).
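The grouping described above can be sketched as follows. This is a simplified, hypothetical model and not Flink's actual EdgeManager: partitions and vertices are plain integers, `GroupEdge` and all method names are invented for the illustration, and the pointwise case assumes equal upstream and downstream parallelism. It compares the number of entries that must be stored with the number of partition-to-vertex connections they represent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified model of grouped edges; not Flink's real classes. */
final class GroupedEdgesSketch {

    /** Connects one group of result partitions to the group of vertices consuming them. */
    static final class GroupEdge {
        final List<Integer> partitionGroup;
        final List<Integer> vertexGroup;

        GroupEdge(List<Integer> partitionGroup, List<Integer> vertexGroup) {
            this.partitionGroup = partitionGroup;
            this.vertexGroup = vertexGroup;
        }
    }

    private static List<Integer> range(int from, int to) {
        List<Integer> ids = new ArrayList<>();
        for (int i = from; i < to; i++) {
            ids.add(i);
        }
        return ids;
    }

    /** All-to-all: one partition group connected to one vertex group. */
    static List<GroupEdge> allToAll(int upstream, int downstream) {
        return Collections.singletonList(
                new GroupEdge(range(0, upstream), range(0, downstream)));
    }

    /** Pointwise with equal parallelism: one single-element group pair per subtask. */
    static List<GroupEdge> pointwise(int parallelism) {
        List<GroupEdge> edges = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            edges.add(new GroupEdge(range(i, i + 1), range(i, i + 1)));
        }
        return edges;
    }

    /** Entries actually stored: grows linearly with parallelism in both cases. */
    static long storedEntries(List<GroupEdge> edges) {
        long total = 0;
        for (GroupEdge e : edges) {
            total += e.partitionGroup.size() + e.vertexGroup.size();
        }
        return total;
    }

    /** Partition-to-vertex connections represented by the groups. */
    static long representedConnections(List<GroupEdge> edges) {
        long total = 0;
        for (GroupEdge e : edges) {
            total += (long) e.partitionGroup.size() * e.vertexGroup.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<GroupEdge> edges = allToAll(10_000, 10_000);
        System.out.println("stored entries: " + storedEntries(edges));
        System.out.println("represented connections: " + representedConnections(edges));
    }
}
```

In this model, an all-to-all connection with a parallelism of 10k on both sides stores 20,000 entries while still representing 100M connections, which is the intuition behind dropping per-edge objects.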
After we group the result partitions and vertices, ExecutionEdge is no longer needed. For the test job we mentioned above, the optimization can effectively reduce the memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in our POC. The time cost is reduced from 62.090 seconds to 8.551 seconds (with 10k parallelism).
The detailed design doc: https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
In phase 2, we mainly focus on optimizing task deployment.
The main idea is to cache the ShuffleDescriptors used in the deployment so that the JobManager does not need to recalculate them for each vertex. This optimization effectively decreases the time spent on task deployment, especially for large-scale jobs with all-to-all edges. For the overall description of the phase 2, please refer to
Phase 2 covers subtasks 10-12.
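The caching idea in phase 2 can be illustrated with a minimal sketch. It is an assumption-laden model rather than Flink's implementation: descriptors are represented as plain strings, the cache is keyed by a hypothetical integer group id, and `DescriptorCacheSketch` and its methods are invented names. The point is only that vertices consuming the same group of partitions can share one computed result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical cache of per-group descriptors; descriptors are modeled as strings. */
final class DescriptorCacheSketch {

    private final Map<Integer, List<String>> cache = new HashMap<>();

    /** Counts how often the expensive computation actually ran. */
    int computations = 0;

    /** Returns the descriptors for a partition group, computing them at most once. */
    List<String> descriptorsFor(int groupId, int partitionsInGroup) {
        return cache.computeIfAbsent(groupId, id -> compute(id, partitionsInGroup));
    }

    /** Stand-in for the expensive per-partition descriptor calculation. */
    private List<String> compute(int groupId, int partitionsInGroup) {
        computations++;
        List<String> descriptors = new ArrayList<>();
        for (int p = 0; p < partitionsInGroup; p++) {
            descriptors.add("descriptor-" + groupId + "-" + p);
        }
        return descriptors;
    }

    /** Deploys all consumers of one all-to-all group and reports the computation count. */
    static int deployAll(int consumers, int partitionsInGroup) {
        DescriptorCacheSketch sketch = new DescriptorCacheSketch();
        for (int v = 0; v < consumers; v++) {
            sketch.descriptorsFor(0, partitionsInGroup);
        }
        return sketch.computations;
    }
}
```

With this sketch, deploying any number of consumers of the same group triggers the descriptor computation once instead of once per consumer.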
In phase 3, we mainly focus on the initialization of the DefaultScheduler.
Phase 3 involves optimizing the construction of pipelined regions and the initialization of LocalInputPreferredSlotSharingStrategy. After the optimizations, the DefaultScheduler of a job containing two vertices with 8k parallelism, connected with an all-to-all edge, can be initialized within one second. This optimization significantly accelerates the submission of new jobs, especially OLAP jobs.
Phase 3 covers subtasks 8 and 9.