
Interrupt TaskThread only while in User/Operator code

Details

    Description

      Upon cancellation, the task thread is periodically interrupted.
      This helps to pull the thread out of blocking operations in the user code.

      Once the thread leaves the user code, the repeated interrupts may interfere with the shutdown cleanup logic, causing confusing exceptions.

      We should stop sending the periodic interrupts once the thread leaves the user code.

          Activity

            githubbot ASF GitHub Bot added a comment -

            GitHub user StephanEwen opened a pull request:

            https://github.com/apache/flink/pull/6275

            FLINK-9776 [runtime] Stop sending periodic interrupts once executing thread leaves user function / operator code.

            What is the purpose of the change

            Upon cancellation, the task thread is periodically interrupted. This helps to pull the thread out of blocking operations in the user code.

            However, once the thread leaves the user code, the repeated interrupts may interfere with the shutdown cleanup logic, causing confusing exceptions.

            This PR changes the behavior to stop sending the periodic interrupts once the thread leaves the user code.

            Brief change log
            • `AbstractInvokable` maintains a flag indicating whether interrupts should be sent.
            • `StreamTask` clears that flag once it comes out of the user code, so that it no longer receives the periodic interrupts.
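
            The two changes above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the merged Flink code: the class `InterruptFlagSketch`, its nested `Invokable` stand-in, the setter name `setShouldInterruptOnCancel`, and the helper `interruptPeriodically` are assumptions made for the example. Only the loop condition `task.shouldInterruptOnCancel() && executerThread.isAlive()` is taken from the diff discussed in this thread.

```java
import java.util.concurrent.CountDownLatch;

final class InterruptFlagSketch {

    /** Hypothetical stand-in for the invokable that owns the flag. */
    static final class Invokable {
        // volatile: written by the task thread, read by the canceller thread
        private volatile boolean shouldInterruptOnCancel = true;

        void setShouldInterruptOnCancel(boolean value) {
            shouldInterruptOnCancel = value;
        }

        boolean shouldInterruptOnCancel() {
            return shouldInterruptOnCancel;
        }
    }

    /**
     * Interrupts the executor repeatedly while it is alive and the flag is set.
     * Returns the number of interrupts that were sent.
     */
    static int interruptPeriodically(Invokable task, Thread executor, long intervalMillis) {
        int sent = 0;
        while (task.shouldInterruptOnCancel() && executor.isAlive()) {
            executor.interrupt();
            sent++;
            try {
                // wait for one interval, or less if the thread exits earlier
                executor.join(intervalMillis);
            } catch (InterruptedException e) {
                // the canceller itself was interrupted: restore the status and stop
                Thread.currentThread().interrupt();
                break;
            }
        }
        return sent;
    }

    public static void main(String[] args) throws Exception {
        Invokable task = new Invokable();
        CountDownLatch leftUserCode = new CountDownLatch(1);
        CountDownLatch cleanupMayFinish = new CountDownLatch(1);

        Thread executor = new Thread(() -> {
            // ... user / operator code would run here ...
            task.setShouldInterruptOnCancel(false); // leaving the user code
            leftUserCode.countDown();
            try {
                cleanupMayFinish.await(); // stands in for shutdown cleanup
            } catch (InterruptedException e) {
                System.out.println("cleanup was interrupted");
            }
        });
        executor.start();

        leftUserCode.await();
        int sent = interruptPeriodically(task, executor, 50);
        System.out.println("interrupts sent during cleanup: " + sent);

        cleanupMayFinish.countDown();
        executor.join();
    }
}
```

            In this sketch the canceller stops as soon as it observes the cleared flag, so the simulated cleanup phase is not disturbed. The check and the interrupt are still two separate steps, so an interrupt can slip through if the flag is cleared between them.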
            Verifying this change

            This change is a trivial rework that currently only avoids throwing and catching of InterruptedExceptions that may cause noise in the logs.

            Does this pull request potentially affect one of the following parts:
            • Dependencies (does it add or upgrade a dependency): *no*
            • The public API, i.e., is any changed class annotated with `@Public(Evolving)`: *no*
            • The serializers: *no*
            • The runtime per-record code paths (performance sensitive): *no*
            • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: *no*
            • The S3 file system connector: *no*
            Documentation
            • Does this pull request introduce a new feature? *no*
            • If yes, how is the feature documented? *not applicable*

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/StephanEwen/incubator-flink stop_interrupts

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/flink/pull/6275.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #6275


            commit 73d31551574f3c18e4cbc079681ed93f9ec2ef34
            Author: Stephan Ewen <sewen@...>
            Date: 2018-07-06T11:34:27Z

            FLINK-9776 [runtime] Stop sending periodic interrupts once executing thread leaves user function / operator code.


            githubbot ASF GitHub Bot added a comment -

            Github user StefanRRichter commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6275#discussion_r200737014

            --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
            @@ -1563,7 +1573,7 @@ public void run() {

                 // log stack trace where the executing thread is stuck and
                 // interrupt the running thread periodically while it is still alive
            -    while (executerThread.isAlive()) {
            +    while (task.shouldInterruptOnCancel() && executerThread.isAlive()) {
            --- End diff --

            I think that an atomic boolean might be required. This check can pass, then we get interrupted, meanwhile the stream task might already go into the shutdown code and the interrupt might slip through?

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6275#discussion_r200814568

            --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
            @@ -1563,7 +1573,7 @@ public void run() {

                 // log stack trace where the executing thread is stuck and
                 // interrupt the running thread periodically while it is still alive
            -    while (executerThread.isAlive()) {
            +    while (task.shouldInterruptOnCancel() && executerThread.isAlive()) {
            --- End diff --

            True, this is no 100% guarantee that interrupts do not come. That would need an atomic "interrupt if flag is set call", but I don't know if that is possible in Java without introducing a locked code block, which I wanted to avoid.

            It may also not be necessary. I think the variant here is already strictly better than the current state, which is correct already. The current state mainly suffers from shutdowns "looking rough" due to interruptions.

            This change should fix the majority of that, because in the vast majority of shutdowns, the thread exits before the first of the "repeated interrupts". The thread only experiences the initial interrupt.

            In some sense, only clearing the initial interrupt flag would probably help > 90% of the cases already. This solves a few more % of the cases by guarding the repeated interrupts.
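
            For comparison, the locked alternative mentioned above (an atomic "interrupt if flag is set" call) could look roughly like the sketch below. This illustrates the approach the PR deliberately avoided, not what was merged. The class `GuardedInterruptSketch` and the methods `interruptIfAllowed` and `leaveUserCode` are hypothetical names invented for the example.

```java
final class GuardedInterruptSketch {

    private final Object lock = new Object();
    private boolean interruptsAllowed = true; // guarded by lock

    /**
     * Called by the canceller. The check and the interrupt happen under one lock,
     * so no interrupt can be sent after leaveUserCode() has returned.
     */
    boolean interruptIfAllowed(Thread target) {
        synchronized (lock) {
            if (!interruptsAllowed) {
                return false;
            }
            target.interrupt();
            return true;
        }
    }

    /**
     * Must be called by the task thread itself when it leaves the user code.
     * Disables further interrupts, then clears any interrupt already delivered.
     */
    void leaveUserCode() {
        synchronized (lock) {
            interruptsAllowed = false;
        }
        Thread.interrupted(); // clears the calling thread's interrupt status
    }

    public static void main(String[] args) {
        GuardedInterruptSketch guard = new GuardedInterruptSketch();
        Thread self = Thread.currentThread();

        System.out.println(guard.interruptIfAllowed(self)); // true: still in "user code"
        guard.leaveUserCode();
        System.out.println(self.isInterrupted());           // false: pending interrupt cleared
        System.out.println(guard.interruptIfAllowed(self)); // false: interrupts are disabled
    }
}
```

            The cost of this variant is a lock acquisition on both the cancellation path and the task thread's exit path, which is the locked code block the comment above wanted to avoid.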

            githubbot ASF GitHub Bot added a comment -

            Github user StefanRRichter commented on a diff in the pull request:

            https://github.com/apache/flink/pull/6275#discussion_r200911028

            --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
            @@ -1563,7 +1573,7 @@ public void run() {

                 // log stack trace where the executing thread is stuck and
                 // interrupt the running thread periodically while it is still alive
            -    while (executerThread.isAlive()) {
            +    while (task.shouldInterruptOnCancel() && executerThread.isAlive()) {
            --- End diff --

            Ok, if the intention is improvement and not 100% certainty, then this is perfectly ok.

            githubbot ASF GitHub Bot added a comment -

            Github user StefanRRichter commented on the issue:

            https://github.com/apache/flink/pull/6275

            LGTM 👍

            githubbot ASF GitHub Bot added a comment -

            Github user StephanEwen commented on the issue:

            https://github.com/apache/flink/pull/6275

            All right, thanks, merging!

            githubbot ASF GitHub Bot added a comment -

            Github user asfgit closed the pull request at:

            https://github.com/apache/flink/pull/6275

            sunjincheng121 sunjincheng added a comment -

            The PR has been merged.

            rmetzger Robert Metzger added a comment -

            This change has actually been merged to Flink 1.6.0 (as you can see from the commit) https://github.com/apache/flink/commit/53e6657658bc750b78c32e91fa7e2c02e8c54e33

            The fixversion has probably been pushed forward because the ticket was still open.

            I'm adjusting the fix version from 1.6.4 to 1.6.0

            rmetzger Robert Metzger added a comment -

            There is no 1.6.0 fix version available. Assigning it to 1.6.2 to indicate that the change went in early in the 1.6 releases.


            People

              sewen Stephan Ewen