Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-5940

ZooKeeperCompletedCheckpointStore cannot handle broken state handles

    Details

      Description

      The ZooKeeperCompletedCheckpointStore reads a set of RetrievableStateHandles from ZooKeeper upon recovery. It then tries to retrieve the CompletedCheckpoint from the latest state handle. If the retrieve operation fails, then the whole recovery of completed checkpoints fails even though the store might have read older state handles from ZooKeeper.

      I propose to harden the behaviour by removing broken state handles and returning the first successfully retrieved CompletedCheckpoint.

        Issue Links

          Activity

          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3446

          FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method

          Before ZooKeeperCompletedCheckpointStore only tried to recover the latest completed
          checkpoint even though it might have read older checkpoint state handles from
          ZooKeeper. In order to deal with corrupted state handles, this commit changes the
          behaviour such that the completed checkpoint store adds all read retrievable
          state handles from ZooKeeper and upon request of the latest checkpoint it will
          return the latest completed checkpoint which could be retrieved from the state
          handles. Broken state handles are removed from the completed checkpoint store and
          ZooKeeper.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixCheckpointRecovery

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3446.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3446


          commit e8fe16004562c012f15f2f1efc0ad45157fbb9dd
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-03-01T13:08:35Z

          FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method

          The ZooKeeperCompletedCheckpointStore only tries to recover the latest completed
          checkpoint even though it might have read older checkpoint state handles from
          ZooKeeper. In order to deal with corrupted state handles, this commit changes the
          behaviour such that the completed checkpoint store adds all read retrievable
          state handles from ZooKeeper and upon request of the latest checkpoint it will
          return the latest completed checkpoint which could be retrieved from the state
          handles. Broken state handles are removed from the completed checkpoint store and
          ZooKeeper.


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3446 FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method Before ZooKeeperCompletedCheckpointStore only tried to recover the latest completed checkpoint even though it might have read older checkpoint state handles from ZooKeeper. In order to deal with corrupted state handles, this commit changes the behaviour such that the completed checkpoint store adds all read retrievable state handles from ZooKeeper and upon request of the latest checkpoint it will return the latest completed checkpoint which could be retrieved from the state handles. Broken state handles are removed from the completed checkpoint store and ZooKeeper. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCheckpointRecovery Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3446.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3446 commit e8fe16004562c012f15f2f1efc0ad45157fbb9dd Author: Till Rohrmann <trohrmann@apache.org> Date: 2017-03-01T13:08:35Z FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method The ZooKeeperCompletedCheckpointStore only tries to recover the latest completed checkpoint even though it might have read older checkpoint state handles from ZooKeeper. In order to deal with corrupted state handles, this commit changes the behaviour such that the completed checkpoint store adds all read retrievable state handles from ZooKeeper and upon request of the latest checkpoint it will return the latest completed checkpoint which could be retrieved from the state handles. Broken state handles are removed from the completed checkpoint store and ZooKeeper.
          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3448

          [backport-1.2] FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method

          Backport of #3446 onto `release-1.2` branch.

          The ZooKeeperCompletedCheckpointStore only tries to recover the latest completed
          checkpoint even though it might have read older checkpoint state handles from
          ZooKeeper. In order to deal with corrupted state handles, this commit changes the
          behaviour such that the completed checkpoint store adds all read retrievable
          state handles from ZooKeeper and upon request of the latest checkpoint it will
          return the latest completed checkpoint which could be retrieved from the state
          handles. Broken state handles are removed from the completed checkpoint store and
          ZooKeeper.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixCheckpointRecoveryBp1.2

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3448.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3448


          commit f73f5d0bb1491cefa59d1e58ebc8d6a24a4cbaa1
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-03-01T13:08:35Z

          FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method

          The ZooKeeperCompletedCheckpointStore only tries to recover the latest completed
          checkpoint even though it might have read older checkpoint state handles from
          ZooKeeper. In order to deal with corrupted state handles, this commit changes the
          behaviour such that the completed checkpoint store adds all read retrievable
          state handles from ZooKeeper and upon request of the latest checkpoint it will
          return the latest completed checkpoint which could be retrieved from the state
          handles. Broken state handles are removed from the completed checkpoint store and
          ZooKeeper.


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3448 [backport-1.2] FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method Backport of #3446 onto `release-1.2` branch. The ZooKeeperCompletedCheckpointStore only tries to recover the latest completed checkpoint even though it might have read older checkpoint state handles from ZooKeeper. In order to deal with corrupted state handles, this commit changes the behaviour such that the completed checkpoint store adds all read retrievable state handles from ZooKeeper and upon request of the latest checkpoint it will return the latest completed checkpoint which could be retrieved from the state handles. Broken state handles are removed from the completed checkpoint store and ZooKeeper. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCheckpointRecoveryBp1.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3448.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3448 commit f73f5d0bb1491cefa59d1e58ebc8d6a24a4cbaa1 Author: Till Rohrmann <trohrmann@apache.org> Date: 2017-03-01T13:08:35Z FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method The ZooKeeperCompletedCheckpointStore only tries to recover the latest completed checkpoint even though it might have read older checkpoint state handles from ZooKeeper. In order to deal with corrupted state handles, this commit changes the behaviour such that the completed checkpoint store adds all read retrievable state handles from ZooKeeper and upon request of the latest checkpoint it will return the latest completed checkpoint which could be retrieved from the state handles. Broken state handles are removed from the completed checkpoint store and ZooKeeper.
          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3451

          [backport-1.1] FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method

          Backpor of #3446 onto `release-1.1` branch.

          The ZooKeeperCompletedCheckpointStore only tries to recover the latest completed
          checkpoint even though it might have read older checkpoint state handles from
          ZooKeeper. In order to deal with corrupted state handles, this commit changes the
          behaviour such that the completed checkpoint store adds all read retrievable
          state handles from ZooKeeper and upon request of the latest checkpoint it will
          return the latest completed checkpoint which could be retrieved from the state
          handles. Broken state handles are removed from the completed checkpoint store and
          ZooKeeper.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixCheckpointRecoveryBp1.1

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3451.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3451



          Show
          githubbot ASF GitHub Bot added a comment - GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3451 [backport-1.1] FLINK-5940 [checkpoint] Harden ZooKeeperCompletedCheckpointStore.recover method Backpor of #3446 onto `release-1.1` branch. The ZooKeeperCompletedCheckpointStore only tries to recover the latest completed checkpoint even though it might have read older checkpoint state handles from ZooKeeper. In order to deal with corrupted state handles, this commit changes the behaviour such that the completed checkpoint store adds all read retrievable state handles from ZooKeeper and upon request of the latest checkpoint it will return the latest completed checkpoint which could be retrieved from the state handles. Broken state handles are removed from the completed checkpoint store and ZooKeeper. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixCheckpointRecoveryBp1.1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3451.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3451
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3446#discussion_r105021165

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java —
          @@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() throws Exception

          { return null; }

          else {

          • return checkpointStateHandles.getLast().f0.retrieveState();
            + while(!checkpointStateHandles.isEmpty()) {
            + Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
            +
            + try { + return retrieveCompletedCheckpoint(checkpointStateHandle); + }

            catch (FlinkException e) {

              • End diff –

          I would catch more than `FlinkException` here - after all, we want to fall back to earlier checkpoints in any error case, no?

          Show
          githubbot ASF GitHub Bot added a comment - Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/3446#discussion_r105021165 — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java — @@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() throws Exception { return null; } else { return checkpointStateHandles.getLast().f0.retrieveState(); + while(!checkpointStateHandles.isEmpty()) { + Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast(); + + try { + return retrieveCompletedCheckpoint(checkpointStateHandle); + } catch (FlinkException e) { End diff – I would catch more than `FlinkException` here - after all, we want to fall back to earlier checkpoints in any error case, no?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3451

          Fix looks good, but I have a similar comment about the test as for #3446

          Show
          githubbot ASF GitHub Bot added a comment - Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3451 Fix looks good, but I have a similar comment about the test as for #3446
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3446#discussion_r105417543

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java —
          @@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() throws Exception

          { return null; }

          else {

          • return checkpointStateHandles.getLast().f0.retrieveState();
            + while(!checkpointStateHandles.isEmpty()) {
            + Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
            +
            + try { + return retrieveCompletedCheckpoint(checkpointStateHandle); + }

            catch (FlinkException e) {

              • End diff –

          Yes, you're right. Will change it.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3446#discussion_r105417543 — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java — @@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() throws Exception { return null; } else { return checkpointStateHandles.getLast().f0.retrieveState(); + while(!checkpointStateHandles.isEmpty()) { + Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast(); + + try { + return retrieveCompletedCheckpoint(checkpointStateHandle); + } catch (FlinkException e) { End diff – Yes, you're right. Will change it.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3446#discussion_r105419062

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java —
          @@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() throws Exception

          { return null; }

          else {

          • return checkpointStateHandles.getLast().f0.retrieveState();
            + while(!checkpointStateHandles.isEmpty()) {
            + Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
            +
            + try { + return retrieveCompletedCheckpoint(checkpointStateHandle); + }

            catch (FlinkException e) {

              • End diff –

          Technically, I think it was ok, because the `retrieveCompletedCheckpoint` method catches all `Exceptions` and wraps them in a `FlinkException`. But it's better to not rely on this implementation detail.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3446#discussion_r105419062 — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java — @@ -226,16 +200,43 @@ public CompletedCheckpoint getLatestCheckpoint() throws Exception { return null; } else { return checkpointStateHandles.getLast().f0.retrieveState(); + while(!checkpointStateHandles.isEmpty()) { + Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast(); + + try { + return retrieveCompletedCheckpoint(checkpointStateHandle); + } catch (FlinkException e) { End diff – Technically, I think it was ok, because the `retrieveCompletedCheckpoint` method catches all `Exceptions` and wraps them in a `FlinkException`. But it's better to not rely on this implementation detail.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3446

          The mocking is indeed a little bit complex in this case. The problem is that you also want to check that a state handle which fails to retrieve its state, is properly discarded. This discard call happens as part of a callback to a ZooKeeper remove call. Since the curator client uses the builder API to construct the ZooKeeper calls, it was necessary to mock all the different build stages. I couldn't find a more succinct way to test this behaviour without starting a ZooKeeper server.

          If you think it hurts the test's maintainability too much, then I can start a ZooKeeper server for the test.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3446 The mocking is indeed a little bit complex in this case. The problem is that you also want to check that a state handle which fails to retrieve its state, is properly discarded. This discard call happens as part of a callback to a ZooKeeper remove call. Since the curator client uses the builder API to construct the ZooKeeper calls, it was necessary to mock all the different build stages. I couldn't find a more succinct way to test this behaviour without starting a ZooKeeper server. If you think it hurts the test's maintainability too much, then I can start a ZooKeeper server for the test.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3446

          Thanks for the review @StephanEwen. Merging this PR.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3446 Thanks for the review @StephanEwen. Merging this PR.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann closed the pull request at:

          https://github.com/apache/flink/pull/3448

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/3448
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3451

          Thanks for the review @StephanEwen. See comment on #3446 for the test. Merging this PR.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3451 Thanks for the review @StephanEwen. See comment on #3446 for the test. Merging this PR.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann closed the pull request at:

          https://github.com/apache/flink/pull/3451

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/3451
          Hide
          till.rohrmann Till Rohrmann added a comment -

          1.3.0: a00de2b4d959604ed1705b510580262101a790d6
          1.2.1: 67ec3b52effa8dab3b7e6ca6d89bb5ac4627f025
          1.1.5: a34559d20571aa8710e45c2a7cc917f3fe26707d

          Show
          till.rohrmann Till Rohrmann added a comment - 1.3.0: a00de2b4d959604ed1705b510580262101a790d6 1.2.1: 67ec3b52effa8dab3b7e6ca6d89bb5ac4627f025 1.1.5: a34559d20571aa8710e45c2a7cc917f3fe26707d
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann closed the pull request at:

          https://github.com/apache/flink/pull/3446

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann closed the pull request at: https://github.com/apache/flink/pull/3446

            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              till.rohrmann Till Rohrmann
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development