  Flink / FLINK-21110 Optimize scheduler performance for large-scale jobs / FLINK-21331

Optimize calculating tasks to restart in RestartPipelinedRegionFailoverStrategy


Details

    Description

      RestartPipelinedRegionFailoverStrategy calculates the tasks to restart when a task failure occurs. The calculation has two parts: first, determine the regions to restart; then, add all the tasks in these regions to the restarting queue.

      The bottleneck is mainly in the first part. This part traverses all the upstream and downstream regions of the failed region to determine whether they should be restarted or not.

      If a result partition consumed by the currently failed region is not available, its owner, i.e., the upstream region that produces it, should restart. Also, since the failed region needs to restart, its result partitions won't be available, so all the downstream regions need to restart, too.
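
The traversal described above can be sketched at region granularity as a breadth-first search. This is only an illustration under simplifying assumptions: `Region`, `connect` and `regionsToRestart` are hypothetical names, not Flink classes or methods, and partition availability is reduced to one boolean per region-to-region edge.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

final class RegionRestartSketch {

    /** Hypothetical region node; a stand-in, not Flink's SchedulingPipelinedRegion. */
    static final class Region {
        final String name;
        final List<Region> downstream = new ArrayList<>();
        /** Producer region -> is the partition consumed from it still available? */
        final Map<Region, Boolean> upstreamAvailable = new LinkedHashMap<>();

        Region(String name) {
            this.name = name;
        }
    }

    /** Hypothetical helper that wires a producer region to a consumer region. */
    static void connect(Region producer, Region consumer, boolean partitionAvailable) {
        producer.downstream.add(consumer);
        consumer.upstreamAvailable.put(producer, partitionAvailable);
    }

    /** Collects the names of all regions that must restart when {@code failed} fails. */
    static Set<String> regionsToRestart(Region failed) {
        Set<Region> visited = new HashSet<>(); // Region does not override equals: identity-based
        Set<String> toRestart = new LinkedHashSet<>();
        Queue<Region> queue = new ArrayDeque<>();
        visited.add(failed);
        queue.add(failed);
        while (!queue.isEmpty()) {
            Region region = queue.poll();
            toRestart.add(region.name);
            // Upstream: restart the producer only if its partition is no longer available.
            for (Map.Entry<Region, Boolean> entry : region.upstreamAvailable.entrySet()) {
                if (!entry.getValue() && visited.add(entry.getKey())) {
                    queue.add(entry.getKey());
                }
            }
            // Downstream: every consumer region restarts, because this region's output is lost.
            for (Region consumer : region.downstream) {
                if (visited.add(consumer)) {
                    queue.add(consumer);
                }
            }
        }
        return toRestart;
    }
}
```

Note that a region pulled in through the upstream rule also drags in all of its other downstream regions, which is why both directions are handled in the same loop.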

      1. Calculating the upstream regions that should restart

      The current implementation is:

      for each SchedulingExecutionVertex in current visited SchedulingPipelinedRegion:
        for each consumed SchedulingResultPartition of the SchedulingExecutionVertex:
          if the result partition is not available:
            add the producer region to the restart queue
      

      Based on FLINK-21328, the result partitions consumed by a vertex are already grouped. We can therefore use a HashSet to record the result partition groups that have been visited. Vertices connected with all-to-all edges consume the same group, so the group only needs to be traversed once. This decreases the time complexity from O(N^2) to O(N).
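
A minimal sketch of the visited-group idea for the upstream direction follows. The types `Partition`, `PartitionGroup` and `Vertex` and the method `upstreamRegionsToRestart` are hypothetical stand-ins, not the Flink scheduler interfaces, and the sketch assumes that vertices behind an all-to-all edge hold a reference to the same group object.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class UpstreamRestartSketch {

    /** Hypothetical result partition: knows its producer region and its availability. */
    static final class Partition {
        final String producerRegion;
        final boolean available;

        Partition(String producerRegion, boolean available) {
            this.producerRegion = producerRegion;
            this.available = available;
        }
    }

    /** Hypothetical group of consumed partitions, shared by many consumer vertices. */
    static final class PartitionGroup {
        final List<Partition> partitions;

        PartitionGroup(List<Partition> partitions) {
            this.partitions = partitions;
        }
    }

    /** Hypothetical vertex: only the groups it consumes matter here. */
    static final class Vertex {
        final List<PartitionGroup> consumedGroups;

        Vertex(List<PartitionGroup> consumedGroups) {
            this.consumedGroups = consumedGroups;
        }
    }

    /** Returns the producer regions of unavailable partitions consumed by the given vertices. */
    static Set<String> upstreamRegionsToRestart(List<Vertex> regionVertices) {
        // PartitionGroup does not override equals/hashCode, so the set works by identity.
        Set<PartitionGroup> visitedGroups = new HashSet<>();
        Set<String> regionsToRestart = new LinkedHashSet<>();
        for (Vertex vertex : regionVertices) {
            for (PartitionGroup group : vertex.consumedGroups) {
                if (!visitedGroups.add(group)) {
                    continue; // already scanned on behalf of another vertex
                }
                for (Partition partition : group.partitions) {
                    if (!partition.available) {
                        regionsToRestart.add(partition.producerRegion);
                    }
                }
            }
        }
        return regionsToRestart;
    }
}
```

With N consumer vertices sharing one group of N partitions, the inner loop over partitions runs once rather than N times; the remaining N iterations only perform a set lookup.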

      2. Calculating the downstream regions that should restart

      The current implementation is:

      for each SchedulingExecutionVertex in current visited SchedulingPipelinedRegion:
        for each produced SchedulingResultPartition of the SchedulingExecutionVertex:
          for each consumer SchedulingExecutionVertex of the produced SchedulingResultPartition:
            if the region containing the consumer SchedulingExecutionVertex is not visited:
              add the region to the restart queue
      

      The number of result partitions produced by a vertex equals the number of its output JobEdges, but with all-to-all edges each of these partitions is consumed by every downstream vertex, so the time complexity of this procedure is actually O(N^2). As the consumer vertices of a result partition are already grouped, we can use a HashSet to record the visited ConsumerVertexGroup. The time complexity decreases from O(N^2) to O(N).
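
The downstream direction can be sketched in the same way. Again, `Vertex`, `ConsumerGroup`, `ResultPartition` and `downstreamRegionsToRestart` are hypothetical names used for illustration only, and the sketch assumes that all partitions of an all-to-all edge reference the same consumer group object.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DownstreamRestartSketch {

    /** Hypothetical group of consumer vertices, shared by many result partitions. */
    static final class ConsumerGroup {
        final List<Vertex> consumers;

        ConsumerGroup(List<Vertex> consumers) {
            this.consumers = consumers;
        }
    }

    /** Hypothetical result partition: only its consumer group matters here. */
    static final class ResultPartition {
        final ConsumerGroup consumerGroup;

        ResultPartition(ConsumerGroup consumerGroup) {
            this.consumerGroup = consumerGroup;
        }
    }

    /** Hypothetical vertex: the region it belongs to and the partitions it produces. */
    static final class Vertex {
        final String region;
        final List<ResultPartition> produced;

        Vertex(String region, List<ResultPartition> produced) {
            this.region = region;
            this.produced = produced;
        }
    }

    /**
     * Returns the downstream regions that were not visited before, in discovery order,
     * and marks them as visited in {@code visitedRegions}.
     */
    static List<String> downstreamRegionsToRestart(
            List<Vertex> regionVertices, Set<String> visitedRegions) {
        // ConsumerGroup does not override equals/hashCode, so the set works by identity.
        Set<ConsumerGroup> visitedGroups = new HashSet<>();
        List<String> newlyQueued = new ArrayList<>();
        for (Vertex vertex : regionVertices) {
            for (ResultPartition partition : vertex.produced) {
                if (!visitedGroups.add(partition.consumerGroup)) {
                    continue; // consumers already handled via another partition
                }
                for (Vertex consumer : partition.consumerGroup.consumers) {
                    if (visitedRegions.add(consumer.region)) {
                        newlyQueued.add(consumer.region);
                    }
                }
            }
        }
        return newlyQueued;
    }
}
```

The consumers of a shared group are walked once, so the cost is proportional to the number of partitions plus the number of consumers instead of their product.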

            People

              Thesharing Zhilong Hong