FLINK-6270: Port several network config parameters to ConfigOption

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Component/s: Network
    • Labels:
      None

      Description

      I'd like to port some memory and network buffers related config options to new ConfigOption instances before continuing with FLINK-4545. These include:

      • taskmanager.memory.size
      • taskmanager.memory.fraction
      • taskmanager.memory.off-heap
      • taskmanager.memory.preallocate
      • taskmanager.network.numberOfBuffers
      • taskmanager.memory.segment-size

      Some of these already existed as ConfigOption instances in MiniClusterConfiguration.
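As a rough illustration of what such a port changes, the sketch below contrasts a string-key lookup with a typed option that carries its own key and default. `Option` and `Config` are hypothetical, simplified stand-ins written for this example, not Flink's actual `ConfigOption` and `Configuration` classes; the key and default value are the ones shown in the diff quoted in the comments of this issue.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-ins for illustration; not Flink's real classes. */
final class TypedOptionSketch {

    /** A config key bundled with its default value. */
    static final class Option<T> {
        final String key;
        final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** Minimal string-backed configuration. */
    static final class Config {
        private final Map<String, String> values = new HashMap<>();

        void setString(String key, String value) {
            values.put(key, value);
        }

        // Old style: every call site repeats the key and its fallback.
        int getInteger(String key, int fallback) {
            String raw = values.get(key);
            return raw == null ? fallback : Integer.parseInt(raw);
        }

        // New style: key and default are defined once, on the option.
        int getInteger(Option<Integer> option) {
            return getInteger(option.key, option.defaultValue);
        }
    }

    // Key and default as they appear in the quoted diff.
    static final Option<Integer> MEMORY_SEGMENT_SIZE =
            new Option<>("taskmanager.memory.segment-size", 32768);

    public static void main(String[] args) {
        Config config = new Config();
        System.out.println(config.getInteger(MEMORY_SEGMENT_SIZE)); // prints 32768
        config.setString("taskmanager.memory.segment-size", "65536");
        System.out.println(config.getInteger(MEMORY_SEGMENT_SIZE)); // prints 65536
    }
}
```

The point of the design is that call sites no longer need to agree on a fallback value by convention.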

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user NicoK opened a pull request:

          https://github.com/apache/flink/pull/3683

          FLINK-6270 Port several network config parameters to ConfigOption

          This ports some memory and network buffers related config options to new `ConfigOption` instances. These include:

          • `taskmanager.memory.size`
          • `taskmanager.memory.fraction`
          • `taskmanager.memory.off-heap`
          • `taskmanager.memory.preallocate`
          • `taskmanager.network.numberOfBuffers`
          • `taskmanager.memory.segment-size`

          Some of these already existed as `ConfigOption` instances in `MiniClusterConfiguration`.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/NicoK/flink flink-6270

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3683.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3683


          commit 15ae80ee5a9cd29597f6ed597183251fbbb32f37
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-05T08:59:00Z

          FLINK-6270 extend Configuration with contains(configOption)

          commit f136af323bce2517983af6c47849c9c70dc3efc2
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-05T09:45:57Z

          FLINK-6270 port some memory and network task manager options to ConfigOption


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3683#discussion_r110102530

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java —
          @@ -39,10 +39,53 @@
          key("taskmanager.jvm-exit-on-oom")
          .defaultValue(false);

          + /** Size of memory buffers used by the network stack and the memory manager (in bytes). */
          + public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
          + key("taskmanager.memory.segment-size")
          + .defaultValue(32768);
          +
          + /**
          + * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
          + * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
          + */
          + public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
          + key("taskmanager.memory.size")
          + .defaultValue(-1L);
          — End diff –

          This seems like an odd default value since it isn't actually a valid value for it. All usages where the value is -1 have a special case for it; `.noDefaultValue()` seems more appropriate.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3683#discussion_r110102292

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala —
          @@ -350,23 +350,18 @@ class LocalFlinkMiniCluster(

          def setMemory(config: Configuration): Unit = {
          // set this only if no memory was pre-configured

          - if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
          + if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) == -1L) {
          — End diff –

          If we keep the default at -1 then we should compare against `TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()`; the same applies to other instances of this pattern.
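The suggestion above can be sketched as follows. `LongOption` and `Config` are hypothetical minimal types invented for this example (they are not Flink's API); only the key `taskmanager.memory.size` and the `-1L` default come from the quoted diff. The check reads the sentinel from the option itself instead of repeating the literal.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for illustration; not Flink's real classes. */
final class DefaultComparisonSketch {

    /** A long-valued option that knows its own default. */
    static final class LongOption {
        final String key;
        private final long fallback;

        LongOption(String key, long fallback) {
            this.key = key;
            this.fallback = fallback;
        }

        long defaultValue() {
            return fallback;
        }
    }

    /** Minimal string-backed configuration. */
    static final class Config {
        private final Map<String, String> values = new HashMap<>();

        void setLong(LongOption option, long value) {
            values.put(option.key, Long.toString(value));
        }

        long getLong(LongOption option) {
            String raw = values.get(option.key);
            return raw == null ? option.defaultValue() : Long.parseLong(raw);
        }
    }

    // Key and default as they appear in the quoted diff.
    static final LongOption MANAGED_MEMORY_SIZE =
            new LongOption("taskmanager.memory.size", -1L);

    // Compare against the option's default rather than a hard-coded -1L,
    // so the check stays correct if the default is ever changed.
    static boolean memoryNotConfigured(Config config) {
        return config.getLong(MANAGED_MEMORY_SIZE) == MANAGED_MEMORY_SIZE.defaultValue();
    }

    public static void main(String[] args) {
        Config config = new Config();
        System.out.println(memoryNotConfigured(config)); // prints true
        config.setLong(MANAGED_MEMORY_SIZE, 512L);
        System.out.println(memoryNotConfigured(config)); // prints false
    }
}
```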

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3683#discussion_r110104138

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/Configuration.java —
          @@ -595,6 +595,35 @@ public boolean containsKey(String key){
          }
          }

          + /**
          + * Checks whether there is an entry for the given config option
          + *
          + * @param configOption The configuration option
          + *
          + * @return <tt>true</tt> if a valid (current of deprecated) key of the config option is stored,
          + * <tt>false</tt> otherwise
          + */
          + public boolean contains(ConfigOption<?> configOption) {
          — End diff –

          Could you annotate this with `@PublicEvolving` similar to other methods using `ConfigOption`s.
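The Javadoc in the diff describes `contains` as returning true when either the current key or a deprecated key of the option is stored. A minimal sketch of that lookup is shown here, assuming an option simply holds a list of deprecated keys. The `Option` and `Config` types and both `example.*` keys are hypothetical and exist only for this illustration; the actual method body is not part of the quoted diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-ins for illustration; not Flink's real classes. */
final class ContainsSketch {

    /** An option with one current key and any number of deprecated keys. */
    static final class Option {
        final String key;
        final List<String> deprecatedKeys;

        Option(String key, String... deprecatedKeys) {
            this.key = key;
            this.deprecatedKeys = Arrays.asList(deprecatedKeys);
        }
    }

    /** Minimal string-backed configuration. */
    static final class Config {
        private final Map<String, String> values = new HashMap<>();

        void setString(String key, String value) {
            values.put(key, value);
        }

        // True if the current key or any deprecated key has an entry.
        boolean contains(Option option) {
            if (values.containsKey(option.key)) {
                return true;
            }
            for (String deprecated : option.deprecatedKeys) {
                if (values.containsKey(deprecated)) {
                    return true;
                }
            }
            return false;
        }
    }

    // Hypothetical keys, chosen only for this example.
    static final Option BUFFER_COUNT =
            new Option("example.buffer.count", "example.numberOfBuffers");

    public static void main(String[] args) {
        Config config = new Config();
        System.out.println(config.contains(BUFFER_COUNT)); // prints false
        config.setString("example.numberOfBuffers", "2048");
        System.out.println(config.contains(BUFFER_COUNT)); // prints true
    }
}
```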

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3683#discussion_r110103104

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/Configuration.java —
          @@ -595,6 +595,35 @@ public boolean containsKey(String key){
          }
          }

          + /**
          + * Checks whether there is an entry for the given config option
          + *
          + * @param configOption The configuration option
          + *
          + * @return <tt>true</tt> if a valid (current of deprecated) key of the config option is stored,
          — End diff –

          typo: current or deprecated

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3683#discussion_r110106044

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java —
          @@ -39,10 +39,53 @@
          key("taskmanager.jvm-exit-on-oom")
          .defaultValue(false);

          + /** Size of memory buffers used by the network stack and the memory manager (in bytes). */
          + public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
          + key("taskmanager.memory.segment-size")
          + .defaultValue(32768);
          +
          + /**
          + * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
          + * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
          + */
          + public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
          + key("taskmanager.memory.size")
          + .defaultValue(-1L);
          — End diff –

          Yes, but unfortunately `.noDefaultValue()` is only specified for `ConfigOption<String>`. I'll change the comparisons to match against the default though; that already looked like a strange pattern.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3683#discussion_r110107855

          — Diff: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java —
          @@ -39,10 +39,53 @@
          key("taskmanager.jvm-exit-on-oom")
          .defaultValue(false);

          + /** Size of memory buffers used by the network stack and the memory manager (in bytes). */
          + public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
          + key("taskmanager.memory.segment-size")
          + .defaultValue(32768);
          +
          + /**
          + * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
          + * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
          + */
          + public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
          + key("taskmanager.memory.size")
          + .defaultValue(-1L);
          — End diff –

          OK, I didn't know that only works for strings. :/

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3683#discussion_r110108276

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala —
          @@ -350,23 +350,18 @@ class LocalFlinkMiniCluster(

          def setMemory(config: Configuration): Unit = {
          // set this only if no memory was pre-configured

          - if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
          + if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) == -1L) {
          — End diff –

          I'm tempted to replace these with a `config.contains()` call but then the user would not be able to explicitly provide `-1` as the config option's value which is possible and valid at the moment. I don't really want to break existing behaviour though.
          What do you think?
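The behavioural difference raised here can be made concrete with a small sketch. It uses a plain `Map` as a stand-in for the configuration, so nothing below is Flink's API; the helper names are hypothetical, and only the key and the `-1` sentinel come from the discussion. The two checks agree except when the user explicitly stores `-1`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration using a plain map; not Flink's real classes. */
final class ExplicitSentinelSketch {

    static final String KEY = "taskmanager.memory.size";
    static final long DEFAULT = -1L;

    // Returns the stored value, or the sentinel default if nothing is stored.
    static long getLong(Map<String, String> config) {
        String raw = config.get(KEY);
        return raw == null ? DEFAULT : Long.parseLong(raw);
    }

    // Existing pattern: treat "value equals the sentinel" as "not configured".
    static boolean unsetByComparison(Map<String, String> config) {
        return getLong(config) == DEFAULT;
    }

    // Tempting alternative: treat "no entry for the key" as "not configured".
    static boolean unsetByContains(Map<String, String> config) {
        return !config.containsKey(KEY);
    }

    public static void main(String[] args) {
        Map<String, String> explicit = new HashMap<>();
        explicit.put(KEY, "-1"); // user explicitly asks for the sentinel

        // The two checks disagree only in this case.
        System.out.println(unsetByComparison(explicit)); // prints true
        System.out.println(unsetByContains(explicit));   // prints false
    }
}
```

Switching to the `contains`-style check would therefore change how an explicit `-1` is treated, which is the compatibility concern described above.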

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3683#discussion_r110110002

          — Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala —
          @@ -350,23 +350,18 @@ class LocalFlinkMiniCluster(

          def setMemory(config: Configuration): Unit = {
          // set this only if no memory was pre-configured

          - if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
          + if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) == -1L) {
          — End diff –

          I think we'll have to stick to comparisons for this one.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3683

          merging.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3683

          Zentol Chesnay Schepler added a comment -

          1.3: e2a4f47ed8c95c7045f79bf9fe59cab39518710b


            People

            • Assignee:
              NicoK Nico Kruber
              Reporter:
              NicoK Nico Kruber