Details

    • Type: Improvement
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.1.0
    • Fix Version/s: None
    • Component/s: JobManager
    • Labels: None

      Description

      When a task fails during execution, Flink currently resets the entire execution graph and triggers complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks.

      In many cases, more fine-grained recovery is possible.

      The full description and design is in the corresponding FLIP.

      https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

      The detailed design for version 1 is at https://docs.google.com/document/d/1_PqPLA1TJgjlqz8fqnVE3YSisYBDdFsrRX_URgRSj74/edit#

          Activity

          Wenlong Lyu added a comment - edited

          Hi Stephan, we have implemented a similar solution before, but simple backward and forward tracking cannot work well in the following situation:

          Assuming the job graph has job vertices A/B/C, where A is connected to C with a forward strategy and B is connected to C with an all-to-all strategy: when a task of A fails, only one task of C will be added to the restart node set.

          I suggest treating the job graph as an undirected graph and dividing it into maximal connected sub-graphs when the job graph is submitted. When a task fails over, the whole related sub-graph is restarted.

          Besides, when the job graph is large, extracting the related nodes for a given node can be time-costly, and this is done repeatedly in a long-running job; using pre-computed sub-graphs avoids that problem.
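          The suggested pre-computation could be sketched with a union-find over the job graph treated as undirected (an illustrative sketch only; `SubGraphPartitioner` and its methods are hypothetical names, not Flink APIs):

          ```java
          import java.util.*;

          // Sketch: partition a job graph (viewed as undirected) into maximal
          // connected sub-graphs at submission time, so that a task failover
          // can restart the whole related sub-graph without graph traversal.
          public class SubGraphPartitioner {
              private final Map<String, String> parent = new HashMap<>();

              private String find(String v) {
                  parent.putIfAbsent(v, v);
                  String p = parent.get(v);
                  if (!p.equals(v)) {
                      p = find(p);
                      parent.put(v, p); // path compression
                  }
                  return p;
              }

              // Record an edge between two job vertices, ignoring direction.
              public void connect(String a, String b) {
                  parent.put(find(a), find(b));
              }

              // All vertices sharing a root form one sub-graph that restarts together.
              public Map<String, Set<String>> subGraphs() {
                  Map<String, Set<String>> groups = new HashMap<>();
                  for (String v : new ArrayList<>(parent.keySet())) {
                      groups.computeIfAbsent(find(v), k -> new TreeSet<>()).add(v);
                  }
                  return groups;
              }
          }
          ```

          For the A/B/C example above, `connect("A", "C")` and `connect("B", "C")` produce a single sub-graph {A, B, C}, so a failure in A restarts B and C as well.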

          zhijiang added a comment -

          As a further improvement, if task C fails, the downstream tasks such as D and E should not be restarted. We have already done some work related to this.
          I am interested in the idea of caching the intermediate result, which could avoid restarting the upstream tasks of the failed one. Is that the PIPELINED_PERSISTENT type of result partition? I would like to hear the further plan for it.

          Stephan Ewen added a comment -

          Wenlong Lyu True, one has to find the entire "connected component" for the restart. That component is, however, dynamic, so I would not pre-compute it:

          • We may introduce best-effort caching, which means that in some cases the program must backtrack further, in others less.
          • Downstream cancelling is only necessary if input has already been supplied to the downstream task. Especially in batch, this is often not the case, which reduces the set of tasks to examine for cancelling.

          We can make this quite a bit more efficient, in my opinion, by operating at the ExecutionJobVertex level in many cases, rather than on each individual vertex.
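          The two points above can be sketched as a dynamic restart-set computation, decided per failure rather than pre-computed (illustrative only; the helper predicates `resultCached` and `consumedInput`, and the class name, are assumptions, not Flink code):

          ```java
          import java.util.*;
          import java.util.function.Predicate;

          // Sketch: compute the restart set dynamically. Backtracking stops at
          // producers whose intermediate result is still cached; downstream
          // tasks are only included if they already consumed (now invalid) input.
          public class RestartSet {
              public static Set<String> compute(
                      String failed,
                      Map<String, List<String>> producers,  // task -> its upstream tasks
                      Map<String, List<String>> consumers,  // task -> its downstream tasks
                      Predicate<String> resultCached,       // producer result still available?
                      Predicate<String> consumedInput) {    // consumer already received data?
                  Set<String> toRestart = new HashSet<>();
                  Deque<String> queue = new ArrayDeque<>();
                  queue.add(failed);
                  while (!queue.isEmpty()) {
                      String task = queue.poll();
                      if (!toRestart.add(task)) continue;
                      // Backtrack only where the input result must be re-produced.
                      for (String up : producers.getOrDefault(task, List.of())) {
                          if (!resultCached.test(up)) queue.add(up);
                      }
                      // Cancel downstream only if it already consumed data from this task.
                      for (String down : consumers.getOrDefault(task, List.of())) {
                          if (consumedInput.test(down)) queue.add(down);
                      }
                  }
                  return toRestart;
              }
          }
          ```

          With all results cached and no input yet consumed downstream, only the failed task itself restarts; without caching, the traversal grows toward the full connected component.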

          Stephan Ewen added a comment -

          zhijiang Preventing downstream restarts would be a follow-up optimization.
          In order not to make this issue more complicated than it already is, I would first solve this, and then approach that as a separate follow-up.

          Wenlong Lyu added a comment -

          Thanks for explaining; you are right about pre-computing. I still have another concern: I think it is quite a special case for a job to be splittable at the ExecutionJobVertex level; in practice this may only happen in batch job graphs with blocking edges.

          ASF GitHub Bot added a comment -

          GitHub user shuai-xu opened a pull request:

          https://github.com/apache/flink/pull/3539

          FLINK-4256 Flip1: fine gained recovery

          This is an informal PR for the implementation of FLIP-1 version 1.
          It enables that when a task fails, only the minimal pipelined connected executions are restarted instead of the whole execution graph.
          Main changes:
          1. ExecutionGraph no longer manages failover; it only records the number of finished JobVertexes and turns to FINISHED when all vertexes finish (maybe later FailoverCoordinator will take over this). Its state can now only be CREATED, RUNNING, FAILED, FINISHED, or SUSPENDED.
          2. FailoverCoordinator now manages failover. It generates several FailoverRegions when the EG is attached, and it listens for execution failures. When an execution fails, it finds the FailoverRegion that completes the failover.
          3. When the JM needs the EG to be cancelled or failed, the EG also notifies the FailoverCoordinator; the FailoverCoordinator notifies all FailoverRegions to cancel their executions, and when all executions are cancelled, the FailoverCoordinator notifies the EG to become CANCELED or FAILED.
          4. FailoverCoordinator has several states: RUNNING, FAILING, CANCELLING, FAILED, CANCELED.
          5. A FailoverRegion contains the minimal pipelined connected executions and manages their failover.
          6. FailoverRegion has the states CREATED, RUNNING, CANCELLING, CANCELLED.
          7. One FailoverRegion may be the succeeding or preceding region of others. When a preceding region fails over, all of its succeeding regions should fail over too. The succeeding regions should just reset their executions and wait for the preceding region to start them when it finishes. The preceding region should wait for its succeeding regions to be CREATED and then schedule again.
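          The region states listed in point 6 can be sketched as a small state machine (the transitions here are assumptions inferred from the description, including the reset back to CREATED from point 7; this is illustrative, not code from the PR):

          ```java
          // Sketch of the FailoverRegion lifecycle described above.
          // Transitions are assumed: CREATED -> RUNNING on schedule,
          // RUNNING -> CANCELLING on failure or cancellation,
          // CANCELLING -> CANCELLED once all executions stop,
          // CANCELLED -> CREATED when the region is reset for a restart.
          public enum FailoverRegionState {
              CREATED, RUNNING, CANCELLING, CANCELLED;

              public boolean canTransitionTo(FailoverRegionState next) {
                  switch (this) {
                      case CREATED:    return next == RUNNING;
                      case RUNNING:    return next == CANCELLING;
                      case CANCELLING: return next == CANCELLED;
                      case CANCELLED:  return next == CREATED;
                      default:         return false;
                  }
              }
          }
          ```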

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shuai-xu/flink jira-4256

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3539.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3539


          commit 363f1536838064edbdd5f39e41f3f19f6c511fc4
          Author: shuai.xus <shuai.xus@alibaba-inc.com>
          Date: 2017-03-15T03:36:11Z

          FLINK-4256 Flip1: fine gained recovery


          Eron Wright added a comment -

          Is the scope of FLINK-4256 covering batch jobs only? There are various TODOs related to the interplay between local failover and checkpointing, for example. I am wondering whether additional subtasks should be opened here or elsewhere.

          Sihua Zhou added a comment -

          Partial recovery seems to be more useful for batch jobs; for streaming jobs, is it also fine to place `local recovery` under this umbrella? Stephan Ewen


            People

            • Assignee: Stephan Ewen
            • Reporter: Stephan Ewen
            • Votes: 0
            • Watchers: 23
