FLINK-5667: Possible state data loss when task fails while checkpointing

    Details

      Description

      Flink can lose state data when a Task fails while a checkpoint is being taken. The scenario is as follows:

      Flink has finished the synchronous part of the checkpoint. It starts the asynchronous part by creating an AsyncCheckpointRunnable and submitting it to an Executor. The runnable is also registered with the closeable registry. If the Task fails before the AsyncCheckpointRunnable has completed, the runnable is closed, because it is registered in the closeable registry. Closing it discards all state handles and cancels all runnable state futures. However, closing does not stop the runnable from sending an acknowledge message to the CheckpointCoordinator.

      If this message completes the pending checkpoint, the checkpoint is turned into a CompletedCheckpoint that is faulty, because some of its data has already been deleted. Depending on Flink's configuration, this can cause older completed checkpoints to be discarded, and the result is state data loss.
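      The race can be replayed deterministically in a few lines of Java. This is a minimal sketch, not Flink's code. `UnguardedRace`, `StateHandle`, `Coordinator` and `AsyncCheckpoint` are hypothetical stand-ins for the real state handles, the CheckpointCoordinator and the AsyncCheckpointRunnable. Calling `close()` (the task failure) before `run()` (the asynchronous part finishing) shows that the acknowledgement still carries state that has already been deleted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the race; none of these types are Flink's real classes.
final class UnguardedRace {

    // Stand-in for a persisted piece of checkpoint state.
    static final class StateHandle {
        boolean discarded = false;

        void discard() { discarded = true; }
    }

    // Stand-in for the CheckpointCoordinator: it only records which handles were acknowledged.
    static final class Coordinator {
        final List<StateHandle> acknowledged = new ArrayList<>();

        void acknowledgeCheckpoint(StateHandle handle) { acknowledged.add(handle); }
    }

    // Stand-in for the AsyncCheckpointRunnable, with no coordination between run() and close().
    static final class AsyncCheckpoint implements Runnable, AutoCloseable {
        private final StateHandle handle;
        private final Coordinator coordinator;

        AsyncCheckpoint(StateHandle handle, Coordinator coordinator) {
            this.handle = handle;
            this.coordinator = coordinator;
        }

        @Override
        public void run() {
            // Nothing checks whether close() already ran, so deleted state can be acknowledged.
            coordinator.acknowledgeCheckpoint(handle);
        }

        @Override
        public void close() {
            // Invoked through the closeable registry when the task fails: deletes the state.
            handle.discard();
        }
    }

    // Replays the problematic ordering: the task fails, then the async part acknowledges.
    static Coordinator failThenAcknowledge() {
        StateHandle handle = new StateHandle();
        Coordinator coordinator = new Coordinator();
        AsyncCheckpoint checkpoint = new AsyncCheckpoint(handle, coordinator);
        checkpoint.close();
        checkpoint.run();
        return coordinator;
    }

    public static void main(String[] args) {
        Coordinator coordinator = failThenAcknowledge();
        StateHandle acked = coordinator.acknowledged.get(0);
        // Prints "acknowledged a discarded handle: true"
        System.out.println("acknowledged a discarded handle: " + acked.discarded);
    }
}
```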


          Activity

          uce Ufuk Celebi added a comment - - edited

          Very good catch!

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3226

          FLINK-5667 [state] Synchronize asynchronous checkpointing and close operation

          This PR synchronizes the asynchronous checkpointing and close operations of a StreamTask.
          The synchronization prevents an acknowledged checkpoint from being discarded and a
          discarded checkpoint from being acknowledged. It achieves this by introducing an atomic
          state variable that guards against late close and acknowledge operations.
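          The guard can be sketched as a small state machine on an `AtomicReference`. Whichever of `run()` and `close()` first moves the state out of `RUNNING` wins, and the other call becomes a no-op. This is an illustration under assumptions, not the patch itself. `RUNNING` and `COMPLETED` mirror the names in the reviewed diff. `DISCARDED`, `GuardedAsyncCheckpoint`, `StateHandle` and `replay` are hypothetical. A plain list stands in for the CheckpointCoordinator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the guard described in the PR; not Flink's actual StreamTask code.
final class GuardedAsyncCheckpoint implements Runnable {

    // RUNNING and COMPLETED mirror the reviewed diff; DISCARDED is an assumed name for the closed state.
    enum AsyncState { RUNNING, COMPLETED, DISCARDED }

    // Stand-in for a persisted piece of checkpoint state.
    static final class StateHandle {
        boolean discarded = false;
    }

    private final AtomicReference<AsyncState> state = new AtomicReference<>(AsyncState.RUNNING);
    private final StateHandle handle;
    private final List<StateHandle> acknowledged; // stand-in for the CheckpointCoordinator

    GuardedAsyncCheckpoint(StateHandle handle, List<StateHandle> acknowledged) {
        this.handle = handle;
        this.acknowledged = acknowledged;
    }

    @Override
    public void run() {
        // Acknowledge only if close() has not won the race; a late run() becomes a no-op.
        if (state.compareAndSet(AsyncState.RUNNING, AsyncState.COMPLETED)) {
            acknowledged.add(handle);
        }
    }

    public void close() {
        // Discard only if the acknowledgement has not been sent; a late close() becomes a no-op.
        if (state.compareAndSet(AsyncState.RUNNING, AsyncState.DISCARDED)) {
            handle.discarded = true;
        }
    }

    // Replays one ordering and returns {number of acknowledgements, 1 if discarded else 0}.
    static int[] replay(boolean closeFirst) {
        List<StateHandle> acks = new ArrayList<>();
        StateHandle handle = new StateHandle();
        GuardedAsyncCheckpoint checkpoint = new GuardedAsyncCheckpoint(handle, acks);
        if (closeFirst) {
            checkpoint.close();
            checkpoint.run();
        } else {
            checkpoint.run();
            checkpoint.close();
        }
        return new int[] {acks.size(), handle.discarded ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] closeFirst = replay(true);  // 0 acknowledgements, state discarded
        int[] ackFirst = replay(false);   // 1 acknowledgement, state kept
        System.out.println("close first: acks=" + closeFirst[0] + ", discarded=" + (closeFirst[1] == 1));
        System.out.println("ack first: acks=" + ackFirst[0] + ", discarded=" + (ackFirst[1] == 1));
    }
}
```

          In both orderings exactly one side takes effect. The checkpoint is either acknowledged with intact state or discarded without an acknowledgement.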

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink asyncCheckpointingFix

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3226.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3226


          commit 16ce24f6d062ade2f3bb3d41059ced71b7040c52
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-01-27T15:26:22Z

          FLINK-5667 [state] Synchronize asynchronous checkpointing and close operation

          This PR synchronizes asynchronous checkpointing and close operations of a StreamTask.
          The synchronization prevents that an acknowledged checkpoint gets discarded and that
          a discarded checkpoint gets acknowledged. It achieves this by introducing an atomic
          state variable which guards against late close and acknowledge operations.


          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3227

          [backport] FLINK-5667 [state] Synchronize asynchronous checkpointing and close operation

          Backport of #3226 onto `release-1.2` branch.

          This PR synchronizes the asynchronous checkpointing and close operations of a StreamTask.
          The synchronization prevents an acknowledged checkpoint from being discarded and a
          discarded checkpoint from being acknowledged. It achieves this by introducing an atomic
          state variable that guards against late close and acknowledge operations.

          cc @uce, @StefanRRichter

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink asyncCheckpointingFixBackport

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3227.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3227


          commit d0a2499f10060f159ad92500338125850a82c0c4
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-01-27T15:26:22Z

          FLINK-5667 [state] Synchronize asynchronous checkpointing and close operation

          This PR synchronizes asynchronous checkpointing and close operations of a StreamTask.
          The synchronization prevents that an acknowledged checkpoint gets discarded and that
          a discarded checkpoint gets acknowledged. It achieves this by introducing an atomic
          state variable which guards against late close and acknowledge operations.


          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3226

          Thanks! +1 to merge this and #3227. Verified that tests catch both orderings.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3227#discussion_r98248743

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
          @@ -947,11 +951,17 @@ public void run() {
                 keyedStateHandleBackend,
                 keyedStateHandleStream);

          -      owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
          +      if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
          +          owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
          --- End diff ---

          I think there is still a problem here: if the RPC that acknowledges the checkpoint fails with an exception, the `cleanup()` call in the `catch` block will find the state already set to `COMPLETED` and will not perform any cleanup actions.
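          The failure path the reviewer describes can be reproduced with a hypothetical model of the reviewed logic. It is an illustration, not the code under review. `UnhardenedAck` and `stateDiscarded` are made-up names, and `acknowledgeCheckpoint()` always throws to simulate the failing RPC. Because the compare-and-set has already moved the state to `COMPLETED`, the guarded `cleanup()` never discards anything.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the reviewed (not yet hardened) logic; names are illustrative, not Flink's.
final class UnhardenedAck {

    enum AsyncState { RUNNING, COMPLETED, DISCARDED }

    private final AtomicReference<AsyncState> state = new AtomicReference<>(AsyncState.RUNNING);
    boolean stateDiscarded = false;

    // Stand-in for the acknowledgement RPC; in this scenario it always fails.
    private void acknowledgeCheckpoint() {
        throw new IllegalStateException("acknowledge RPC failed");
    }

    // Stand-in for cleanup(): discards the state only if it is still RUNNING.
    private void cleanup() {
        if (state.compareAndSet(AsyncState.RUNNING, AsyncState.DISCARDED)) {
            stateDiscarded = true;
        }
    }

    void run() {
        try {
            if (state.compareAndSet(AsyncState.RUNNING, AsyncState.COMPLETED)) {
                acknowledgeCheckpoint();
            }
        } catch (Exception e) {
            // The state is already COMPLETED, so this cleanup() does nothing and the state is never discarded.
            cleanup();
        }
    }

    AsyncState currentState() { return state.get(); }

    public static void main(String[] args) {
        UnhardenedAck task = new UnhardenedAck();
        task.run();
        // Prints "state=COMPLETED, discarded=false"
        System.out.println("state=" + task.currentState() + ", discarded=" + task.stateDiscarded);
    }
}
```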

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3227#discussion_r98268538

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
          @@ -947,11 +951,17 @@ public void run() {
                 keyedStateHandleBackend,
                 keyedStateHandleStream);

          -      owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
          +      if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
          +          owner.getEnvironment().acknowledgeCheckpoint(checkpointMetaData, subtaskState);
          --- End diff ---

          To harden this, I'll reset the state from `COMPLETED` back to `RUNNING` if the acknowledgement fails. Then the cleanup should work properly.
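          A minimal sketch of the hardening follows. It assumes the same hypothetical model as the reviewer's scenario and is not the merged patch. `HardenedAck`, `ackFails` and `stateDiscarded` are illustrative names. On failure, the `catch` block first moves `COMPLETED` back to `RUNNING`, so the guarded `cleanup()` can then take the `RUNNING` to `DISCARDED` transition. A successful acknowledgement is unaffected.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the hardening described in the review comment; names are illustrative, not Flink's.
final class HardenedAck {

    enum AsyncState { RUNNING, COMPLETED, DISCARDED }

    private final AtomicReference<AsyncState> state = new AtomicReference<>(AsyncState.RUNNING);
    private final boolean ackFails;
    boolean stateDiscarded = false;

    HardenedAck(boolean ackFails) {
        this.ackFails = ackFails;
    }

    // Stand-in for the acknowledgement RPC; fails when ackFails is true.
    private void acknowledgeCheckpoint() {
        if (ackFails) {
            throw new IllegalStateException("acknowledge RPC failed");
        }
    }

    // Discards the state only if it is still RUNNING.
    private void cleanup() {
        if (state.compareAndSet(AsyncState.RUNNING, AsyncState.DISCARDED)) {
            stateDiscarded = true;
        }
    }

    void run() {
        try {
            if (state.compareAndSet(AsyncState.RUNNING, AsyncState.COMPLETED)) {
                acknowledgeCheckpoint();
            }
        } catch (Exception e) {
            // Hardening: move COMPLETED back to RUNNING so that cleanup() can discard the state.
            state.compareAndSet(AsyncState.COMPLETED, AsyncState.RUNNING);
            cleanup();
        }
    }

    AsyncState currentState() { return state.get(); }

    public static void main(String[] args) {
        HardenedAck failing = new HardenedAck(true);
        failing.run();
        // Prints "failed ack: state=DISCARDED, discarded=true"
        System.out.println("failed ack: state=" + failing.currentState() + ", discarded=" + failing.stateDiscarded);

        HardenedAck succeeding = new HardenedAck(false);
        succeeding.run();
        // Prints "successful ack: state=COMPLETED, discarded=false"
        System.out.println("successful ack: state=" + succeeding.currentState() + ", discarded=" + succeeding.stateDiscarded);
    }
}
```

          Suppose a concurrent close() also discards only on a `RUNNING` to `DISCARDED` compare-and-set. After the reset, then, at most one of close() and cleanup() can discard the state.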

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3226

          Thanks for the review @uce. I will address @StefanRRichter's comment from #3227 and then merge the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3227

          Thanks for the review @StefanRRichter. I'll address your comment and then merge the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3226

          Thanks @tillrohrmann! Very good to have this fixed.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3226

          till.rohrmann Till Rohrmann added a comment -

          1.2.0: 3400a87dce9940ac8da95c89ff9791ca6a687776
          1.3.0: 0aa9db07800bcc3979dc89bba0b3697149b18ecd

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann closed the pull request at:

          https://github.com/apache/flink/pull/3227


            People

            • Assignee: Till Rohrmann
              Reporter: Till Rohrmann
            • Votes: 0
              Watchers: 4
