Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Fixed
None
Description
RestartPipelinedRegionFailoverStrategy calculates the tasks to restart when a task fails. The calculation has two parts: first, determine the regions to restart; then, add all tasks in those regions to the restart queue.
The bottleneck is mainly in the first part. This part traverses all the upstream and downstream regions of the failed region to determine whether they should be restarted or not.
For the failed region, if a result partition it consumes is not available, the owner of that partition (i.e., the upstream producer region) must restart as well. Conversely, since the failed region restarts, the result partitions it produces will not be available, so all of its downstream regions must restart, too.
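The two rules above can be sketched as a breadth-first search over regions (part one), followed by collecting the tasks of every region found (part two). This is a minimal sketch under simplifying assumptions: `Region`, `Partition`, `connect`, `regionsToRestart`, and `tasksToRestart` are hypothetical stand-ins, not Flink's `SchedulingPipelinedRegion` / `SchedulingResultPartition` API, and partition availability is modeled as a fixed flag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RegionRestartSketch {

    /** Hypothetical stand-in for a pipelined region and the tasks it contains. */
    static final class Region {
        final String name;
        final List<String> tasks;
        final List<Partition> consumed = new ArrayList<>();
        final List<Partition> produced = new ArrayList<>();

        Region(String name, String... tasks) {
            this.name = name;
            this.tasks = List.of(tasks);
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /** Hypothetical stand-in for a result partition exchanged between regions. */
    static final class Partition {
        final Region producer;
        final boolean available; // assumption: availability is fixed for the sketch
        final List<Region> consumers = new ArrayList<>();

        Partition(Region producer, boolean available) {
            this.producer = producer;
            this.available = available;
            producer.produced.add(this);
        }
    }

    /** Registers `consumer` as a consumer of `partition`. */
    static void connect(Partition partition, Region consumer) {
        partition.consumers.add(consumer);
        consumer.consumed.add(partition);
    }

    /** Part one: BFS from the failed region, applying the upstream and downstream rules. */
    static Set<Region> regionsToRestart(Region failed) {
        Set<Region> visited = new LinkedHashSet<>();
        Deque<Region> queue = new ArrayDeque<>();
        visited.add(failed);
        queue.add(failed);
        while (!queue.isEmpty()) {
            Region region = queue.poll();
            // Upstream rule: an unavailable consumed partition forces its producer to restart.
            for (Partition p : region.consumed) {
                if (!p.available && visited.add(p.producer)) {
                    queue.add(p.producer);
                }
            }
            // Downstream rule: this region restarts, so every consumer of its output restarts too.
            for (Partition p : region.produced) {
                for (Region consumer : p.consumers) {
                    if (visited.add(consumer)) {
                        queue.add(consumer);
                    }
                }
            }
        }
        return visited;
    }

    /** Part two: collect all tasks of the regions to restart. */
    static Set<String> tasksToRestart(Region failed) {
        Set<String> tasks = new LinkedHashSet<>();
        for (Region region : regionsToRestart(failed)) {
            tasks.addAll(region.tasks);
        }
        return tasks;
    }
}
```

Because every region taken from the queue is expanded with the same two rules, restarting an upstream producer also pulls in that producer's other consumer regions. The nested loops in this search are exactly where the per-vertex traversals discussed below become expensive.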
1. Calculating the upstream regions that should restart
The current implementation is:
for each SchedulingExecutionVertex in the currently visited SchedulingPipelinedRegion:
    for each consumed SchedulingResultPartition of the SchedulingExecutionVertex:
        if the result partition is not available:
            add the producer region to the restart queue
With FLINK-21328, the result partitions consumed by a vertex are already grouped. Vertices connected by an all-to-all edge share the same group, so without further bookkeeping N consumer vertices each check the same N partitions, i.e., N^2 checks. We can use a HashSet to record the result partition groups already visited, so that each shared group is traversed only once. This reduces the time complexity from O(N^2) to O(N).
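A minimal sketch of the upstream step, counting availability checks with and without a HashSet of visited groups. `PartitionGroup`, `Vertex`, `naiveChecks`, `groupedChecks`, and `allToAll` are hypothetical simplifications for illustration, not Flink classes; producer regions are identified by plain integers.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UpstreamSketch {

    /** Hypothetical partition: id of its producer region and its availability. */
    record Partition(int producerRegion, boolean available) {}

    /** Hypothetical group of partitions shared by all consumers of one all-to-all edge. */
    static final class PartitionGroup {
        final List<Partition> partitions;

        PartitionGroup(List<Partition> partitions) {
            this.partitions = partitions;
        }
    }

    /** Hypothetical consumer vertex: only the groups it consumes matter here. */
    record Vertex(List<PartitionGroup> consumedGroups) {}

    /** Current approach: every vertex checks every partition it consumes. Returns #checks. */
    static int naiveChecks(List<Vertex> vertices, Set<Integer> producersToRestart) {
        int checks = 0;
        for (Vertex v : vertices) {
            for (PartitionGroup group : v.consumedGroups()) {
                for (Partition p : group.partitions) {
                    checks++;
                    if (!p.available()) {
                        producersToRestart.add(p.producerRegion());
                    }
                }
            }
        }
        return checks;
    }

    /** Proposed approach: skip groups already scanned via another vertex. Returns #checks. */
    static int groupedChecks(List<Vertex> vertices, Set<Integer> producersToRestart) {
        // PartitionGroup does not override equals/hashCode, so this HashSet dedupes by identity.
        Set<PartitionGroup> visitedGroups = new HashSet<>();
        int checks = 0;
        for (Vertex v : vertices) {
            for (PartitionGroup group : v.consumedGroups()) {
                if (!visitedGroups.add(group)) {
                    continue; // same group already checked for another consumer vertex
                }
                for (Partition p : group.partitions) {
                    checks++;
                    if (!p.available()) {
                        producersToRestart.add(p.producerRegion());
                    }
                }
            }
        }
        return checks;
    }

    /** Builds n consumer vertices that all consume one shared group of n partitions. */
    static List<Vertex> allToAll(int n, boolean available) {
        List<Partition> partitions = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            partitions.add(new Partition(i, available));
        }
        PartitionGroup shared = new PartitionGroup(partitions);
        List<Vertex> consumers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            consumers.add(new Vertex(List.of(shared)));
        }
        return consumers;
    }
}
```

For `allToAll(100, false)`, `naiveChecks` performs 100 × 100 = 10,000 checks while `groupedChecks` performs 100, and both mark the same 100 producer regions for restart.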
2. Calculating the downstream regions that should restart
The current implementation is:
for each SchedulingExecutionVertex in the currently visited SchedulingPipelinedRegion:
    for each produced SchedulingResultPartition of the SchedulingExecutionVertex:
        for each consumer SchedulingExecutionVertex of the produced SchedulingResultPartition:
            if the region containing the consumer SchedulingExecutionVertex is not visited:
                add the region to the restart queue
A vertex produces one result partition per output JobEdge, a count that does not grow with parallelism, so the cost is dominated by the innermost loop: for an all-to-all edge, each of the N producer vertices visits all N consumer vertices, which makes this procedure O(N^2). Since the consumer vertices of a result partition are already grouped, we can use a HashSet to record the visited ConsumerVertexGroups, so that each group is traversed only once. This reduces the time complexity from O(N^2) to O(N).
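The downstream step can be sketched the same way. In this minimal sketch, `ConsumerGroup` is a hypothetical, simplified stand-in for ConsumerVertexGroup that holds consumer region ids directly; `ResultPartition`, `Vertex`, `naiveChecks`, `groupedChecks`, and `allToAll` are likewise illustrative names, not Flink's API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DownstreamSketch {

    /** Hypothetical group of consumer regions shared by all partitions of one all-to-all edge. */
    static final class ConsumerGroup {
        final List<Integer> consumerRegions;

        ConsumerGroup(List<Integer> consumerRegions) {
            this.consumerRegions = consumerRegions;
        }
    }

    /** Hypothetical produced partition pointing at its (shared) consumer group. */
    record ResultPartition(ConsumerGroup consumers) {}

    /** Hypothetical producer vertex: one partition per output edge. */
    record Vertex(List<ResultPartition> produced) {}

    /** Current approach: visit every consumer of every produced partition. Returns #checks. */
    static int naiveChecks(List<Vertex> vertices, Set<Integer> regionsToRestart) {
        int checks = 0;
        for (Vertex v : vertices) {
            for (ResultPartition p : v.produced()) {
                for (int region : p.consumers().consumerRegions) {
                    checks++;
                    regionsToRestart.add(region); // no-op if the region was already visited
                }
            }
        }
        return checks;
    }

    /** Proposed approach: skip consumer groups already visited. Returns #checks. */
    static int groupedChecks(List<Vertex> vertices, Set<Integer> regionsToRestart) {
        // ConsumerGroup does not override equals/hashCode, so this HashSet dedupes by identity.
        Set<ConsumerGroup> visitedGroups = new HashSet<>();
        int checks = 0;
        for (Vertex v : vertices) {
            for (ResultPartition p : v.produced()) {
                if (!visitedGroups.add(p.consumers())) {
                    continue; // all regions of this group were already added
                }
                for (int region : p.consumers().consumerRegions) {
                    checks++;
                    regionsToRestart.add(region);
                }
            }
        }
        return checks;
    }

    /** Builds n producer vertices whose partitions all share one group of n consumer regions. */
    static List<Vertex> allToAll(int n) {
        List<Integer> regions = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            regions.add(i);
        }
        ConsumerGroup shared = new ConsumerGroup(regions);
        List<Vertex> producers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            producers.add(new Vertex(List.of(new ResultPartition(shared))));
        }
        return producers;
    }
}
```

Skipping a visited group is safe because every region in it was enqueued the first time the group was seen. For `allToAll(100)`, the naive loop performs 10,000 checks and the grouped loop 100, yielding the same 100 regions.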