Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-5904

jobmanager.heap.mb and taskmanager.heap.mb not work in YARN mode

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: YARN
    • Labels:
      None

      Description

      I set taskmanager.heap.mb to 5120 and jobmanager.heap.mb to 2048, and run ./yarn-session.sh -n 3, but the YARN only allocates 4GB for this application.

        Issue Links

          Activity

          Hide
          WangTao Tao Wang added a comment -

          I'll locate the reason and fix it ASAP.

          Show
          WangTao Tao Wang added a comment - I'll locate the reason and fix it ASAP.
          Hide
          StephanEwen Stephan Ewen added a comment -

          This behaves differently in Yarn: The heap sizes are computed via container size plus cutoff.

          If you want to change this, please check with the rest of the community if a change in behavior there is desirable.

          Show
          StephanEwen Stephan Ewen added a comment - This behaves differently in Yarn: The heap sizes are computed via container size plus cutoff. If you want to change this, please check with the rest of the community if a change in behavior there is desirable.
          Hide
          WangTao Tao Wang added a comment -

          Stephan Ewen I'm making these two configuration items same with "-yjm""-ytm" in yarn session and "-jm""-tm" in single job. Looks like it might get some misunderstanding.

          We don't have a proper config item here. What's better idea do you suggest?

          Show
          WangTao Tao Wang added a comment - Stephan Ewen I'm making these two configuration items same with "-yjm""-ytm" in yarn session and "-jm""-tm" in single job. Looks like it might get some misunderstanding. We don't have a proper config item here. What's better idea do you suggest?
          Hide
          githubbot ASF GitHub Bot added a comment -

          GitHub user WangTaoTheTonic opened a pull request:

          https://github.com/apache/flink/pull/3414

          FLINK-5904[YARN]make jobmanager.heap.mb and taskmanager.heap.mb work in YARN mode

          I'm making these two configuration items same with "-yjm""-ytm" in yarn session and "-jm""-tm" in single job. Looks like it might get some misunderstanding.

          We don't have a proper config item here. What's better idea do you suggest?

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/WangTaoTheTonic/flink FLINK-5904

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3414.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3414


          commit b21d27ea99ffa613a8eb9c00a4f977b619b12418
          Author: WangTaoTheTonic <wangtao111@huawei.com>
          Date: 2017-02-25T04:19:43Z

          make jobmanager.heap.mb and taskmanager.heap.mb work in YARN mode


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user WangTaoTheTonic opened a pull request: https://github.com/apache/flink/pull/3414 FLINK-5904 [YARN] make jobmanager.heap.mb and taskmanager.heap.mb work in YARN mode I'm making these two configuration items same with "-yjm""-ytm" in yarn session and "-jm""-tm" in single job. Looks like it might get some misunderstanding. We don't have a proper config item here. What's better idea do you suggest? You can merge this pull request into a Git repository by running: $ git pull https://github.com/WangTaoTheTonic/flink FLINK-5904 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3414.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3414 commit b21d27ea99ffa613a8eb9c00a4f977b619b12418 Author: WangTaoTheTonic <wangtao111@huawei.com> Date: 2017-02-25T04:19:43Z make jobmanager.heap.mb and taskmanager.heap.mb work in YARN mode
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3414#discussion_r104174086

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -306,12 +308,16 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
          if (cmd.hasOption(JM_MEMORY.getOpt()))

          { int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); + }

          else if (config.containsKey(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY)) {
          + yarnClusterDescriptor.setJobManagerMemory(config.getInteger(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_HEAP_MEMORY));
          — End diff –

          I'm wondering whether the `yarnClusterDescriptor` should not have been automatically instantiated with this value if it gets the configuration as a constructor parameter.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3414#discussion_r104174086 — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java — @@ -306,12 +308,16 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN if (cmd.hasOption(JM_MEMORY.getOpt())) { int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); + } else if (config.containsKey(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY)) { + yarnClusterDescriptor.setJobManagerMemory(config.getInteger(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_HEAP_MEMORY)); — End diff – I'm wondering whether the `yarnClusterDescriptor` should not have been automatically instantiated with this value if it gets the configuration as a constructor parameter.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3414#discussion_r104174440

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java —
          @@ -110,7 +110,12 @@
          public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";

          // -------------------------------- Runtime -------------------------------

          • +
            + /**
            + * JVM heap size (in megabytes) for the JobManager
            + */
            + public static final String JOB_MANAGER_HEAP_MEMORY_KEY = "jobmanager.heap.mb";

              • End diff –

          Can we introduce these values as a `ConfigOption` instead of an entry to `ConfigConstants`? I think it might go to `TaskManagerOptions` and to `JobManagerOptions`.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3414#discussion_r104174440 — Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java — @@ -110,7 +110,12 @@ public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay"; // -------------------------------- Runtime ------------------------------- + + /** + * JVM heap size (in megabytes) for the JobManager + */ + public static final String JOB_MANAGER_HEAP_MEMORY_KEY = "jobmanager.heap.mb"; End diff – Can we introduce these values as a `ConfigOption` instead of an entry to `ConfigConstants`? I think it might go to `TaskManagerOptions` and to `JobManagerOptions`.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3414#discussion_r104853211

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -306,12 +308,16 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
          if (cmd.hasOption(JM_MEMORY.getOpt()))

          { int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); + }

          else if (config.containsKey(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY)) {
          + yarnClusterDescriptor.setJobManagerMemory(config.getInteger(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_HEAP_MEMORY));
          — End diff –

          do you mean we should only create `jobManagerMemoryMb` object but not init it with value?

          Show
          githubbot ASF GitHub Bot added a comment - Github user WangTaoTheTonic commented on a diff in the pull request: https://github.com/apache/flink/pull/3414#discussion_r104853211 — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java — @@ -306,12 +308,16 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN if (cmd.hasOption(JM_MEMORY.getOpt())) { int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); + } else if (config.containsKey(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY)) { + yarnClusterDescriptor.setJobManagerMemory(config.getInteger(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_HEAP_MEMORY)); — End diff – do you mean we should only create `jobManagerMemoryMb` object but not init it with value?
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3414#discussion_r104854352

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java —
          @@ -110,7 +110,12 @@
          public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay";

          // -------------------------------- Runtime -------------------------------

          • +
            + /**
            + * JVM heap size (in megabytes) for the JobManager
            + */
            + public static final String JOB_MANAGER_HEAP_MEMORY_KEY = "jobmanager.heap.mb";

              • End diff –

          Nice. I've changed

          Show
          githubbot ASF GitHub Bot added a comment - Github user WangTaoTheTonic commented on a diff in the pull request: https://github.com/apache/flink/pull/3414#discussion_r104854352 — Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java — @@ -110,7 +110,12 @@ public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay"; // -------------------------------- Runtime ------------------------------- + + /** + * JVM heap size (in megabytes) for the JobManager + */ + public static final String JOB_MANAGER_HEAP_MEMORY_KEY = "jobmanager.heap.mb"; End diff – Nice. I've changed
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3414#discussion_r104862218

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -306,12 +308,16 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
          if (cmd.hasOption(JM_MEMORY.getOpt()))

          { int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); + }

          else if (config.containsKey(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY)) {
          + yarnClusterDescriptor.setJobManagerMemory(config.getInteger(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_HEAP_MEMORY));
          — End diff –

          no, but initialize the value in the constructor of the `yarnClusterDescriptor`.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3414#discussion_r104862218 — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java — @@ -306,12 +308,16 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN if (cmd.hasOption(JM_MEMORY.getOpt())) { int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); + } else if (config.containsKey(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY)) { + yarnClusterDescriptor.setJobManagerMemory(config.getInteger(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_HEAP_MEMORY)); — End diff – no, but initialize the value in the constructor of the `yarnClusterDescriptor`.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3414#discussion_r104862993

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -306,12 +308,16 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
          if (cmd.hasOption(JM_MEMORY.getOpt()))

          { int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); + }

          else if (config.containsKey(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY)) {
          + yarnClusterDescriptor.setJobManagerMemory(config.getInteger(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_HEAP_MEMORY));
          — End diff –

          That's a good idea. we don't need to set it explicitly here if we init it in constructor

          Show
          githubbot ASF GitHub Bot added a comment - Github user WangTaoTheTonic commented on a diff in the pull request: https://github.com/apache/flink/pull/3414#discussion_r104862993 — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java — @@ -306,12 +308,16 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN if (cmd.hasOption(JM_MEMORY.getOpt())) { int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); yarnClusterDescriptor.setJobManagerMemory(jmMemory); + } else if (config.containsKey(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY)) { + yarnClusterDescriptor.setJobManagerMemory(config.getInteger(ConfigConstants.JOB_MANAGER_HEAP_MEMORY_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_HEAP_MEMORY)); — End diff – That's a good idea. we don't need to set it explicitly here if we init it in constructor
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3414

          @tillrohrmann I've changed per comments. Mind reviewing again? Thanks

          Show
          githubbot ASF GitHub Bot added a comment - Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3414 @tillrohrmann I've changed per comments. Mind reviewing again? Thanks
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3414

          Thanks for your contribution @WangTaoTheTonic. LGTM. Merging the PR.

          Show
          githubbot ASF GitHub Bot added a comment - Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3414 Thanks for your contribution @WangTaoTheTonic. LGTM. Merging the PR.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3414

          Thanks. I've resolved conflicts. enjoy

          Show
          githubbot ASF GitHub Bot added a comment - Github user WangTaoTheTonic commented on the issue: https://github.com/apache/flink/pull/3414 Thanks. I've resolved conflicts. enjoy
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3414

          Show
          githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3414
          Hide
          till.rohrmann Till Rohrmann added a comment -

          Fixed via 0da62e8d77dceca6f39e70fb6a313a8169364de0

          Show
          till.rohrmann Till Rohrmann added a comment - Fixed via 0da62e8d77dceca6f39e70fb6a313a8169364de0

            People

            • Assignee:
              WangTao Tao Wang
              Reporter:
              WangTao Tao Wang
            • Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development