  1. Flink
  2. FLINK-4256 Fine-grained recovery
  3. FLINK-5869

ExecutionGraph uses FailoverCoordinator to manage the failover of execution vertices

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: JobManager
    • Labels:
      None

      Description

      The ExecutionGraph does not manage the failover of executions. It only cares about the state of the whole job, which is CREATED, RUNNING, FAILED, FINISHED, or SUSPEND.
      When an execution fails, it notifies the FailoverCoordinator to do the failover.
      It only records the finished job vertices and changes to FINISHED after all vertices have finished.
      It changes to the final failed state if the restart strategy fails or it meets an unrecoverable exception.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/3772

          FLINK-5869 [flip-1] Introduce abstraction for FailoverStrategy

          This PR has two sets of changes that I could not pull apart into separate pull requests.

          (1) Termination Futures

          Prior to this change, the `ExecutionGraph` decided when cancellation and finishing were complete by tracking how many `ExecutionJobVertex` instances were in a terminal state.

          This abstraction is too inflexible to track when subregions of the graph are in a terminal state. To fix that, this change introduces a termination future on the `Execution`. By building conjunct futures of the termination futures, any observer can track when any number of vertices are in a terminal state.

          The `ExecutionGraph` now also uses that model to track when cancellation of all vertices during failover is complete.
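
A minimal sketch of the conjunct-future idea described above. It uses `java.util.concurrent.CompletableFuture` as a stand-in for the future type in the pull request, and `SketchExecution`, `markTerminal`, and `allTerminated` are hypothetical names chosen for illustration, not Flink's actual API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Demo entry point; all names in this file are illustrative assumptions. */
final class TerminationFutureSketch {

    /** Builds a conjunct future that completes once every given execution is terminal. */
    static CompletableFuture<Void> allTerminated(List<SketchExecution> executions) {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[executions.size()];
        for (int i = 0; i < executions.size(); i++) {
            futures[i] = executions.get(i).getTerminationFuture();
        }
        return CompletableFuture.allOf(futures);
    }

    public static void main(String[] args) {
        SketchExecution a = new SketchExecution();
        SketchExecution b = new SketchExecution();

        // Observe only a subregion of the graph: here, the two executions a and b.
        CompletableFuture<Void> region = allTerminated(Arrays.asList(a, b));

        a.markTerminal("CANCELED");
        System.out.println("done after first termination: " + region.isDone());
        b.markTerminal("CANCELED");
        System.out.println("done after second termination: " + region.isDone());
    }
}

/** Hypothetical stand-in for an Execution that exposes a termination future. */
final class SketchExecution {
    private final CompletableFuture<String> terminationFuture = new CompletableFuture<>();

    CompletableFuture<String> getTerminationFuture() {
        return terminationFuture;
    }

    /** Called when the execution reaches a terminal state. */
    void markTerminal(String terminalState) {
        terminationFuture.complete(terminalState);
    }
}
```

Because the conjunct future is built from an arbitrary list, the same helper could serve a whole-graph observer and a single-region observer alike.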

          (2) Local Failover and FailoverStrategy

          The `ExecutionGraph` now supports local failover and global failover. Quoting from the JavaDocs:

          • *Global failover* aborts the task executions for all vertices and restarts the whole data flow graph from the last completed checkpoint. Global failover is considered the fallback strategy that is used when a local failover is unsuccessful, or when an issue is found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
          • *Local failover* is triggered when an individual vertex execution (a task) fails. The local failover is coordinated by the `FailoverStrategy`. A local failover typically attempts to restart as little as possible, but as much as necessary.
          • Between local and global failover, the global failover always takes precedence, because it is the core mechanism that the `ExecutionGraph` relies on to bring back consistency. To guard that, the `ExecutionGraph` maintains a *global modification version*, which is incremented with every global failover (and other global actions, like job cancellation, or terminal failure). Local failover is always scoped by the modification version that the execution graph had when the failover was triggered. If a new global modification version is reached during local failover (meaning there is a concurrent global failover), the failover strategy has to yield before the global failover.
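
The version scoping described in the last bullet can be sketched as follows. This is a simplified, single-threaded illustration under assumed names: `GlobalModVersionSketch` and `tryLocalRestart` are hypothetical, only `failGlobally()` echoes a method name from the pull request, and a real implementation would also have to make the check and the restart safe under concurrency.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative model of a global modification version; not Flink's actual class. */
final class GlobalModVersionSketch {

    private final AtomicLong globalModVersion = new AtomicLong(0);

    long getGlobalModVersion() {
        return globalModVersion.get();
    }

    /** A global action: bumping the version invalidates in-flight local failovers. */
    long failGlobally() {
        return globalModVersion.incrementAndGet();
    }

    /**
     * Runs the local restart only if no global action happened since the
     * local failover was triggered; otherwise the local failover yields.
     */
    boolean tryLocalRestart(long versionAtTrigger, Runnable restartAction) {
        if (globalModVersion.get() != versionAtTrigger) {
            return false; // a newer global version exists, so yield to the global failover
        }
        restartAction.run();
        return true;
    }

    public static void main(String[] args) {
        GlobalModVersionSketch graph = new GlobalModVersionSketch();

        long versionAtTrigger = graph.getGlobalModVersion();
        graph.failGlobally(); // concurrent global failover wins

        boolean applied = graph.tryLocalRestart(
                versionAtTrigger, () -> System.out.println("restarting task locally"));
        System.out.println("local restart applied: " + applied);
    }
}
```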

          Failover Strategies

          How exactly local failover happens is the concern of a pluggable `FailoverStrategy`.

          • The default failover strategy simply triggers a global failover.
          • The pull request introduces a very simple restart-individual failover strategy that independently restarts tasks that have no connections to other tasks.
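
To illustrate what "pluggable" could mean here, the sketch below models a strategy as a function from a failed task to the set of tasks to restart. The interface and class names are hypothetical and much simpler than the real `FailoverStrategy` base class; in particular, "restart all" only approximates a global failover by returning every task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Demo entry point; all types in this file are illustrative assumptions. */
final class FailoverStrategySketch {
    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("source-0", "map-0", "sink-0");

        SketchFailoverStrategy full = new RestartAllSketch();
        SketchFailoverStrategy individual = new RestartIndividualSketch();

        System.out.println("full:       " + full.tasksToRestart("map-0", tasks));
        System.out.println("individual: " + individual.tasksToRestart("map-0", tasks));
    }
}

/** Hypothetical, simplified strategy contract. */
interface SketchFailoverStrategy {
    /** Returns the tasks that should be restarted when failedTask fails. */
    List<String> tasksToRestart(String failedTask, List<String> allTasks);
}

/** Approximates the default behavior: every task is restarted. */
final class RestartAllSketch implements SketchFailoverStrategy {
    @Override
    public List<String> tasksToRestart(String failedTask, List<String> allTasks) {
        return new ArrayList<>(allTasks);
    }
}

/** Restarts only the failed task, assuming it has no connections to other tasks. */
final class RestartIndividualSketch implements SketchFailoverStrategy {
    @Override
    public List<String> tasksToRestart(String failedTask, List<String> allTasks) {
        return Collections.singletonList(failedTask);
    }
}
```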

          Tests

          This pull request adds new tests for

          • The termination future abstraction
          • The global mod version handling
          • Proper handling of concurrent local- and global failover

          The pull request rewrites various original tests. This was necessary because the tests were using Mockito very heavily and were rebuilding or whitebox-testing specific behavior that was affected by the changes.

          The changes to the tests introduce simple ways to actually bring up a functional ExecutionGraph and walk it through its state transitions. That way the tests now rely minimally on mocking and actually test the proper ExecutionGraph, rather than a mock that is expected to behave similarly to the proper class.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink flip-1-basics

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3772.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3772


          commit ef7fd9964c1c74feb4641e57a138c54558b2449c
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2017-03-21T18:13:34Z

          FLINK-5869 [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph

          • Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobally()' to differentiate from fine grained failures/recovery
          • Add base class for FailoverStrategy
          • Add default implementation (restart all tasks)
          • Add logic to load the failover strategy from the configuration

          commit c04a8a312098fddce14e392b8d9dbf396b1df3f3
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2017-03-29T20:49:54Z

          FLINK-6340 [flip-1] Add a termination future to the Execution


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3772#discussion_r113272506

          --- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---
          @@ -72,6 +72,13 @@
                   .defaultValue(16)
                   .withDeprecatedKeys("job-manager.max-attempts-history-size");

          +    /**
          +     * The maximum number of prior execution attempts kept in history.
          +     */
          +    public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
          +        key("jobmanager.execution.failover-strategy")
          +            .defaultValue("full");
          --- End diff --

          Have we ever considered defining a set of valid values directly in the config option?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3772#discussion_r113399653

          --- Diff: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---
          @@ -72,6 +72,13 @@
                   .defaultValue(16)
                   .withDeprecatedKeys("job-manager.max-attempts-history-size");

          +    /**
          +     * The maximum number of prior execution attempts kept in history.
          +     */
          +    public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
          +        key("jobmanager.execution.failover-strategy")
          +            .defaultValue("full");
          --- End diff --

          I don't think we have done that yet, no...

          There is a PR to extend the config options to include a description (and generate docs), it could be an extension there.
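
As a rough illustration of the idea raised in this thread, a string option that carries its own set of valid values might look like the sketch below. `RestrictedStringOptionSketch` is a hypothetical class and not part of Flink's `ConfigOption` API; the key and the default `"full"` come from the diff above, while `"individual"` is an assumed second value used only for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical option type that validates values against a fixed set. */
final class RestrictedStringOptionSketch {

    private final String key;
    private final String defaultValue;
    private final Set<String> validValues;

    RestrictedStringOptionSketch(String key, String defaultValue, String... validValues) {
        this.key = key;
        this.defaultValue = defaultValue;
        this.validValues = new LinkedHashSet<>(Arrays.asList(validValues));
        if (!this.validValues.contains(defaultValue)) {
            throw new IllegalArgumentException("Default value must be one of " + this.validValues);
        }
    }

    /** Returns the configured value, or the default if unset; rejects unknown values. */
    String resolve(String configuredValue) {
        if (configuredValue == null) {
            return defaultValue;
        }
        if (!validValues.contains(configuredValue)) {
            throw new IllegalArgumentException(
                    "Invalid value '" + configuredValue + "' for " + key
                            + "; expected one of " + validValues);
        }
        return configuredValue;
    }

    public static void main(String[] args) {
        RestrictedStringOptionSketch option = new RestrictedStringOptionSketch(
                "jobmanager.execution.failover-strategy", "full", "full", "individual");

        System.out.println(option.resolve(null));
        System.out.println(option.resolve("individual"));
    }
}
```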

          StephanEwen Stephan Ewen added a comment -

          Fixed via 8ed85fe49b7595546a8f968e0faa1fa7d4da47ec

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3772

          Manually merged in 8ed85fe49b7595546a8f968e0faa1fa7d4da47ec

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen closed the pull request at:

          https://github.com/apache/flink/pull/3772


            People

            • Assignee:
              tiemsn shuai.xu
              Reporter:
              tiemsn shuai.xu