  1. Flink
  2. FLINK-28131

FLIP-168: Speculative Execution for Batch Job


Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Done
    • Affects Version/s: None
    • Fix Version/s: 1.16.0
    • Component/s: Runtime / Coordination
    • Labels: None
      Speculative execution (FLIP-168) is introduced in Flink 1.16 to mitigate batch job slowness caused by problematic nodes. A problematic node may have hardware problems, accidental I/O busyness, or high CPU load. These problems can make the tasks it hosts run much slower than tasks on other nodes, and affect the overall execution time of a batch job.

      When speculative execution is enabled, Flink keeps detecting slow tasks. Once slow tasks are detected, the nodes on which they are located are identified as problematic and get blocked via the blocklist mechanism (FLIP-224). The scheduler creates new attempts for the slow tasks and deploys them to nodes that are not blocked, while the existing attempts keep running. The new attempts process the same input data and produce the same output data as the original attempt. Whichever attempt finishes first is admitted as the only finished attempt of the task, and the remaining attempts of the task are canceled.
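      The detection logic described above can be sketched in plain Java. This is a self-contained illustration, not Flink's actual SlowTaskDetector: it assumes a simplified rule where, once a quorum of tasks has finished, a baseline is derived from the median finished execution time, and running tasks exceeding the baseline times a multiplier are flagged as slow (the ratio and multiplier parameters mirror the configuration concepts, but the code is a simplification).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of execution-time based slow task detection:
// once enough tasks have finished (baselineRatio of the total), a baseline
// is derived from their median execution time; running tasks whose elapsed
// time exceeds baseline * multiplier are flagged as slow.
public class SlowTaskDetectorSketch {

    static List<Integer> detectSlowTasks(
            long[] finishedMillis,   // execution times of finished tasks
            long[] runningMillis,    // elapsed times of still-running tasks
            int totalTasks,
            double baselineRatio,    // e.g. 0.7: wait for 70% to finish
            double multiplier) {     // e.g. 1.5: slow if 1.5x the baseline
        List<Integer> slow = new ArrayList<>();
        if (finishedMillis.length < totalTasks * baselineRatio) {
            return slow; // not enough finished tasks to establish a baseline
        }
        long[] sorted = finishedMillis.clone();
        Arrays.sort(sorted);
        long median = sorted[sorted.length / 2];
        long threshold = (long) (median * multiplier);
        for (int i = 0; i < runningMillis.length; i++) {
            if (runningMillis[i] > threshold) {
                slow.add(i); // index into runningMillis
            }
        }
        return slow;
    }

    public static void main(String[] args) {
        long[] finished = {100, 110, 120, 105, 115, 108};
        long[] running = {130, 900}; // second task far beyond the baseline
        // median is 110 ms, threshold 165 ms -> only index 1 is slow
        System.out.println(detectSlowTasks(finished, running, 8, 0.7, 1.5));
    }
}
```

      In the real scheduler the flagged tasks would then trigger node blocking and new attempts, as described above.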

      Most existing sources work with speculative execution (FLIP-245). Only if a source uses SourceEvent must it implement SupportsHandleExecutionAttemptSourceEvent to support speculative execution. Sinks do not support speculative execution yet, so speculative execution will not happen on sinks at the moment.

      The Web UI and REST API are also improved (FLIP-249) to display multiple concurrent attempts of tasks and blocked task managers.
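      For reference, speculative execution is controlled through configuration. The option names below are those documented for Flink 1.16; verify them against the release documentation before use, and note that speculative execution requires the adaptive batch scheduler:

```yaml
# flink-conf.yaml (Flink 1.16) -- speculative execution needs AdaptiveBatch
jobmanager.scheduler: AdaptiveBatch
execution.batch.speculative.enabled: true
# upper bound on concurrent attempts per task (original + speculative)
execution.batch.speculative.max-concurrent-executions: 2
# how long a detected slow node stays on the blocklist
execution.batch.speculative.block-slow-node-duration: 1 min
# slow task detector tuning
slow-task-detector.check-interval: 1 s
slow-task-detector.execution-time.baseline-lower-bound: 1 min
slow-task-detector.execution-time.baseline-multiplier: 1.5
slow-task-detector.execution-time.baseline-ratio: 0.75
```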

    Description

      Speculative execution is helpful to mitigate slow tasks caused by problematic nodes. The basic idea is to start mirror tasks on other nodes when a slow task is detected. The mirror tasks process the same input data and produce the same output data as the original task.
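      The first-finisher-wins semantics of mirror tasks can be illustrated with plain Java futures. This is a toy sketch, unrelated to Flink's scheduler internals: both attempts process the same input, the first one to complete is admitted, and the others are canceled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy illustration of speculative attempts: two attempts process the same
// input; whichever completes first is admitted, the other is canceled.
public class SpeculativeAttemptSketch {

    static int process(int[] input, long simulatedDelayMillis) throws InterruptedException {
        Thread.sleep(simulatedDelayMillis); // stand-in for a slow or fast node
        int sum = 0;
        for (int v : input) sum += v;
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        int[] input = {1, 2, 3, 4};

        // original attempt on a "slow node", speculative attempt on a fast one
        CompletableFuture<Integer> original = CompletableFuture.supplyAsync(() -> {
            try { return process(input, 2000); }
            catch (InterruptedException e) { throw new CompletionException(e); }
        }, pool);
        CompletableFuture<Integer> speculative = CompletableFuture.supplyAsync(() -> {
            try { return process(input, 50); }
            catch (InterruptedException e) { throw new CompletionException(e); }
        }, pool);

        // admit whichever attempt finishes first; both produce the same result
        int result = (Integer) CompletableFuture.anyOf(original, speculative).get();
        original.cancel(true);
        speculative.cancel(true);
        pool.shutdownNow();

        System.out.println(result); // same sum regardless of which attempt won
    }
}
```

      The key property mirrored here is that both attempts are deterministic over the same input, so admitting either one yields the same output.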

      More details can be found in [FLIP-168|https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job].


      This is the umbrella ticket to track all the changes of this feature.

            People

              Assignee: Zhu Zhu (zhuzh)
              Reporter: Zhu Zhu (zhuzh)
              Votes: 0
              Watchers: 8
