FLINK-6833: Race condition: Asynchronous checkpointing task can fail completed StreamTask

    Details

      Description

      A StreamTask that is about to finish, and is thus transitioning its containing Task into the ExecutionState.FINISHED state, can be failed by a concurrent asynchronous checkpointing operation. The problem is that upon termination the StreamTask cancels all concurrent operations (among others, ongoing asynchronous checkpoints). The cancellation of an async checkpoint triggers a StreamTask#handleAsyncException call, which fails the containing Task. If handleAsyncException completes before the StreamTask has been properly terminated, the containing Task transitions into ExecutionState.FAILED instead of ExecutionState.FINISHED.

      To resolve this race condition, StreamTask#handleAsyncException should check whether the StreamTask is still running or has already been terminated. Only in the former case should it fail the containing Task.
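The race and the proposed guard can be illustrated with a minimal, self-contained Java sketch. MiniTask, MiniStreamTask, markFinished, terminate and the `guarded` flag are hypothetical stand-ins, not Flink's API. The model assumes the StreamTask leaves its running state before it cancels outstanding asynchronous work, and it delivers the cancellation synchronously so that the failure deterministically arrives before the FINISHED transition.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical, heavily simplified model of the interplay described in
 * FLINK-6833. MiniTask and MiniStreamTask are illustrative stand-ins,
 * not Flink classes.
 */
public class GuardedAsyncExceptionSketch {

    enum ExecutionState { RUNNING, FINISHED, FAILED }

    /** Stand-in for the containing Task: the first terminal transition wins. */
    static final class MiniTask {
        final AtomicReference<ExecutionState> state =
                new AtomicReference<>(ExecutionState.RUNNING);

        void failExternally(Throwable cause) {
            state.compareAndSet(ExecutionState.RUNNING, ExecutionState.FAILED);
        }

        void markFinished() {
            state.compareAndSet(ExecutionState.RUNNING, ExecutionState.FINISHED);
        }
    }

    /** Stand-in for StreamTask; `guarded` toggles the proposed isRunning check. */
    static final class MiniStreamTask {
        private final MiniTask task;
        private final boolean guarded;
        private volatile boolean isRunning = true;

        MiniStreamTask(MiniTask task, boolean guarded) {
            this.task = task;
            this.guarded = guarded;
        }

        void handleAsyncException(String message, Throwable exception) {
            // The proposed fix: only fail the containing Task while still running.
            if (!guarded || isRunning) {
                task.failExternally(exception);
            }
        }

        /** Termination path (assumed order): leave running state, cancel async work, finish. */
        void terminate() {
            isRunning = false;
            // Cancelling the in-flight async checkpoint surfaces as an async
            // exception; it is delivered synchronously here to force the bad
            // ordering (failure reported before the FINISHED transition).
            handleAsyncException("async checkpoint cancelled",
                    new CancellationException("checkpoint cancelled"));
            task.markFinished();
        }
    }

    static ExecutionState finalState(boolean guarded) {
        MiniTask task = new MiniTask();
        new MiniStreamTask(task, guarded).terminate();
        return task.state.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + finalState(false)); // FAILED
        System.out.println("guarded:   " + finalState(true));  // FINISHED
    }
}
```

Without the guard the cancelled checkpoint turns a clean shutdown into FAILED; with it, the late async exception is ignored and the Task reaches FINISHED.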

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/4058

          FLINK-6833 [task] Fail StreamTask only due to async exception if it is running

          In order to resolve a race condition between a properly terminated StreamTask which
          cleans up its resources (stopping asynchronous operations, etc.) and a cancelled
          asynchronous operation (e.g. asynchronous checkpointing operation), we check whether
          the StreamTask is still running before failing it externally.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink fixStreamTaskRaceCondition

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4058.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4058


          commit 3d119a115e707e80b4b14174edd7de5048b732e9
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-06-02T13:48:54Z

          FLINK-6833 [task] Fail StreamTask only due to async exception if it is running

          In order to resolve a race condition between a properly terminated StreamTask which
          cleans up its resources (stopping asynchronous operations, etc.) and a cancelled
          asynchronous operation (e.g. asynchronous checkpointing operation), we check whether
          the StreamTask is still running before failing it externally.


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/4058

          The test will not pass checkstyle. You can run `mvn checkstyle:check` on the command-line to quickly verify checkstyle compliance.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4058#discussion_r120291185

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
          @@ -824,11 +824,14 @@ public ProcessingTimeService getProcessingTimeService() {
                * FAILED, and, if the invokable code is running, starts an asynchronous thread
                * that aborts that code.
                *
          -     * <p>This method never blocks.</p>
          +     * <p>This method never blocks.
                */
               @Override
               public void handleAsyncException(String message, Throwable exception) {
          -         getEnvironment().failExternally(exception);
          +         if (isRunning) {
          --- End diff ---

          I doubt that this is a working fix; it only makes the problem less likely to occur. This method can be called asynchronously with respect to the threads that manipulate `isRunning`, which means that the stream task can leave the running state after the condition was checked as true but before `failExternally(...)` went through.
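The check-then-act window described in this review comment can be forced deterministically in a small Java sketch. It is a hypothetical model, not Flink code: an AtomicBoolean stands in for `isRunning`, an AtomicReference for the Task's execution state, and two CountDownLatches pin the interleaving in which the async thread passes the `isRunning` check, the task thread then leaves the running state, and the failure still lands before the FINISHED transition.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical model of the check-then-act window raised in review.
 * Latches force the interleaving so the outcome is deterministic.
 */
public class CheckThenActWindowSketch {

    enum ExecutionState { RUNNING, FINISHED, FAILED }

    static ExecutionState runForcedInterleaving() {
        AtomicReference<ExecutionState> taskState =
                new AtomicReference<>(ExecutionState.RUNNING);
        AtomicBoolean isRunning = new AtomicBoolean(true);
        CountDownLatch passedCheck = new CountDownLatch(1);
        CountDownLatch leftRunning = new CountDownLatch(1);

        // Async thread: guarded handleAsyncException split into check and act.
        Thread asyncThread = new Thread(() -> {
            if (isRunning.get()) {                   // check: still running
                passedCheck.countDown();
                try {
                    leftRunning.await();             // task thread proceeds meanwhile
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                // act: the StreamTask is no longer running, yet the Task is failed
                taskState.compareAndSet(ExecutionState.RUNNING, ExecutionState.FAILED);
            }
        });
        asyncThread.start();

        // Task thread: terminate after the async thread has passed its check.
        try {
            passedCheck.await();
            isRunning.set(false);                    // leave the running state
            leftRunning.countDown();
            asyncThread.join();                      // let the failure land first
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        taskState.compareAndSet(ExecutionState.RUNNING, ExecutionState.FINISHED);
        return taskState.get();
    }

    public static void main(String[] args) {
        System.out.println(runForcedInterleaving()); // FAILED
    }
}
```

Without the latches this ordering is possible but rare, which matches the observation that the guard narrows the window rather than closing it.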

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4058#discussion_r120757786

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
          @@ -824,11 +824,14 @@ public ProcessingTimeService getProcessingTimeService() {
                * FAILED, and, if the invokable code is running, starts an asynchronous thread
                * that aborts that code.
                *
          -     * <p>This method never blocks.</p>
          +     * <p>This method never blocks.
                */
               @Override
               public void handleAsyncException(String message, Throwable exception) {
          -         getEnvironment().failExternally(exception);
          +         if (isRunning) {
          --- End diff ---

          If `isRunning == true` when the if branch is entered, then, depending on what happens before `failExternally`, we can assume that `handleAsyncException` either happened atomically before `isRunning` was set to `false` or not. What we don't want is that the task can still be failed once `isRunning == false`. Thus, I think it solves a valid problem.
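This argument treats a handler that passed the check as if it had run entirely before `isRunning` was cleared. A hypothetical variant in which that atomicity holds literally is sketched below in Java: the check and the failure run under the same lock that the termination path holds while it clears `isRunning` and transitions to FINISHED. The lock object and MiniStreamTask are invented for illustration; this is an alternative under simplified assumptions, not the change merged in PR #4058.

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical alternative (not the merged change): check-and-fail and
 * clear-and-finish share one lock, so no failure can land in between.
 */
public class LockedCheckAndFailSketch {

    enum ExecutionState { RUNNING, FINISHED, FAILED }

    static final class MiniStreamTask {
        private final Object lock = new Object();
        private boolean isRunning = true; // guarded by lock
        private final AtomicReference<ExecutionState> taskState =
                new AtomicReference<>(ExecutionState.RUNNING);

        void handleAsyncException(String message, Throwable exception) {
            synchronized (lock) {
                // Check and act form one atomic step with respect to terminate().
                if (isRunning) {
                    taskState.compareAndSet(ExecutionState.RUNNING, ExecutionState.FAILED);
                }
            }
        }

        void terminate() {
            synchronized (lock) {
                // Clearing the flag and finishing happen together under the lock.
                isRunning = false;
                taskState.compareAndSet(ExecutionState.RUNNING, ExecutionState.FINISHED);
            }
        }

        ExecutionState state() {
            return taskState.get();
        }
    }

    public static void main(String[] args) {
        // Async failure after termination: ignored.
        MiniStreamTask late = new MiniStreamTask();
        late.terminate();
        late.handleAsyncException("late", new RuntimeException("cancelled"));
        System.out.println(late.state()); // FINISHED

        // Async failure before termination: the Task fails.
        MiniStreamTask early = new MiniStreamTask();
        early.handleAsyncException("early", new RuntimeException("boom"));
        early.terminate();
        System.out.println(early.state()); // FAILED
    }
}
```

In this model a late async failure is ignored and an early one still fails the Task; the trade-off is that the async thread may block on the lock while termination is in progress.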

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/4058

          I would like to merge this PR if I have clarified all concerns and there are no further objections.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/4058

          LGTM +1

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/4058

          Thanks for the reviews, @zentol and @StefanRRichter. Merging this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/4058

          till.rohrmann Till Rohrmann added a comment -

          1.4.0: ab2fc023047662b650af6e6625196bd72cbb30a0
          1.3.1: a3103c2ddd6b8a7566b8fd27f3acd695b4b345c7


            People

              Assignee: Till Rohrmann
              Reporter: Till Rohrmann
              Votes: 0
              Watchers: 4

              Dates

              • Created:
                Updated:
                Resolved:

                Development