Flink / FLINK-5839 Flink Security problem collection / FLINK-5903

taskmanager.numberOfTaskSlots and yarn.containers.vcores did not work well in YARN mode

    Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: YARN
    • Labels:
      None

      Description

      Currently Flink does not respect taskmanager.numberOfTaskSlots and yarn.containers.vcores in flink-conf.yaml; it only respects the -s parameter of the CLI.

      In detail: taskmanager.numberOfTaskSlots has no effect at all, and yarn.containers.vcores is used only when requesting container (TM) resources but is not passed on to the TM, so the TM always assumes it has the default number of slots (1) if -s is not configured.

          Activity

          WangTao Tao Wang added a comment -

          I've located the cause and will fix it ASAP.

          githubbot ASF GitHub Bot added a comment -

          GitHub user WangTaoTheTonic opened a pull request:

          https://github.com/apache/flink/pull/3408

          FLINK-5903 [YARN] respect taskmanager.numberOfTaskSlots and yarn.containers.vcores in YARN mode

          Make sure taskmanager.numberOfTaskSlots and yarn.containers.vcores work in YARN mode. The priority order is: -s/-ys > yarn.containers.vcores > taskmanager.numberOfTaskSlots.
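
The precedence proposed in this pull request description can be sketched roughly as follows. This is only an illustration of the stated order, not the code from the patch: the class name, the method name, the plain `Map` standing in for the Flink configuration, and the final fallback of 1 (taken from the issue description's "default(1)") are all assumptions.

```java
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical sketch of the proposed order: -s/-ys > yarn.containers.vcores > taskmanager.numberOfTaskSlots. */
final class ProposedSlotPrecedence {
    static final String YARN_VCORES = "yarn.containers.vcores";
    static final String NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots";

    /** cliSlots models the -s/-ys option; conf models the entries of flink-conf.yaml. */
    static int resolveSlots(OptionalInt cliSlots, Map<String, String> conf) {
        if (cliSlots.isPresent()) {
            return cliSlots.getAsInt();            // command line always wins
        }
        if (conf.containsKey(YARN_VCORES)) {
            return Integer.parseInt(conf.get(YARN_VCORES).trim());
        }
        if (conf.containsKey(NUM_TASK_SLOTS)) {
            return Integer.parseInt(conf.get(NUM_TASK_SLOTS).trim());
        }
        return 1;                                  // assumed default when nothing is set
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of(YARN_VCORES, "4", NUM_TASK_SLOTS, "2");
        System.out.println(resolveSlots(OptionalInt.empty(), conf));
        System.out.println(resolveSlots(OptionalInt.of(8), conf));
    }
}
```

Under this order a configured `yarn.containers.vcores` shadows `taskmanager.numberOfTaskSlots` whenever both are present in the file.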

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/WangTaoTheTonic/flink FLINK-5903

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3408.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3408


          commit 3515f9faf4b40bff7310a55b7094b52999525cbb
          Author: WangTaoTheTonic <wangtao111@huawei.com>
          Date: 2017-02-24T08:11:47Z

          xxx


          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104124552

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -317,6 +319,10 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
            if (cmd.hasOption(SLOTS.getOpt())) {
                int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
                yarnClusterDescriptor.setTaskManagerSlots(slots);
          + } else if (config.containsKey(ConfigConstants.YARN_VCORES)) {
          — End diff –

          I think the preference order should be `TASK_MANAGER_NUM_TASK_SLOTS` > `YARN_VCORES` for the slots.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104125523

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -537,7 +543,6 @@ public YarnClusterClient createCluster(
          Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");

          AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);

          - yarnClusterDescriptor.setFlinkConfiguration(config);
          — End diff –

          You should give this configuration to the `createDescriptor` method where it is used to instantiate the `YarnClusterDescriptor`.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104125371

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -537,7 +543,6 @@ public YarnClusterClient createCluster(
          Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");

          AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);

          - yarnClusterDescriptor.setFlinkConfiguration(config);
          — End diff –

          You cannot simply ignore the `configuration` which has been given to `createCluster`. What if it contains configuration values which were added programmatically?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104128524

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -317,6 +319,10 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
            if (cmd.hasOption(SLOTS.getOpt())) {
                int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
                yarnClusterDescriptor.setTaskManagerSlots(slots);
          + } else if (config.containsKey(ConfigConstants.YARN_VCORES)) {
          — End diff –

          Actually, I'm not so sure whether we should use the `YARN_VCORES` value as a fallback for the number of task manager slots. We had this in the past, but we changed it so that the default behaviour is: if the number of slots is not specified, then it is `1`. Changing this here for Yarn would make the behaviour inconsistent with Mesos, for example. From a user's perspective the old behaviour is imo more predictable. However, we should definitely respect the configuration file settings for the task manager slots.
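
A minimal sketch of the behaviour described in this comment, assuming the command-line option keeps its priority as in the existing `-s` handling: the slot count comes from the CLI, else from `taskmanager.numberOfTaskSlots`, else it is `1`, and `yarn.containers.vcores` is never consulted. The class, the method, and the `Map` standing in for the Flink configuration are hypothetical.

```java
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical sketch: slots never fall back to yarn.containers.vcores. */
final class ConsistentSlotDefault {
    static final String NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots";
    static final int DEFAULT_SLOTS = 1;

    /** cliSlots models -s/-ys; conf models the entries of flink-conf.yaml. */
    static int resolveSlots(OptionalInt cliSlots, Map<String, String> conf) {
        if (cliSlots.isPresent()) {
            return cliSlots.getAsInt();
        }
        String configured = conf.get(NUM_TASK_SLOTS);
        // Any other key, including yarn.containers.vcores, is ignored here.
        return configured == null ? DEFAULT_SLOTS : Integer.parseInt(configured.trim());
    }
}
```

With this rule a file that sets only `yarn.containers.vcores` still yields one slot per TaskManager, which is the same default the comment describes for Mesos.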

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3408

          Another remark concerning the commit message. In the Flink community it's common to format the commit message the following way:
          ```
          [FLINK-XXXX] [component tag] Commit message title

          Commit message body
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user shijinkui commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104277608

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -317,6 +319,10 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
            if (cmd.hasOption(SLOTS.getOpt())) {
                int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
                yarnClusterDescriptor.setTaskManagerSlots(slots);
          + } else if (config.containsKey(ConfigConstants.YARN_VCORES)) {
          — End diff –

          @tillrohrmann I agree with you. We need to make sure that `YARN_VCORES` is available in YARN mode, and YARN/Mesos/standalone should have different configurations. Also, `YARN_VCORES` is the sum of the `Resource#getVirtualCores` responses from the YARN client.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104836285

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -317,6 +319,10 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
            if (cmd.hasOption(SLOTS.getOpt())) {
                int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
                yarnClusterDescriptor.setTaskManagerSlots(slots);
          + } else if (config.containsKey(ConfigConstants.YARN_VCORES)) {
          — End diff –

          @tillrohrmann Do you mean YARN_VCORES is deprecated now? After checking the code I found two places where we still use it: [here](https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L317) and [here](https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java#L339); the latter in particular is used for the container request.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104837998

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -537,7 +543,6 @@ public YarnClusterClient createCluster(
          Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");

          AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);

          - yarnClusterDescriptor.setFlinkConfiguration(config);
          — End diff –

          @tillrohrmann Under what conditions would this configuration contain values added programmatically? I've checked the code and only found that this config is initialized from the config file.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3408

          I'm sorry about the commit message.
          Next time I'll format it, as it's better not to squash.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104855322

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -317,6 +319,10 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
            if (cmd.hasOption(SLOTS.getOpt())) {
                int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
                yarnClusterDescriptor.setTaskManagerSlots(slots);
          + } else if (config.containsKey(ConfigConstants.YARN_VCORES)) {
          — End diff –

          And the documentation still describes YARN_VCORES:

          ```
          yarn.containers.vcores The number of virtual cores (vcores) per YARN container. By default, the
          number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise.
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104861770

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -317,6 +319,10 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
            if (cmd.hasOption(SLOTS.getOpt())) {
                int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
                yarnClusterDescriptor.setTaskManagerSlots(slots);
          + } else if (config.containsKey(ConfigConstants.YARN_VCORES)) {
          — End diff –

          No, YARN_VCORES is not deprecated. But I'm not sure whether we should use this value as a default value for the number of task manager slots.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104862066

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -537,7 +543,6 @@ public YarnClusterClient createCluster(
          Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");

          AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);

          - yarnClusterDescriptor.setFlinkConfiguration(config);
          — End diff –

          This might be the case right now, but it could also change in the future. The signature of the `createCluster` method contains the configuration parameter and, thus, every user will think he can pass a configuration for the creation. We should keep it that way.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104862308

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -317,6 +319,10 @@ public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationN
            if (cmd.hasOption(SLOTS.getOpt())) {
                int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
                yarnClusterDescriptor.setTaskManagerSlots(slots);
          + } else if (config.containsKey(ConfigConstants.YARN_VCORES)) {
          — End diff –

          The documentation says the order is YARN_VCORES > slotsOfTaskManager > default value (1) when they are set in the file.
          Parameters on the command line are used before those in the config file, which is in some way common sense.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r104867661

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java —
          @@ -537,7 +543,6 @@ public YarnClusterClient createCluster(
          Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");

          AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);

          - yarnClusterDescriptor.setFlinkConfiguration(config);
          — End diff –

          all right

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3408

          I moved the initialization of this config to the constructor of the cluster descriptor and restored the deleted configuration setting.
          Please check whether we are good with the usage of `YARN_VCORES`.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3408

          ping @tillrohrmann

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3408

          @tillrohrmann @StephanEwen

          The code has been changed and I've verified the behaviour. Could you please review it and merge it if it's good to go?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r108417410

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java —
          @@ -335,8 +335,7 @@ protected void requestNewWorkers(int numWorkers) {
          Priority priority = Priority.newInstance(0);

          // Resource requirements for worker containers

          - int taskManagerSlots = taskManagerParameters.numSlots();
          - int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
          + int vcores = Math.max(taskManagerParameters.numSlots(), 1);
          — End diff –

          I think we should still keep the possibility open to configure the `vcores` explicitly.
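
The removed line in the diff above already expresses this: an explicit `yarn.containers.vcores` wins, otherwise the container asks for as many vcores as it has slots, with a minimum of 1. A small stand-alone sketch of that rule, with a plain `Map` standing in for the Flink configuration and hypothetical class and method names:

```java
import java.util.Map;

/** Hypothetical sketch of the vcores fallback kept configurable. */
final class ContainerVcores {
    static final String YARN_VCORES = "yarn.containers.vcores";

    /** Mirrors config.getInteger(YARN_VCORES, Math.max(taskManagerSlots, 1)). */
    static int resolveVcores(Map<String, String> conf, int taskManagerSlots) {
        int fallback = Math.max(taskManagerSlots, 1);   // never request fewer than one vcore
        String configured = conf.get(YARN_VCORES);
        return configured == null ? fallback : Integer.parseInt(configured.trim());
    }
}
```

Keeping the lookup means a container can be given more or fewer vcores than it has slots, while the slot count itself is decided elsewhere.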

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3408#discussion_r108417249

          — Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java —
          @@ -164,6 +164,12 @@ public AbstractYarnClusterDescriptor() {
                throw new RuntimeException("Unable to locate configuration file in " + confFile);
            }
            flinkConfigurationPath = new Path(confFile.getAbsolutePath());
          +
          + if (flinkConfiguration.containsKey(ConfigConstants.YARN_VCORES)) {
          +     slots = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, -1);
          + } else if (flinkConfiguration.containsKey(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)) {
          +     slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1);
          + }

          — End diff –

          I think we should make the behaviour consistent with Mesos and standalone where we set `slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1)`.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3408

          @tillrohrmann I still don't entirely get your point. Don't these three settings (`-s/-ys`, `yarn.containers.vcores` and `taskmanager.numberOfTaskSlots`) mean the same thing? Do they differ in usage apart from priority?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3408

          If I'm not mistaken, then `yarn.containers.vcores` defines the number of vcores for a container. This, however, is not the same as the number of slots, since a slot can also have more than one vcore assigned.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3408

          All right. That makes sense. Let me rephrase that; please check whether we are on the same page:

          1. the number of slots of a TaskManager is decided by `-s/-ys` and `taskmanager.numberOfTaskSlots`, with the former taking priority; and
          2. the number of vcores of a YARN container is decided by `yarn.containers.vcores`, which falls back to the value of `-s/-ys` or `taskmanager.numberOfTaskSlots` if the user doesn't set `yarn.containers.vcores` explicitly.

          Is that right?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3408

          Yes exactly @WangTaoTheTonic
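
The two rules confirmed above can be sketched in a few lines of Java. This is only an illustration under stated assumptions, not the code merged in the PR: a plain `Map` stands in for `flink-conf.yaml`, a `null` `cliSlots` means `-s/-ys` was not passed, and the class name `YarnResourceSketch` and the helper `conf(...)` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not Flink's actual classes:
// a plain Map plays the role of the loaded flink-conf.yaml.
class YarnResourceSketch {
    static final String SLOTS_KEY = "taskmanager.numberOfTaskSlots";
    static final String VCORES_KEY = "yarn.containers.vcores";

    /** Rule 1: -s/-ys wins; otherwise taskmanager.numberOfTaskSlots; otherwise the default 1. */
    static int resolveSlots(Integer cliSlots, Map<String, String> conf) {
        if (cliSlots != null) {
            return cliSlots;
        }
        return Integer.parseInt(conf.getOrDefault(SLOTS_KEY, "1"));
    }

    /** Rule 2: an explicit yarn.containers.vcores wins; otherwise fall back to the slot count. */
    static int resolveVcores(Integer cliSlots, Map<String, String> conf) {
        String explicit = conf.get(VCORES_KEY);
        if (explicit != null) {
            return Integer.parseInt(explicit);
        }
        return resolveSlots(cliSlots, conf);
    }

    /** Hypothetical helper: builds a config map from alternating key/value arguments. */
    static Map<String, String> conf(String... keyValues) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            m.put(keyValues[i], keyValues[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        Map<String, String> both = conf(VCORES_KEY, "4", SLOTS_KEY, "3");
        // -s/-ys 5 plus both config keys: prints "vcores=4, slots=5"
        System.out.println("vcores=" + resolveVcores(5, both) + ", slots=" + resolveSlots(5, both));
        // Only the config keys: prints "vcores=4, slots=3"
        System.out.println("vcores=" + resolveVcores(null, both) + ", slots=" + resolveSlots(null, both));
    }
}
```

Keeping the two lookups separate mirrors the point made in the discussion: vcores describe the container request, slots describe the TaskManager, and only the vcore value borrows from the slot value when it is left unset.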

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3408

          @tillrohrmann
          After changing the code, the test results (in both session and single mode) are:

          | Configurations | #vcores of container (TM) | #slots of TM |
          |----------------|---------------------------|--------------|
          | `-s/-ys 5`, `yarn.containers.vcores: 4`, `taskmanager.numberOfTaskSlots: 3` | 4 | 5 |
          | `yarn.containers.vcores: 4`, `taskmanager.numberOfTaskSlots: 3` | 4 | 3 |
          | `yarn.containers.vcores: 4` | 4 | 1 |
          | `-s/-ys 5`, `taskmanager.numberOfTaskSlots: 3` | 5 | 5 |
          | `taskmanager.numberOfTaskSlots: 3` | 3 | 3 |
          | Nothing specified, all defaults | 1 | 1 |

          Please check

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3408

          Changes look good to me. Thanks for your contribution @WangTaoTheTonic. Merging this PR.

          till.rohrmann Till Rohrmann added a comment -

          Fixed via 2313a74e2726167f9cb586a298d4c397f3b82b69

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3408


            People

            • Assignee:
              WangTao Tao Wang
              Reporter:
              WangTao Tao Wang