  Flink / FLINK-6328

Savepoints must not be counted as retained checkpoints

    Details

      Description

      The Checkpoint Store retains the n latest checkpoints.
      Savepoints are counted as well, meaning that for settings with 1 retained checkpoint, there are sometimes no retained checkpoints at all, only a savepoint.

      That is dangerous, because savepoints must be assumed to disappear at any point in time: their lifecycle is outside the control of the CheckpointCoordinator.
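The retention problem can be reproduced in a few lines. The following is a minimal sketch (Java 16+ for records) that models only the retention policy described above; `NaiveRetainingStore`, `Entry` and `regularCheckpointCount` are hypothetical names, not part of Flink's `CompletedCheckpointStore` API. With a single retained slot, a savepoint evicts the only regular checkpoint:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the retention policy described in this issue;
// not Flink's CompletedCheckpointStore API.
public class NaiveRetainingStore {

    /** A completed checkpoint or savepoint, identified by its ID. */
    public record Entry(long id, boolean savepoint) {}

    private final int maxRetained;
    private final Deque<Entry> retained = new ArrayDeque<>();

    public NaiveRetainingStore(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    /** Adds an entry and evicts the oldest ones; savepoints count toward the limit. */
    public void add(Entry entry) {
        retained.addLast(entry);
        while (retained.size() > maxRetained) {
            retained.removeFirst();
        }
    }

    /** Number of retained regular checkpoints, i.e. entries the coordinator controls. */
    public long regularCheckpointCount() {
        return retained.stream().filter(e -> !e.savepoint()).count();
    }

    public static void main(String[] args) {
        NaiveRetainingStore store = new NaiveRetainingStore(1);
        store.add(new Entry(1, false)); // regular checkpoint
        store.add(new Entry(2, true));  // savepoint evicts checkpoint 1
        // Only the savepoint is left; if it is moved or deleted, nothing is recoverable.
        System.out.println(store.regularCheckpointCount()); // prints 0
    }
}
```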


          Activity

          Aljoscha Krettek added a comment:

          Stephan Ewen, have you started working on this? If not, do you have an idea of its complexity?

          Robert Metzger added a comment:

          Stephan Ewen, is this a blocker for the 1.3 release?

          Wei-Che Wei added a comment:

          Hi all,

          Is this the same issue as FLINK-6071?

          Till Rohrmann added a comment:

          Wei-Che Wei, yes, FLINK-6071 is a duplicate.

          Till Rohrmann added a comment:

          Given that the lifecycle of a savepoint is outside the control of the CheckpointCoordinator, I think it is best not to add savepoints to the CompletedCheckpointStore and, thus, not to consider them for job recovery. The reason for this is FLINK-4815: as long as Flink cannot skip failed checkpoints during recovery, a single broken or deleted savepoint would thwart Flink's whole recovery mechanism.

          Once FLINK-4815 has been resolved, we might reconsider adding savepoints back to the CompletedCheckpointStore, which would allow recovering from savepoints in case of failures. When doing so, however, we should not count savepoints toward the number of retained checkpoints, because we cannot be sure that they still exist.
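The retention rule suggested here could look roughly like the following sketch, assuming savepoints are tracked separately and never evict regular checkpoints. `SavepointAwareStore` and its methods are hypothetical and do not reflect Flink's actual store implementation (Java 16+ for records).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: savepoints may be stored, but only regular checkpoints
// count toward (and are evicted by) the retention limit.
public class SavepointAwareStore {

    public record Entry(long id, boolean savepoint) {}

    private final int maxRetainedCheckpoints;
    private final Deque<Entry> checkpoints = new ArrayDeque<>();
    private final List<Entry> savepoints = new ArrayList<>();

    public SavepointAwareStore(int maxRetainedCheckpoints) {
        this.maxRetainedCheckpoints = maxRetainedCheckpoints;
    }

    public void add(Entry entry) {
        if (entry.savepoint()) {
            // Savepoints are never counted: the user may move or delete them at any time.
            savepoints.add(entry);
            return;
        }
        checkpoints.addLast(entry);
        // Evict only regular checkpoints beyond the configured limit.
        while (checkpoints.size() > maxRetainedCheckpoints) {
            checkpoints.removeFirst();
        }
    }

    public int retainedCheckpointCount() {
        return checkpoints.size();
    }

    public int savepointCount() {
        return savepoints.size();
    }

    /** ID of the newest retained regular checkpoint, or -1 if none is retained. */
    public long latestCheckpointId() {
        return checkpoints.isEmpty() ? -1 : checkpoints.peekLast().id();
    }
}
```

The sketch deliberately keeps savepoint references without any limit; deciding when to forget them is outside its scope.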

          ASF GitHub Bot added a comment:

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3965

          FLINK-6328 [chkPts] Don't add savepoints to CompletedCheckpointStore

          The lifecycle of savepoints is not managed by the CheckpointCoordinator; it is fully
          in the hands of the user. Therefore, the CheckpointCoordinator cannot rely on savepoints
          when trying to recover from failures. For example, a user moving a savepoint shortly
          before a failure could completely break Flink's recovery mechanism, because Flink
          cannot skip failed checkpoints when recovering.
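To illustrate why the inability to skip failed checkpoints matters, the sketch below contrasts recovery that only tries the latest checkpoint with recovery that falls back to older ones. `RecoverySketch`, its two methods and the `canRestore` predicate are hypothetical; the predicate stands in for an attempt to read a checkpoint's data and is not Flink's recovery code.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.LongPredicate;

// Hypothetical contrast of two recovery strategies; not Flink's implementation.
public class RecoverySketch {

    /**
     * Tries only the latest checkpoint. If its data is missing (e.g. a savepoint
     * the user moved), recovery fails even if older checkpoints are intact.
     */
    public static Optional<Long> recoverLatestOnly(List<Long> idsOldestFirst, LongPredicate canRestore) {
        if (idsOldestFirst.isEmpty()) {
            return Optional.empty();
        }
        long latest = idsOldestFirst.get(idsOldestFirst.size() - 1);
        return canRestore.test(latest) ? Optional.of(latest) : Optional.empty();
    }

    /** Walks from newest to oldest and returns the first checkpoint that can be restored. */
    public static Optional<Long> recoverSkippingFailures(List<Long> idsOldestFirst, LongPredicate canRestore) {
        for (int i = idsOldestFirst.size() - 1; i >= 0; i--) {
            long id = idsOldestFirst.get(i);
            if (canRestore.test(id)) {
                return Optional.of(id);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 2L, 3L);   // 3 is a savepoint the user has since moved
        LongPredicate canRestore = id -> id != 3L;
        System.out.println(recoverLatestOnly(ids, canRestore));       // Optional.empty
        System.out.println(recoverSkippingFailures(ids, canRestore)); // Optional[2]
    }
}
```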

          Therefore, until Flink is able to skip failed checkpoints when recovering, we should
          not add savepoints to the CompletedCheckpointStore, which is used to retrieve checkpoints
          for recovery. Savepoints are distinguished from regular checkpoints on the basis of their
          CheckpointProperties (CheckpointProperties.STANDARD_SAVEPOINT).
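A minimal sketch of the change described above, assuming a properties object that exposes an `isSavepoint()` flag: the coordinator passes a completed checkpoint to the store only when it is not a savepoint. `SkipSavepointsSketch`, `Props`, `Completed` and `onCheckpointCompleted` are hypothetical stand-ins for `CheckpointProperties`, `CompletedCheckpoint` and the coordinator logic; they are not Flink's API (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; not Flink's CheckpointProperties / CompletedCheckpointStore.
public class SkipSavepointsSketch {

    public record Props(boolean savepoint) {
        public static final Props STANDARD_CHECKPOINT = new Props(false);
        public static final Props STANDARD_SAVEPOINT = new Props(true);

        public boolean isSavepoint() {
            return savepoint;
        }
    }

    public record Completed(long checkpointId, Props properties) {}

    /** Stand-in for the store that recovery reads from. */
    private final List<Completed> store = new ArrayList<>();

    public void onCheckpointCompleted(Completed completed) {
        // Savepoints are owned by the user, so they never enter the recovery store.
        if (!completed.properties().isSavepoint()) {
            store.add(completed);
        }
    }

    /** Read-only snapshot of what recovery would see. */
    public List<Completed> storedCheckpoints() {
        return List.copyOf(store);
    }
}
```

Because savepoints never enter the store, they neither count toward the number of retained checkpoints nor are they considered during recovery, which matches the trade-off described in this issue.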

          cc @rmetzger

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixSavepointHandling

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3965.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3965


          commit 9c069ad80d66f03a0f90c8ba1a780cbba111896e
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-22T15:41:14Z

          FLINK-6328 [chkPts] Don't add savepoints to CompletedCheckpointStore

          ASF GitHub Bot added a comment:

          Github user tony810430 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3965#discussion_r117902054

          Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
          @@ -864,22 +864,28 @@ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) thro
               // the pending checkpoint must be discarded after the finalization
               Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);
           
          -    try {
          -        completedCheckpointStore.addCheckpoint(completedCheckpoint);
          -    } catch (Exception exception) {
          -        // we failed to store the completed checkpoint. Let's clean up
          -        executor.execute(new Runnable() {
          -            @Override
          -            public void run() {
          -                try {
          -                    completedCheckpoint.discardOnFailedStoring();
          -                } catch (Throwable t) {
          -                    LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
          +    // TODO: add savepoints to completed checkpoint store once FLINK-4815 has been completed
          +    if (!CheckpointProperties.isSavepoint(completedCheckpoint.getProperties())) {
          End diff

          Why not use `completedCheckpoint.getProperties().isSavepoint()` to check whether it is a savepoint?
          This method has already been implemented in `CheckpointProperties`.

          ASF GitHub Bot added a comment:

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3965#discussion_r117906785

          Good point. I missed that. Will adapt the PR.

          ASF GitHub Bot added a comment:

          Github user rmetzger commented on the issue:

          https://github.com/apache/flink/pull/3965

          +1 to merge

          ASF GitHub Bot added a comment:

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3965

          Thanks for the review @tony810430 and @rmetzger. Merging this PR.

          ASF GitHub Bot added a comment:

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3965

          Till Rohrmann added a comment:

          1.4.0: 6f570e7b6810e1645a4f7094f17ab9e8559fa139
          1.3.0: 39114e26bdc8e69e04478d1ff7596860f289a013
          1.2.2: cc1ed221fd3ab5b535983a4c1c94bbdb93d71309


            People

              Assignee: Till Rohrmann
              Reporter: Stephan Ewen
              Votes: 0
              Watchers: 7
