Flink / FLINK-6341

JobManager can go into an infinite message-sending loop when a TaskManager registers

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: JobManager
    • Labels:
      None

      Description

When a TaskManager registers with the JobManager, the JobManager sends a "NotifyResourceStarted" message to kick off the ResourceManager, and then triggers a reconnection to the ResourceManager by sending a "TriggerRegistrationAtJobManager" message.

      When the JobManager's reference to the ResourceManager is not None and the reconnection targets the same ResourceManager, the JobManager enters an infinite message-sending loop, sending itself a "ReconnectResourceManager" message every 2 seconds.

      We have already observed this phenomenon. For more details, check how the JobManager handles `ReconnectResourceManager`.
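The loop shape, condensed from the diff quoted in the PR review below, looks roughly like this (a simplified Scala/Akka sketch, not the exact Flink source):

```scala
// Simplified sketch of the JobManager's ReconnectResourceManager handling.
case msg: ReconnectResourceManager =>
  // Ask the resource manager to register at this JobManager again.
  msg.resourceManager() ! decorateMessage(new TriggerRegistrationAtJobManager(self))
  // Retry after some delay -- but this fires unconditionally, so once the
  // resource manager is already connected, the JobManager keeps sending
  // itself the same ReconnectResourceManager message every 2 seconds.
  context.system.scheduler.scheduleOnce(2 seconds) {
    self ! decorateMessage(msg)
  }
```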

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user WangTaoTheTonic opened a pull request:

          https://github.com/apache/flink/pull/3745

[FLINK-6341] Don't let JM fall into infinite loop

When a TaskManager registers with the JobManager, the JobManager sends a "NotifyResourceStarted" message to kick off the ResourceManager, and then triggers a reconnection to the ResourceManager by sending a "TriggerRegistrationAtJobManager" message.
          When the JobManager's reference to the ResourceManager is not None and the reconnection targets the same ResourceManager, the JobManager enters an infinite message-sending loop, sending itself a "ReconnectResourceManager" message every 2 seconds.
          We have already observed this phenomenon. For more details, check how the JobManager handles `ReconnectResourceManager`.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/WangTaoTheTonic/flink FLINK-6341

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3745.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3745


          commit 8eb4dd42a71d9830c91b3db824e3133ce3d35c08
          Author: WangTaoTheTonic <wangtao111@huawei.com>
          Date: 2017-04-20T12:28:10Z

          Don't let JM fall into infinite loop


          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3745

Thanks for spotting the problem @WangTaoTheTonic. This is important to fix.

          I think we cannot solve the problem the way you've done it, because you access actor state from outside of the actor's main thread. It would be better to introduce a resource manager connection id which we can use to distinguish whether we have to reconnect to a resource manager or whether it is an outdated `ReconnectResourceManager` message. We can then do the check in the handler for the `ReconnectResourceManager` message.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3745#discussion_r112456827

— Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala —
          @@ -356,7 +356,12 @@ class JobManager(
                msg.resourceManager() ! decorateMessage(new TriggerRegistrationAtJobManager(self))
                // try again after some delay
                context.system.scheduler.scheduleOnce(2 seconds) {
          -       self ! decorateMessage(msg)
          +       currentResourceManager match {
          — End diff —

          This access is problematic, because we're accessing actor state from within another thread.
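The objection can be illustrated with a stripped-down example (field names hypothetical): the body of `scheduleOnce` runs on a scheduler thread, not on the actor's dispatcher, so reading a mutable actor field from it races with the actor's own updates:

```scala
// Hypothetical illustration: mutable actor state, written only from
// inside the actor's message handler.
var currentResourceManager: Option[ActorRef] = None

context.system.scheduler.scheduleOnce(2 seconds) {
  // This closure runs on a scheduler thread, NOT inside the actor.
  // The field is not volatile, so this read may observe a stale value,
  // and touching actor state from outside breaks the single-threaded
  // encapsulation that actors rely on for correctness.
  currentResourceManager match {
    case Some(_) => // already reconnected -- do nothing
    case None    => self ! decorateMessage(msg)
  }
}
```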

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3745

@tillrohrmann What problem will it bring if we access `currentResourceManager` from another thread? It is a variable in JobManager and can be shared across multiple threads, right? The newly added code just reads it, and I think there is no concurrency problem.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3745

It is a problem because `currentResourceManager` is not volatile. Thus, you might miss state changes in the future callback because of a cached value. Moreover, accessing mutable actor state from a different thread breaks the actor encapsulation and can lead to subtle race conditions and other synchronization bugs.
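A sketch of the connection-id approach suggested above (names are illustrative, not Flink's actual identifiers): the id travels inside the message, so the staleness check happens in the handler on the actor's own thread, and no actor state is read from the scheduler thread:

```scala
// Illustrative sketch only -- field and message names are hypothetical.
var resourceManagerConnectionId: Long = 0L  // bumped on every (re)connection

def handleReconnect: Receive = {
  case msg @ ReconnectResourceManager(resourceManager, connectionId) =>
    // Safe: we are on the actor's thread here.
    if (connectionId == resourceManagerConnectionId) {
      resourceManager ! decorateMessage(new TriggerRegistrationAtJobManager(self))
      // Re-sending the same immutable message from the scheduler thread is
      // fine; the id check above stops the loop as soon as a newer
      // connection has been established.
      context.system.scheduler.scheduleOnce(2 seconds) {
        self ! decorateMessage(msg)
      }
    } // otherwise: outdated message from an old connection -- drop it
}
```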

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3745

I see. The connection ID is added; please check if it's OK.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3745

          Thanks for your contribution @WangTaoTheTonic. Changes look good to me. Merging your PR.

          till.rohrmann Till Rohrmann added a comment -

          Fixed via 238383926b762c1d47159a2b4dabe8fd59777307

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3745


            People

            • Assignee:
              WangTao Tao Wang
              Reporter:
              WangTao Tao Wang
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development