FLINK-4545: Flink automatically manages TM network buffer

    Details

    • Type: Wish
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Network
    • Labels:
      None

      Description

      Currently, the number of network buffers per task manager is preconfigured, and the memory for them is pre-allocated, through the taskmanager.network.numberOfBuffers config option. In a job DAG with a shuffle phase, this number can grow very large depending on the TM cluster size. The formula for calculating the buffer count is documented here (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers):

      #slots-per-TM^2 * #TMs * 4
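To make the growth concrete, here is a minimal Java sketch of the rule of thumb above. The class and method names are hypothetical, and the 32 KiB buffer size and the 8-slot cluster shape are assumptions chosen only for illustration; they are not taken from this issue.

```java
// Illustrative sketch only: names are hypothetical, not Flink API.
final class NetworkBufferCount {

    /** Rule of thumb quoted in the description: #slots-per-TM^2 * #TMs * 4. */
    static long requiredBuffers(int slotsPerTm, int numTms) {
        return (long) slotsPerTm * slotsPerTm * numTms * 4L;
    }

    /** Memory needed to pre-allocate that many buffers of the given size. */
    static long requiredBytes(int slotsPerTm, int numTms, int bufferSizeBytes) {
        return requiredBuffers(slotsPerTm, numTms) * bufferSizeBytes;
    }

    public static void main(String[] args) {
        int slotsPerTm = 8;              // assumed cluster shape
        int bufferSizeBytes = 32 * 1024; // assumed buffer size (32 KiB)
        for (int numTms : new int[] {10, 20, 40}) {
            System.out.printf("TMs=%d buffers=%d bytes=%d%n",
                    numTms,
                    requiredBuffers(slotsPerTm, numTms),
                    requiredBytes(slotsPerTm, numTms, bufferSizeBytes));
        }
    }
}
```

Because the count scales linearly with the number of task managers, a value that was sized for a small cluster becomes too low as soon as the cluster grows, which is the problem the description raises.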

      In a standalone deployment, we may need to control the task manager cluster size dynamically and then leverage the upcoming Flink feature that supports rescaling job parallelism at runtime.
      If the buffer count config is static at runtime and cannot be changed without restarting the task manager process, this may add latency and complexity to the scaling process. I am wondering whether there has already been any discussion about having Flink manage the network buffers automatically, or at least expose some API that allows them to be reconfigured. Let me know if there is an existing JIRA that I should follow.

          Activity

          StephanEwen Stephan Ewen added a comment -

          Finalized in 0bb49e538c118b8265377355a9667789a3971966

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3721

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3721

          Merging this.
          I filed a follow-up JIRA to address the "configuration with units" to make sure all memory-related parameters behave the same way, without loss of byte precision where needed: https://issues.apache.org/jira/browse/FLINK-6469

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3721

          Okay, taking a step back. Looking through the code some more, the internal arithmetic should certainly stay in bytes. However, bytes are tedious to configure.

          I suggest adding support to the configuration for interpreting memory units, so that we can configure values such as

          • 512m
          • 10 kb
          • ...

          I have started some utility here: https://github.com/StephanEwen/incubator-flink/tree/mem_size

          That means that we would keep the PR in this form and add memory configuration parsing as a followup.
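As a rough illustration of the proposal above, the following Java sketch parses strings such as "512m" or "10 kb" into bytes. It is not the utility from the linked branch; the class name, the accepted suffixes, and the choice of binary multiples (1 kb = 1024 bytes) are all assumptions.

```java
import java.util.Locale;

// Illustrative sketch only: not the parser from the mem_size branch.
final class MemorySizeSketch {

    /** Parses "<number>[ ][unit]" into bytes; a missing unit means bytes. */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        int pos = 0;
        while (pos < s.length() && Character.isDigit(s.charAt(pos))) {
            pos++;
        }
        if (pos == 0) {
            throw new IllegalArgumentException("No number in '" + text + "'");
        }
        long value = Long.parseLong(s.substring(0, pos));
        String unit = s.substring(pos).trim();

        long multiplier;
        switch (unit) {
            case "":
            case "b":
                multiplier = 1L;
                break;
            case "k":
            case "kb":
                multiplier = 1L << 10;
                break;
            case "m":
            case "mb":
                multiplier = 1L << 20;
                break;
            case "g":
            case "gb":
                multiplier = 1L << 30;
                break;
            default:
                throw new IllegalArgumentException("Unknown unit '" + unit + "'");
        }
        // Fail loudly on overflow instead of silently wrapping around.
        return Math.multiplyExact(value, multiplier);
    }

    public static void main(String[] args) {
        System.out.println(parseBytes("512m"));
        System.out.println(parseBytes("10 kb"));
    }
}
```

Returning a byte count keeps the internal arithmetic in bytes, as the comment asks, while letting users write the value in whatever unit is convenient.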

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3721

          Code is good in general and well tested (including the shell scripts, which is great!)

          I would do some on-the-fly polishing while merging. The main thing I want to adjust is having the configuration parameters specified in megabytes, not bytes. All other memory-related parameters are in megabytes, so this one should be as well, for consistency. Also, I think we don't need a finer granularity these days.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3721

          I am checking this PR out now...

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112248789

          — Diff: flink-dist/src/main/flink-bin/bin/config.sh —
          @@ -398,3 +428,106 @@ readSlaves() {
          useOffHeapMemory() {
          [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
          }
          +
          +HAVE_AWK=
          +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
          +calculateNetworkBuf() {
          + local network_buffers_bytes
          + if [ "${FLINK_TM_HEAP}" -le "0" ]; then
          + echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
          + exit 1
          + fi
          +
          + if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
          + # fix memory size for network buffers
          + network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
          + else
          + if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
          + echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
          + echo "Min must be less than or equal to max."
          + echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          +
          + # Bash only performs integer arithmetic so floating point computation is performed using awk
          + if [[ -z "${HAVE_AWK}" ]] ; then
          + command -v awk >/dev/null 2>&1
          + if [[ $? -ne 0 ]]; then
          + echo "[ERROR] Program 'awk' not found."
          + echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          + HAVE_AWK=true
          + fi
          +
          + # We calculate the memory using a fraction of the total memory
          + if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
          + echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
          + echo "It must be between 0.0 and 1.0."
          + echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          +
          + network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
          + fi
          +
          + # recalculate the JVM heap memory by taking the network buffers into account
          — End diff –

          No, actually: the user may set the `FLINK_TM_HEAP` environment variable or configure the "flink heap size" via `taskmanager.heap.mb`, but this is not the real heap size. It is rather the overall memory size used by Flink (including off-heap). So this function removes the off-heap part and returns the real heap size to use with `-Xmx` and `-Xms`.
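The calculation discussed in this comment can be sketched in Java as follows. This is a simplified reading of the quoted shell function, not the actual `TaskManagerServices` code: the class and method names are hypothetical, it uses `double` where the Javadoc quoted elsewhere in this thread says the Java side uses `float`, and it subtracts only the network buffer memory, ignoring any other off-heap memory.

```java
// Illustrative sketch only: mirrors the quoted awk expression, not Flink's code.
final class HeapSizeSketch {

    /** Network buffer memory: fraction of the total, clamped to [min, max]. */
    static long networkBufferBytes(long totalBytes, double fraction,
                                   long minBytes, long maxBytes) {
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("Min must be less than or equal to max.");
        }
        if (minBytes == maxBytes) {
            return minBytes; // fixed size, the fraction is not consulted
        }
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("Fraction must be between 0.0 and 1.0.");
        }
        long byFraction = (long) (totalBytes * fraction);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Heap size in MB for -Xmx/-Xms after removing the network buffers. */
    static long heapSizeMb(long totalMb, double fraction, long minBytes, long maxBytes) {
        long netBytes = networkBufferBytes(totalMb << 20, fraction, minBytes, maxBytes);
        return totalMb - (netBytes >> 20);
    }

    public static void main(String[] args) {
        // Same shape as the manual test case in the PR: 1000 MB, fraction 0.1,
        // min 64 MB, max 1 GB.
        System.out.println(heapSizeMb(1000, 0.1, 64L << 20, 1L << 30));
    }
}
```

With a 1000 MB total and a fraction of 0.1, the fraction yields 100 MB, which lies inside the 64 MB to 1 GB window, so the sketch reports a 900 MB heap.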

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112247760

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java —
          @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
          }
          catch (Throwable t) {
          if (LOG.isErrorEnabled()) {

          - LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
          + LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
          — End diff –

          You can leave it as it is.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112246946

          — Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java —
          @@ -0,0 +1,306 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.dist;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
          +import org.apache.flink.util.OperatingSystem;
          +import org.apache.flink.util.TestLogger;
          +import org.junit.Assume;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.BufferedReader;
          +import java.io.IOException;
          +import java.io.InputStreamReader;
          +import java.util.Random;
          +
          +import static org.hamcrest.CoreMatchers.allOf;
          +import static org.hamcrest.Matchers.greaterThanOrEqualTo;
          +import static org.hamcrest.Matchers.lessThanOrEqualTo;
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertThat;
          +
          +/**
          + * Unit test that verifies that the task manager heap size calculation used by the bash script
          + * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
          + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
          + *
          + * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
          + * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
          + * not need high precision.
          + */
          +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
          +
          + /** Key that is used by <tt>config.sh</tt>. */
          + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
          +
          + /**
          + * Number of tests with random values.
          + *
          + * NOTE: calling the external test script is slow and thus low numbers are preferred for general
          + * testing.
          + */
          + private static final int NUM_RANDOM_TESTS = 20;
          +
          + @Before
          + public void checkOperatingSystem() {
          + Assume.assumeTrue("This test checks shell scripts not available on Windows.",
          + !OperatingSystem.isWindows());
          + }
          +
          + /**
          + * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
          + * result as the shell script.
          + */
          + @Test
          + public void compareNetworkBufShellScriptWithJava() throws Exception {
          + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
          + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
          +
          + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
          +
          + compareNetworkBufJavaVsScript(
          + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
          +
          + compareNetworkBufJavaVsScript(
          + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
          +
          + compareNetworkBufJavaVsScript(
          + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
          +
          + // some automated tests with random (but valid) values
          +
          + Random ran = new Random();
          + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
          + // tolerate that values differ by 1% (due to different floating point precisions)
          + compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
          — End diff –

          oh, here, I actually do already print the configuration in the error message
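The 1% tolerance in the quoted random-value loop follows from the Javadoc note that awk computes in `double` while the Java code uses `float`. A minimal sketch of such a comparison is shown below; the class and helper names are hypothetical and are not the helpers used in the PR's test.

```java
// Illustrative sketch only: not the comparison helper from the PR.
final class ToleranceSketch {

    /** Fraction of the total computed in float precision (long * float is float math). */
    static long withFloat(long totalBytes, float fraction) {
        return (long) (totalBytes * fraction);
    }

    /** Fraction of the total computed in double precision, as awk would. */
    static long withDouble(long totalBytes, double fraction) {
        return (long) (totalBytes * fraction);
    }

    /** True if actual differs from a non-negative expected by at most the relative tolerance. */
    static boolean withinTolerance(long expected, long actual, double tolerance) {
        return Math.abs(expected - actual) <= expected * tolerance;
    }

    public static void main(String[] args) {
        long total = 1000L << 20;
        long viaDouble = withDouble(total, 0.1);
        long viaFloat = withFloat(total, 0.1f);
        System.out.println(withinTolerance(viaDouble, viaFloat, 0.01));
    }
}
```

A tolerance of 0.0, as used for the hand-picked cases in the quoted test, demands exact agreement, which is only safe for inputs where both precisions are known to give the same result.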

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112246505

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java —
          @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
          }
          catch (Throwable t) {
          if (LOG.isErrorEnabled()) {

          - LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
          + LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
          — End diff –

          sorry, should I create a separate PR? (a separate JIRA is definitely overkill for this)

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112246105

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java —
          @@ -0,0 +1,272 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.taskexecutor;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.core.memory.MemoryType;
          +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
          +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
          +import org.apache.flink.runtime.util.EnvironmentInformation;
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +import org.powermock.api.mockito.PowerMockito;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.net.InetAddress;
          +import java.util.Random;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +import static org.mockito.Mockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +
          +/**
          + * Unit test for {@link TaskManagerServices}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(EnvironmentInformation.class)
          +public class TaskManagerServicesTest {
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
          + * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufOld() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          +
          + // test integer overflow in the memory size
          + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
          + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
          + * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
          + */
          + @Test
          + public void calculateNetworkBufNew() throws Exception {
          + Configuration config = new Configuration();
          +
          + // (1) defaults
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config));
          +
          + calculateNetworkBufNew(config);
          + }
          +
          + /**
          + * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
          + * new configuration parameters.
          + *
          + * @param config configuration object
          + */
          + private static void calculateNetworkBufNew(final Configuration config) {
          + // (2) fixed size memory
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
          +
          + // (3) random fraction, min, and max values
          + Random ran = new Random();
          + for (int i = 0; i < 1_000; ++i){
          + float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
          + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
          +
          + long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
          +
          + long max = Math.max(min, ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
          +
          + long javaMem = Math.max(max + 1, ran.nextLong());
          +
          + final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
          + assertTrue(networkBufMem >= min);
          + assertTrue(networkBufMem <= max);
          + if (networkBufMem > min && networkBufMem < max) {
          + assertEquals((long) (javaMem * frac), networkBufMem);
          + }
          + }
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using mixed
          + * old/new configurations.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufMixed() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          +
          + // old + 1 new parameter = new:
          + Configuration config1 = config.clone();
          + config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config1));
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
          +
          + config1 = config.clone();
          + long newMin = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); // smallest value possible
          + config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, newMin);
          + assertEquals(Math.min(defaultMax, Math.max(newMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 20), config1));
          + assertEquals(Math.min(defaultMax, Math.max(newMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
          +
          + config1 = config.clone();
          + long newMax = Math.max(64L << 20 + 1, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue());
          + config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, newMax);
          + assertEquals(Math.min(newMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config1));
          + assertEquals(Math.min(newMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
          + assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
          +
          + // old + any new parameter = new:
          + calculateNetworkBufNew(config);
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(TaskManagerServicesConfiguration)}
          + * using the same (manual) test cases as in {@link #calculateHeapSizeMB()}.
          + */
          + @Test
          + public void calculateNetworkBufFromHeapSize() throws Exception {
          + PowerMockito.mockStatic(EnvironmentInformation.class);
          + // some defaults:
          + when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
          + when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 1000MB
          +
          + TaskManagerServicesConfiguration tmConfig;
          +
          + tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
          + TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
          + 0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
          + when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
          + assertEquals(100L << 20, TaskManagerServices.calculateNetworkBuf(tmConfig));
          +
          + tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
          + 0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
          + when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 890MB
          + assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
          + TaskManagerServices.calculateNetworkBuf(tmConfig));
          +
          + tmConfig = getTmConfig(-1, 0.1f,
          + 0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
          + when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 810MB
          + assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
          + TaskManagerServices.calculateNetworkBuf(tmConfig));
          + }
          +
          + /**
          + * Returns a task manager services configuration for the tests
          + *
          + * @param managedMemory see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
          + * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
          + * @param networkBufFraction see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
          + * @param networkBufMin see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
          + * @param networkBufMax see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
          + * @param memType on-heap or off-heap
          + *
          + * @return configuration object
          + */
          + private static TaskManagerServicesConfiguration getTmConfig(
          + final long managedMemory, final float managedMemoryFraction, float networkBufFraction,
          + long networkBufMin, long networkBufMax,
          + final MemoryType memType) {
          +
          + final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
          + networkBufFraction,
          + networkBufMin,
          + networkBufMax,
          + TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
          + memType,
          + null,
          + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
          + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
          + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
          + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(),
          + null);
          +
          + return new TaskManagerServicesConfiguration(
          + mock(InetAddress.class),
          + new String[] {},
          + networkConfig,
          + QueryableStateConfiguration.disabled(),
          + 1,
          + managedMemory,
          + false,
          + managedMemoryFraction,
          + mock(MetricRegistryConfiguration.class),
          + 0);
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} with some
          + * manually calculated scenarios.
          + */
          + @Test
          + public void calculateHeapSizeMB() throws Exception {
          + Configuration config = new Configuration();
          + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 64L << 20); // 64MB
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB
          +
          + config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
          + assertEquals(1000, TaskManagerServices.calculateHeapSizeMB(1000, config));
          +
          + config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
          + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB
          + assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
          +
          + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1); // use fraction of given memory
          + config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
          + assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
          — End diff –

          Yes, unfortunately, that is the case, and it has always been that way (the network buffers used to be a fixed amount of memory rather than a fraction), but it is now properly documented.

          With the possibility to specify min and max values for the network buffer memory size, the actual fraction may differ from the configured one, and we don't really want to fail jobs because we can't guarantee the configured fraction for the managed memory in that case, do we?
          As a side note, this is also the safest way to ensure that the invariants hold, especially for the 0.5 vs. 0.5 example: inside Java, we always allocate the network buffer memory first, then determine the remaining free heap space (if on-heap), and use the given fraction of that.
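
          The ordering described in this comment (network buffers first, managed memory as a fraction of what remains) can be illustrated with a minimal Java sketch. It is not Flink's implementation: the class HeapSizeSketch, its method names and its parameter list are hypothetical, and it assumes that an on-heap setup leaves the JVM heap size untouched, as the calculateHeapSizeMB test cases suggest.

```java
// Hypothetical sketch, not Flink code: names and signatures are illustrative only.
final class HeapSizeSketch {

    /** Clamps (fraction * totalBytes) into [min, max]; all sizes are in bytes. */
    static long networkBufBytes(long totalBytes, double fraction, long min, long max) {
        long byFraction = (long) (fraction * totalBytes);
        return Math.min(max, Math.max(min, byFraction));
    }

    /**
     * JVM heap size in MB after reserving network buffers first and managed memory second.
     * A managedMB value <= 0 means "take managedFraction of the memory left after network buffers".
     */
    static long heapSizeMB(long totalMB, boolean offHeap,
                           double netFraction, long netMin, long netMax,
                           long managedMB, double managedFraction) {
        if (!offHeap) {
            // Assumption: with on-heap memory everything stays inside the JVM heap.
            return totalMB;
        }
        long totalBytes = totalMB << 20;
        long netBuf = networkBufBytes(totalBytes, netFraction, netMin, netMax);
        long remaining = totalBytes - netBuf;
        long managed = managedMB > 0 ? managedMB << 20 : (long) (managedFraction * remaining);
        return (remaining - managed) >> 20;
    }

    public static void main(String[] args) {
        long min = 64L << 20; // 64MB
        long max = 1L << 30;  // 1GB
        System.out.println(heapSizeMB(1000, false, 0.1, min, max, 10, 0.1)); // 1000
        System.out.println(heapSizeMB(1000, true, 0.1, min, max, 10, 0.1));  // 890
        System.out.println(heapSizeMB(1000, true, 0.1, min, max, -1, 0.1));  // 810
    }
}
```

          The three calls mirror the values asserted in the calculateHeapSizeMB test (1000, 890 and 810 MB). Under the same assumptions, setting both fractions to 0.5 still leaves 250 MB of heap, because the second fraction applies only to the memory remaining after the network buffers.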

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112242996

          — Diff: flink-dist/src/main/flink-bin/bin/config.sh —
          @@ -398,3 +428,106 @@ readSlaves() {
          useOffHeapMemory() {
          [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
          }
          +
          +HAVE_AWK=
          +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
          +calculateNetworkBuf() {
          + local network_buffers_bytes
          + if [ "${FLINK_TM_HEAP}" -le "0" ]; then
          + echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
          + exit 1
          + fi
          +
          + if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
          + # fix memory size for network buffers
          + network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
          + else
          + if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
          + echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
          + echo "Min must be less than or equal to max."
          + echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          +
          + # Bash only performs integer arithmetic so floating point computation is performed using awk
          + if [[ -z "${HAVE_AWK}" ]] ; then
          + command -v awk >/dev/null 2>&1
          + if [[ $? -ne 0 ]]; then
          + echo "[ERROR] Program 'awk' not found."
          + echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          + HAVE_AWK=true
          + fi
          +
          + # We calculate the memory using a fraction of the total memory
          + if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
          + echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
          + echo "It must be between 0.0 and 1.0."
          + echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          +
          + network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
          + fi
          +
          + # recalculate the JVM heap memory by taking the network buffers into account
          — End diff –

          To me, "recalculate" implied that it would change some configuration value, but that's not happening. It's only verifying that the memory for network buffers is less than the heap memory.
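
          For comparison, the validation and clamping steps of the shell function quoted above, together with a verify-only heap check of the kind this comment describes, can be sketched in Java. This is an illustration under assumptions, not the script's or Flink's actual code: the class NetworkBufCheckSketch and its methods are hypothetical, and the cast truncates where the script's printf "%.0f" rounds.

```java
// Hypothetical sketch, not Flink code: mirrors the checks of the quoted shell function.
final class NetworkBufCheckSketch {

    /** Returns the network buffer memory in bytes, or throws if the settings are invalid. */
    static long calculateNetworkBuf(long totalBytes, double fraction, long min, long max) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("total memory must be positive");
        }
        if (min > max) {
            throw new IllegalArgumentException("min must be less than or equal to max");
        }
        if (min == max) {
            return min; // fixed size: the fraction is not consulted at all
        }
        if (!(fraction > 0.0 && fraction < 1.0)) {
            throw new IllegalArgumentException("fraction must be between 0.0 and 1.0");
        }
        long byFraction = (long) (fraction * totalBytes); // assumption: truncate instead of round
        return Math.min(max, Math.max(min, byFraction));
    }

    /** Verifies only; no configuration value is changed. */
    static void verifyFitsInHeap(long networkBufBytes, long heapBytes) {
        if (networkBufBytes >= heapBytes) {
            throw new IllegalArgumentException("network buffer memory must be less than the heap memory");
        }
    }

    public static void main(String[] args) {
        long netBuf = calculateNetworkBuf(1000L << 20, 0.1, 64L << 20, 1L << 30);
        verifyFitsInHeap(netBuf, 1000L << 20);
        System.out.println(netBuf >> 20); // 100 (MB)
    }
}
```

          Keeping the check in a separate method that returns nothing makes the "verify, not recalculate" distinction visible in the signature.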

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112242842

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java —
          @@ -0,0 +1,272 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.taskexecutor;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.core.memory.MemoryType;
          +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
          +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
          +import org.apache.flink.runtime.util.EnvironmentInformation;
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +import org.powermock.api.mockito.PowerMockito;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.net.InetAddress;
          +import java.util.Random;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +import static org.mockito.Mockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +
          +/**
          + * Unit test for {@link TaskManagerServices}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(EnvironmentInformation.class)
          +public class TaskManagerServicesTest {
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
          + * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufOld() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          +
          + // test integer overflow in the memory size
          + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
          + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
          + * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
          + */
          + @Test
          + public void calculateNetworkBufNew() throws Exception {
          + Configuration config = new Configuration();
          +
          + // (1) defaults
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config));
          +
          + calculateNetworkBufNew(config);
          + }
          +
          + /**
          + * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
          + * new configuration parameters.
          + *
          + * @param config configuration object
          + */
          + private static void calculateNetworkBufNew(final Configuration config) {
          + // (2) fixed size memory
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
          +
          + // (3) random fraction, min, and max values
          + Random ran = new Random();
          + for (int i = 0; i < 1_000; ++i){
          + float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
          + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
          +
          + long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
          +
          + long max = Math.max(min, ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
          +
          + long javaMem = Math.max(max + 1, ran.nextLong());
          +
          + final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
          + assertTrue(networkBufMem >= min);
          + assertTrue(networkBufMem <= max);
          + if (networkBufMem > min && networkBufMem < max) {
          + assertEquals((long) (javaMem * frac), networkBufMem);
          + }
          + }
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using mixed
          + * old/new configurations.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufMixed() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          +
          + // old + 1 new parameter = new:
          + Configuration config1 = config.clone();
          + config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 20)))),
          — End diff –

          Yes, if we define our own test defaults, you are right. However, that would decouple the test from the defaults that are set for real-world applications, and I wanted to keep the two as close as possible, including any future change in the default values.
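
For readers following the assertion quoted above: the expected value is the product of fraction and total memory, clamped into the range [min, max]. The following is a minimal sketch of that expression only. The class name, the method name, and the numbers in `main` are hypothetical illustration values, not Flink's implementation or its defaults.

```java
/** Hypothetical illustration of the expected-value expression used in the quoted test. */
final class NetworkBufClampSketch {

    /**
     * Clamps (fraction * totalMemory) into [min, max], mirroring
     * Math.min(max, Math.max(min, (long) (fraction * totalMemory))) from the test.
     * This is an assumption-labelled sketch, not the code under review.
     */
    static long clampedFraction(long totalMemory, float fraction, long min, long max) {
        if (min > max) {
            throw new IllegalArgumentException("min (" + min + ") must not exceed max (" + max + ")");
        }
        long byFraction = (long) (fraction * totalMemory);
        return Math.min(max, Math.max(min, byFraction));
    }

    public static void main(String[] args) {
        long min = 64L << 20; // 64 MB, made-up bound for illustration
        long max = 1L << 30;  // 1 GB, made-up bound for illustration

        // Fraction falls inside the bounds: the fraction decides.
        System.out.println(clampedFraction(1L << 30, 0.5f, min, max));
        // Fraction falls below min: min decides.
        System.out.println(clampedFraction(64L << 20, 0.5f, min, max));
        // Fraction exceeds a smaller max: max decides.
        System.out.println(clampedFraction(1L << 30, 0.5f, min, 256L << 20));
    }
}
```

Computing the expectation from the configured defaults, as the comment argues, keeps such a test valid if the defaults change later; hard-coded bounds like the ones in this sketch would have to be updated by hand.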

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112239687

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java —
          @@ -0,0 +1,272 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.taskexecutor;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.core.memory.MemoryType;
          +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
          +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
          +import org.apache.flink.runtime.util.EnvironmentInformation;
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +import org.powermock.api.mockito.PowerMockito;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.net.InetAddress;
          +import java.util.Random;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +import static org.mockito.Mockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +
          +/**
          + * Unit test for {@link TaskManagerServices}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(EnvironmentInformation.class)
          +public class TaskManagerServicesTest {
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
          + * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufOld() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          +
          + // test integer overflow in the memory size
          + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
          + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
          + * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
          + */
          + @Test
          + public void calculateNetworkBufNew() throws Exception {
          + Configuration config = new Configuration();
          +
          + // (1) defaults
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config));
          +
          + calculateNetworkBufNew(config);
          + }
          +
          + /**
          + * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
          + * new configuration parameters.
          + *
          + * @param config configuration object
          + */
          + private static void calculateNetworkBufNew(final Configuration config) {
          + // (2) fixed size memory
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
          +
          + // (3) random fraction, min, and max values
          + Random ran = new Random();
          + for (int i = 0; i < 1_000; ++i){
          + float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
          + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
          +
          + long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
          +
          + long max = Math.max(min, ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
          +
          + long javaMem = Math.max(max + 1, ran.nextLong());
          +
          + final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
          — End diff –

          Random testing is quite powerful to identify unforeseen errors, especially since these inputs are user-configurable. I'll keep that but add better error messages as suggested.
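
One common way to give randomized checks "better error messages" is to fix and report the seed and the generated inputs, so that a failing iteration can be replayed. The sketch below illustrates that idea under stated assumptions: the class, the `clamp` stand-in, and the message format are hypothetical and are not the code that was eventually committed.

```java
import java.util.Random;

/** Hypothetical sketch: randomized bound checks whose failures report seed and inputs. */
final class SeededRandomCheckSketch {

    /** Stand-in for the method under test (assumption, not Flink's code). */
    static long clamp(long totalMemory, float fraction, long min, long max) {
        return Math.min(max, Math.max(min, (long) (fraction * totalMemory)));
    }

    /** Builds the context string that is attached to every failure message. */
    static String describe(long seed, int iteration, float frac, long min, long max, long javaMem) {
        return String.format("seed=%d, iteration=%d, frac=%s, min=%d, max=%d, javaMem=%d",
                seed, iteration, frac, min, max, javaMem);
    }

    /** Runs the checks and returns the number of iterations that passed. */
    static int runChecks(long seed, int iterations) {
        Random ran = new Random(seed); // fixed seed makes a failing run reproducible
        for (int i = 0; i < iterations; i++) {
            float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
            long min = Math.max(1L, ran.nextLong());
            long max = Math.max(min, ran.nextLong());
            long javaMem = Math.max(max, ran.nextLong());

            long result = clamp(javaMem, frac, min, max);
            String context = describe(seed, i, frac, min, max, javaMem);
            if (result < min) {
                throw new AssertionError("result " + result + " below min: " + context);
            }
            if (result > max) {
                throw new AssertionError("result " + result + " above max: " + context);
            }
        }
        return iterations;
    }

    public static void main(String[] args) {
        long seed = System.nanoTime(); // log the seed so a failure can be replayed
        System.out.println("Using seed " + seed);
        runChecks(seed, 1_000);
    }
}
```

With the seed in the message, a failure seen once on CI can be reproduced locally by calling `runChecks` with that seed.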

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/3721

          For the two tests that failed on Travis CI: they were simply killed, and a "`Killed`" appeared in their logs, which usually indicates that memory ran out and the kernel killed a process.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112236022

          — Diff: flink-dist/src/main/flink-bin/bin/config.sh —
          @@ -398,3 +428,106 @@ readSlaves() {
          useOffHeapMemory() {
          [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
          }
          +
          +HAVE_AWK=
          +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
          +calculateNetworkBuf() {
          + local network_buffers_bytes
          + if [ "${FLINK_TM_HEAP}" -le "0" ]; then
          + echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
          + exit 1
          + fi
          +
          + if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
          + # fix memory size for network buffers
          + network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
          + else
          + if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
          + echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
          + echo "Min must be less than or equal to max."
          + echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          +
          + # Bash only performs integer arithmetic so floating point computation is performed using awk
          + if [[ -z "${HAVE_AWK}" ]] ; then
          + command -v awk >/dev/null 2>&1
          + if [[ $? -ne 0 ]]; then
          + echo "[ERROR] Program 'awk' not found."
          + echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          + HAVE_AWK=true
          + fi
          +
          + # We calculate the memory using a fraction of the total memory
          + if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
          + echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
          + echo "It must be between 0.0 and 1.0."
          + echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          +
          + network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
          + fi
          +
          + # recalculate the JVM heap memory by taking the network buffers into account
          — End diff –

          what do you mean?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112002748

          — Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java —
          @@ -0,0 +1,306 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.dist;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
          +import org.apache.flink.util.OperatingSystem;
          +import org.apache.flink.util.TestLogger;
          +import org.junit.Assume;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.BufferedReader;
          +import java.io.IOException;
          +import java.io.InputStreamReader;
          +import java.util.Random;
          +
          +import static org.hamcrest.CoreMatchers.allOf;
          +import static org.hamcrest.Matchers.greaterThanOrEqualTo;
          +import static org.hamcrest.Matchers.lessThanOrEqualTo;
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertThat;
          +
          +/**
          + * Unit test that verifies that the task manager heap size calculation used by the bash script
          + * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
          + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
          + *
          + * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
          + * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
          + * not need high precision.
          + */
          +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
          +
          + /** Key that is used by <tt>config.sh</tt>. */
          + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
          +
          + /**
          + * Number of tests with random values.
          + *
          + * NOTE: calling the external test script is slow and thus low numbers are preferred for general
          + * testing.
          + */
          + private static final int NUM_RANDOM_TESTS = 20;
          +
          + @Before
          + public void checkOperatingSystem() {
          + Assume.assumeTrue("This test checks shell scripts not available on Windows.",
          + !OperatingSystem.isWindows());
          + }
          +
          + /**
          + * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
          + * result as the shell script.
          + */
          + @Test
          + public void compareNetworkBufShellScriptWithJava() throws Exception {
          + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
          + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
          +
          + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
          +
          + compareNetworkBufJavaVsScript(
          + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
          +
          + compareNetworkBufJavaVsScript(
          + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
          +
          + compareNetworkBufJavaVsScript(
          + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
          +
          + // some automated tests with random (but valid) values
          +
          + Random ran = new Random();
          + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
          + // tolerate that values differ by 1% (due to different floating point precisions)
          + compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
          — End diff –

          As with the other randomized tests, we should print the configuration used for the test in case of failure.
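A minimal sketch of what such reporting could look like, written as plain Java without JUnit so it stands alone. The names `NetBufConfig`, `randomConfig` and `assertWithinTolerance` are hypothetical stand-ins and are not part of the PR; the only point is that the failure message carries the randomly drawn values.

```java
import java.util.Random;

class RandomConfigReporting {

    /** Hypothetical holder for the randomly drawn values (not a Flink class). */
    static final class NetBufConfig {
        final float fraction;
        final long min;
        final long max;

        NetBufConfig(float fraction, long min, long max) {
            this.fraction = fraction;
            this.min = min;
            this.max = max;
        }

        @Override
        public String toString() {
            return "fraction=" + fraction + ", min=" + min + ", max=" + max;
        }
    }

    /** Draws a valid configuration: fraction in (0, 1) and min <= max. */
    static NetBufConfig randomConfig(Random ran) {
        float fraction = Math.max(ran.nextFloat(), Float.MIN_VALUE);
        long min = 1L + ran.nextInt(1 << 20);
        long max = min + ran.nextInt(1 << 20);
        return new NetBufConfig(fraction, min, max);
    }

    /** Fails with a message that includes the configuration under test. */
    static void assertWithinTolerance(
            long expected, long actual, double tolerance, NetBufConfig config) {
        double allowed = Math.abs(expected) * tolerance;
        if (Math.abs(expected - actual) > allowed) {
            throw new AssertionError("expected " + expected + " but got " + actual
                + " (tolerance " + tolerance + ") with configuration: " + config);
        }
    }
}
```

In the actual test the same effect could presumably be had by passing the configuration as the message argument of the existing assertions, so that a failing random run can be reproduced from the log.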

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112000165

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java —
          @@ -0,0 +1,272 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.taskexecutor;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.core.memory.MemoryType;
          +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
          +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
          +import org.apache.flink.runtime.util.EnvironmentInformation;
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +import org.powermock.api.mockito.PowerMockito;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.net.InetAddress;
          +import java.util.Random;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +import static org.mockito.Mockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +
          +/**
          + * Unit test for {@link TaskManagerServices}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(EnvironmentInformation.class)
          +public class TaskManagerServicesTest {
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
          + * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufOld() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          +
          + // test integer overflow in the memory size
          + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
          + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
          + * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
          + */
          + @Test
          + public void calculateNetworkBufNew() throws Exception {
          + Configuration config = new Configuration();
          +
          + // (1) defaults
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config));
          +
          + calculateNetworkBufNew(config);
          + }
          +
          + /**
          + * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
          + * new configuration parameters.
          + *
          + * @param config configuration object
          + */
          + private static void calculateNetworkBufNew(final Configuration config) {
          + // (2) fixed size memory
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
          +
          + // (3) random fraction, min, and max values
          + Random ran = new Random();
          + for (int i = 0; i < 1_000; ++i){
          + float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
          + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
          +
          + long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
          +
          + long max = Math.max(min, ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
          +
          + long javaMem = Math.max(max + 1, ran.nextLong());
          +
          + final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
          + assertTrue(networkBufMem >= min);
          + assertTrue(networkBufMem <= max);
          + if (networkBufMem > min && networkBufMem < max) {
          + assertEquals((long) (javaMem * frac), networkBufMem);
          + }
          + }
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using mixed
          + * old/new configurations.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufMixed() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          +
          + // old + 1 new parameter = new:
          + Configuration config1 = config.clone();
          + config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config1));
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
          +
          + config1 = config.clone();
          + long newMin = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); // smallest value possible
          + config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, newMin);
          + assertEquals(Math.min(defaultMax, Math.max(newMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 20), config1));
          + assertEquals(Math.min(defaultMax, Math.max(newMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
          +
          + config1 = config.clone();
          + long newMax = Math.max(64L << 20 + 1, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue());
          + config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, newMax);
          + assertEquals(Math.min(newMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config1));
          + assertEquals(Math.min(newMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config1));
          + assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
          +
          + // old + any new parameter = new:
          + calculateNetworkBufNew(config);
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(TaskManagerServicesConfiguration)}
          + * using the same (manual) test cases as in {@link #calculateHeapSizeMB()}.
          + */
          + @Test
          + public void calculateNetworkBufFromHeapSize() throws Exception {
          + PowerMockito.mockStatic(EnvironmentInformation.class);
          + // some defaults:
          + when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
          + when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(1000L << 20); // 1000MB
          +
          + TaskManagerServicesConfiguration tmConfig;
          +
          + tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
          + TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
          + 0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
          + when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
          + assertEquals(100L << 20, TaskManagerServices.calculateNetworkBuf(tmConfig));
          +
          + tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
          + 0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
          + when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(890L << 20); // 890MB
          + assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
          + TaskManagerServices.calculateNetworkBuf(tmConfig));
          +
          + tmConfig = getTmConfig(-1, 0.1f,
          + 0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
          + when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(810L << 20); // 810MB
          + assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
          + TaskManagerServices.calculateNetworkBuf(tmConfig));
          + }
          +
          + /**
          + * Returns a task manager services configuration for the tests
          + *
          + * @param managedMemory see {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}
          + * @param managedMemoryFraction see {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}
          + * @param networkBufFraction see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}
          + * @param networkBufMin see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}
          + * @param networkBufMax see {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}
          + * @param memType on-heap or off-heap
          + *
          + * @return configuration object
          + */
          + private static TaskManagerServicesConfiguration getTmConfig(
          + final long managedMemory, final float managedMemoryFraction, float networkBufFraction,
          + long networkBufMin, long networkBufMax,
          + final MemoryType memType) {
          +
          + final NetworkEnvironmentConfiguration networkConfig = new NetworkEnvironmentConfiguration(
          + networkBufFraction,
          + networkBufMin,
          + networkBufMax,
          + TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
          + memType,
          + null,
          + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
          + TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
          + TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
          + TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE.defaultValue(),
          + null);
          +
          + return new TaskManagerServicesConfiguration(
          + mock(InetAddress.class),
          + new String[] {},
          + networkConfig,
          + QueryableStateConfiguration.disabled(),
          + 1,
          + managedMemory,
          + false,
          + managedMemoryFraction,
          + mock(MetricRegistryConfiguration.class),
          + 0);
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} with some
          + * manually calculated scenarios.
          + */
          + @Test
          + public void calculateHeapSizeMB() throws Exception {
          + Configuration config = new Configuration();
          + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 64L << 20); // 64MB
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB
          +
          + config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
          + assertEquals(1000, TaskManagerServices.calculateHeapSizeMB(1000, config));
          +
          + config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
          + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB
          + assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
          +
          + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1); // use fraction of given memory
          + config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
          + assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
          — End diff –

          Now that I see this test, it got me thinking: the managedMemory and networkBuffersMemory fractions do not work on the same base value; i.e., if both are set to 0.5, then one (the network, I think) gets 0.5 of the total memory, while the managedMemory gets 0.25. I'm wondering how intuitive this is; they are similar when used alone, but when both are used, 0.5 doesn't equal 0.5 in a way.
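The arithmetic behind this observation can be sketched as follows. This is a simplified model inferred from the expected values in the test above (1000 MB total, 890 MB and 810 MB heap), not the actual `TaskManagerServices` code: the class and method names are hypothetical, sizes are in MB, and `double` is used instead of `float`.

```java
class FractionBaseSketch {

    /** Network buffers: a fraction of the TOTAL memory, clamped to [minMB, maxMB]. */
    static long networkBufMB(long totalMB, double netFrac, long minMB, long maxMB) {
        return Math.min(maxMB, Math.max(minMB, (long) (totalMB * netFrac)));
    }

    /** Off-heap managed memory: a fraction of what REMAINS after the network buffers. */
    static long managedMB(long totalMB, long networkMB, double managedFrac) {
        return (long) ((totalMB - networkMB) * managedFrac);
    }

    public static void main(String[] args) {
        long total = 1000;

        // Both fractions at 0.5: network gets 500 MB (0.5 of the total),
        // managed memory gets 250 MB (0.5 of the remaining 500 MB, i.e. 0.25 of the total).
        long net = networkBufMB(total, 0.5, 64, 1024);
        long managed = managedMB(total, net, 0.5);
        System.out.println("network=" + net + "MB managed=" + managed + "MB");

        // Both fractions at 0.1, as in the last test case: 1000 - 100 - 90 = 810 MB heap.
        long net2 = networkBufMB(total, 0.1, 64, 1024);
        long managed2 = managedMB(total, net2, 0.1);
        System.out.println("heap=" + (total - net2 - managed2) + "MB");
    }
}
```

Under this model the two fractions only coincide when one of them is unused, which is the asymmetry described above.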

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112003041

          — Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java —
          @@ -0,0 +1,306 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.dist;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
          +import org.apache.flink.util.OperatingSystem;
          +import org.apache.flink.util.TestLogger;
          +import org.junit.Assume;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.BufferedReader;
          +import java.io.IOException;
          +import java.io.InputStreamReader;
          +import java.util.Random;
          +
          +import static org.hamcrest.CoreMatchers.allOf;
          +import static org.hamcrest.Matchers.greaterThanOrEqualTo;
          +import static org.hamcrest.Matchers.lessThanOrEqualTo;
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertThat;
          +
          +/**
          + * Unit test that verifies that the task manager heap size calculation used by the bash script
          + * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
          + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
          + *
          + * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
          + * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
          + * not need high precision.
          + */
          +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
          +
          + /** Key that is used by <tt>config.sh</tt>. */
          + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
          +
          + /**
          + * Number of tests with random values.
          + *
          + * NOTE: calling the external test script is slow and thus low numbers are preferred for general
          + * testing.
          + */
          + private static final int NUM_RANDOM_TESTS = 20;
          +
          + @Before
          + public void checkOperatingSystem() {
          + Assume.assumeTrue("This test checks shell scripts not available on Windows.",
          + !OperatingSystem.isWindows());
          + }
          +
          + /**
          + * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
          + * result as the shell script.
          + */
          + @Test
          + public void compareNetworkBufShellScriptWithJava() throws Exception {
          + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
          + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
          +
          + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
          +
          + compareNetworkBufJavaVsScript(
          + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
          +
          + compareNetworkBufJavaVsScript(
          + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
          +
          + compareNetworkBufJavaVsScript(
          + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
          +
          + // some automated tests with random (but valid) values
          +
          + Random ran = new Random();
          + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
          + // tolerate that values differ by 1% (due to different floating point precisions)
          + compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
          + }
          + }
          +
          + /**
          + * Tests that {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} has the same
          + * result as the shell script.
          + */
          + @Test
          + public void compareHeapSizeShellScriptWithJava() throws Exception {
          + int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
          + float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
          +
          + // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
          +
          + compareHeapSizeJavaVsScript(
          + getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
          +
          + compareHeapSizeJavaVsScript(
          + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
          +
          + compareHeapSizeJavaVsScript(
          + getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
          +
          + // some automated tests with random (but valid) values
          +
          + Random ran = new Random();
          + for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
          + // tolerate that values differ by 1% (due to different floating point precisions)
          + compareHeapSizeJavaVsScript(getRandomConfig(ran), 0.01f);
          + }
          + }
          +
          + /**
          + * Returns a flink configuration object with the given values.
          + *
          + * @param javaMemMB
          + * total JVM memory to use (in megabytes)
          + * @param useOffHeap
          + * whether to use off-heap memory (<tt>true</tt>) or not (<tt>false</tt>)
          + * @param netBufMemFrac
          + * fraction of JVM memory to use for network buffers
          + * @param netBufMemMin
          + * minimum memory size for network buffers (in bytes)
          + * @param netBufMemMax
          + * maximum memory size for network buffers (in bytes)
          + * @param managedMemSizeMB
          + * amount of managed memory (in megabytes)
          + * @param managedMemFrac
          + * fraction of free memory to use for managed memory (if <tt>managedMemSizeMB</tt> is
          + * <tt>-1</tt>)
          + *
          + * @return flink configuration
          + */
          + private static Configuration getConfig(
          + final int javaMemMB, final boolean useOffHeap, final float netBufMemFrac,
          + final long netBufMemMin, final long netBufMemMax, final int managedMemSizeMB,
          + final float managedMemFrac) {
          +
          + Configuration config = new Configuration();
          +
          + config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
          + config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);
          +
          + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, netBufMemMin);
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, netBufMemMax);
          +
          + config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB);
          + config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, managedMemFrac);
          +
          + return config;
          + }
          +
          + /**
          + * Returns a flink configuration object with random values (only those relevant to the tests in
          + * this class.
          + *
          + * @param ran random number generator
          + *
          + * @return flink configuration
          + */
          + private static Configuration getRandomConfig(final Random ran) {
          +
          + float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
          +
          +// long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
          — End diff –

          these lines can be removed i guess
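
          For context on the 1% tolerance used by the random tests in the diff above: the class comment says the shell script computes in double precision (via awk) while the Java code uses float. The following is only a minimal sketch of how such a relative-tolerance comparison could look. The class name, method names, and the exact comparison are assumptions for illustration and are not taken from the pull request.

```java
/** Hypothetical sketch, not part of the pull request. */
final class PrecisionToleranceSketch {

    /** Fraction of the total memory computed in single precision, as the Java side is described to do. */
    static long fractionWithFloat(long totalBytes, float fraction) {
        return (long) (fraction * totalBytes);
    }

    /** The same fraction computed in double precision, as awk is described to do. */
    static long fractionWithDouble(long totalBytes, double fraction) {
        return (long) (fraction * totalBytes);
    }

    /** Returns true if actual lies within expected +/- (tolerance * expected). */
    static boolean withinTolerance(long expected, long actual, float tolerance) {
        double delta = Math.abs((double) expected) * tolerance;
        return Math.abs((double) actual - (double) expected) <= delta;
    }

    public static void main(String[] args) {
        long total = 890L << 20; // 890 MB, one of the manual test values in the diff
        long viaFloat = fractionWithFloat(total, 0.1f);
        long viaDouble = fractionWithDouble(total, 0.1);
        System.out.println(viaFloat + " (float) vs " + viaDouble + " (double)");
        System.out.println("within 1%: " + withinTolerance(viaDouble, viaFloat, 0.01f));
    }
}
```

          A tolerance of 0.0f in this sketch demands exact equality, which mirrors how the manual test cases in the diff pass 0.0f while the random ones pass 0.01f.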

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r111975142

          — Diff: flink-dist/src/main/flink-bin/bin/config.sh —
          @@ -398,3 +428,106 @@ readSlaves() {
          useOffHeapMemory() {
          [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]]
          }
          +
          +HAVE_AWK=
          +# same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBuf(long totalJavaMemorySize, Configuration config)
          +calculateNetworkBuf() {
          + local network_buffers_bytes
          + if [ "${FLINK_TM_HEAP}" -le "0" ]; then
          + echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
          + exit 1
          + fi
          +
          + if [[ "${FLINK_TM_NET_BUF_MIN}" = "${FLINK_TM_NET_BUF_MAX}" ]]; then
          + # fix memory size for network buffers
          + network_buffers_bytes=${FLINK_TM_NET_BUF_MIN}
          + else
          + if [[ "${FLINK_TM_NET_BUF_MIN}" -gt "${FLINK_TM_NET_BUF_MAX}" ]]; then
          + echo "[ERROR] Configured TaskManager network buffer memory min/max '${FLINK_TM_NET_BUF_MIN}'/'${FLINK_TM_NET_BUF_MAX}' are not valid."
          + echo "Min must be less than or equal to max."
          + echo "Please set '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          +
          + # Bash only performs integer arithmetic so floating point computation is performed using awk
          + if [[ -z "${HAVE_AWK}" ]] ; then
          + command -v awk >/dev/null 2>&1
          + if [[ $? -ne 0 ]]; then
          + echo "[ERROR] Program 'awk' not found."
          + echo "Please install 'awk' or define '${KEY_TASKM_NET_BUF_MIN}' and '${KEY_TASKM_NET_BUF_MAX}' instead of '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          + HAVE_AWK=true
          + fi
          +
          + # We calculate the memory using a fraction of the total memory
          + if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_NET_BUF_FRACTION}"` != "1" ]]; then
          + echo "[ERROR] Configured TaskManager network buffer memory fraction '${FLINK_TM_NET_BUF_FRACTION}' is not a valid value."
          + echo "It must be between 0.0 and 1.0."
          + echo "Please set '${KEY_TASKM_NET_BUF_FRACTION}' in ${FLINK_CONF_FILE}."
          + exit 1
          + fi
          +
          + network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
          + fi
          +
          + # recalculate the JVM heap memory by taking the network buffers into account
          — End diff –

          This is more of a verification isn't it?
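
          For readers following the shell logic in the diff above, here is a minimal Java sketch of the same arithmetic: reject a non-positive heap size, use the fixed size when min equals max, reject min greater than max, require the fraction to lie strictly between 0.0 and 1.0, then clamp the fraction of the total memory to the [min, max] range. The class name and the flat parameter list are assumptions for illustration; the real method in the pull request takes a Configuration object and may differ in detail.

```java
/** Hypothetical sketch of the clamping done by the awk expression; not the code from the pull request. */
final class NetworkBufSketch {

    /**
     * @param totalBytes total JVM memory in bytes (the script shifts megabytes left by 20)
     * @param fraction   fraction of the total memory to use for network buffers
     * @param min        lower bound in bytes
     * @param max        upper bound in bytes
     * @return network buffer memory in bytes
     */
    static long calculateNetworkBuf(long totalBytes, float fraction, long min, long max) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("Total memory must be positive.");
        }
        if (min == max) {
            // fixed memory size for network buffers, the fraction is not consulted
            return min;
        }
        if (min > max) {
            throw new IllegalArgumentException("Min must be less than or equal to max.");
        }
        if (!(fraction > 0.0f && fraction < 1.0f)) {
            throw new IllegalArgumentException("Fraction must be between 0.0 and 1.0.");
        }
        long x = (long) (fraction * totalBytes);
        // same as: x > max ? max : x < min ? min : x
        return Math.min(max, Math.max(min, x));
    }

    public static void main(String[] args) {
        // 1000 MB total, 10% fraction, 64 MB min, 1 GB max (values used by the manual tests)
        System.out.println(calculateNetworkBuf(1000L << 20, 0.1f, 64L << 20, 1L << 30));
    }
}
```

          Note that this sketch multiplies in float, as the Java side is described to do, so results can differ slightly from the double-precision awk computation.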

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112001266

          — Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java —
          @@ -0,0 +1,306 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.dist;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
          +import org.apache.flink.util.OperatingSystem;
          +import org.apache.flink.util.TestLogger;
          +import org.junit.Assume;
          +import org.junit.Before;
          +import org.junit.Test;
          +
          +import java.io.BufferedReader;
          +import java.io.IOException;
          +import java.io.InputStreamReader;
          +import java.util.Random;
          +
          +import static org.hamcrest.CoreMatchers.allOf;
          +import static org.hamcrest.Matchers.greaterThanOrEqualTo;
          +import static org.hamcrest.Matchers.lessThanOrEqualTo;
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertThat;
          +
          +/**
          + * Unit test that verifies that the task manager heap size calculation used by the bash script
          + * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
          + * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
          + *
          + * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
          + * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
          + * not need high precision.
          + */
          +public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
          +
          + /** Key that is used by <tt>config.sh</tt>. */
          + private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
          +
          + /**
          + * Number of tests with random values.
          + *
          + * NOTE: calling the external test script is slow and thus low numbers are preferred for general
          + * testing.
          + */
          + private static final int NUM_RANDOM_TESTS = 20;
          +
          + @Before
          + public void checkOperatingSystem() {
          + Assume.assumeTrue("This test checks shell scripts not available on Windows.",
          — End diff –

          missing "that are"?

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r111983754

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java —
          @@ -410,24 +410,47 @@ private static NetworkEnvironment createNetworkEnvironment(
          *
          * @return memory to use for network buffers (in bytes)
          */
          + @SuppressWarnings("deprecation")
          public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) {
          + assert totalJavaMemorySize > 0;
          --- End diff --

          use Preconditions instead?
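For context on this suggestion: a Java `assert` is only evaluated when the JVM runs with `-ea`, whereas an explicit precondition check always runs. The following is a minimal sketch, not the PR's code. The local `checkArgument` helper is a stand-in for a Preconditions-style utility, and the class name, method name, and the fixed 10% share are assumptions made only for illustration.

```java
final class PreconditionSketch {

    // Stand-in for a Preconditions-style helper (hypothetical, defined locally to stay self-contained).
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Hypothetical calculation: a fixed 10% of the total, only to give the check something to guard.
    static long networkBufferBytes(long totalJavaMemorySize) {
        // Unlike `assert totalJavaMemorySize > 0;`, this check also runs without the -ea JVM flag.
        checkArgument(totalJavaMemorySize > 0, "totalJavaMemorySize must be positive");
        return totalJavaMemorySize / 10;
    }

    public static void main(String[] args) {
        System.out.println(networkBufferBytes(100L << 20));
        try {
            networkBufferBytes(0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With the explicit check, an invalid argument fails the same way in production and in tests, rather than only when assertions happen to be enabled.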

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r112000380

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java —
          @@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
          }
          catch (Throwable t) {
          if (LOG.isErrorEnabled()) {
          - LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
          + LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
          --- End diff --

          Let's see whether I can remember not to squash this commit.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r111992933

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java —
          @@ -0,0 +1,272 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.taskexecutor;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.core.memory.MemoryType;
          +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
          +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
          +import org.apache.flink.runtime.util.EnvironmentInformation;
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +import org.powermock.api.mockito.PowerMockito;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.net.InetAddress;
          +import java.util.Random;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +import static org.mockito.Mockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +
          +/**
          + * Unit test for {@link TaskManagerServices}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(EnvironmentInformation.class)
          +public class TaskManagerServicesTest {
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
          + * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufOld() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          +
          + // test integer overflow in the memory size
          + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
          + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
          + * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
          + */
          + @Test
          + public void calculateNetworkBufNew() throws Exception {
          + Configuration config = new Configuration();
          +
          + // (1) defaults
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config));
          +
          + calculateNetworkBufNew(config);
          + }
          +
          + /**
          + * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
          + * new configuration parameters.
          + *
          + * @param config configuration object
          + */
          + private static void calculateNetworkBufNew(final Configuration config) {
          + // (2) fixed size memory
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
          +
          + // (3) random fraction, min, and max values
          + Random ran = new Random();
          + for (int i = 0; i < 1_000; ++i){
          + float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
          + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
          +
          + long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
          +
          + long max = Math.max(min, ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
          +
          + long javaMem = Math.max(max + 1, ran.nextLong());
          +
          + final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
          — End diff –

          What we definitely need here is a catch block that prints the used parameters if any assertion fails. It is a bit odd to use random parameters in the first place though :/
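One way to read this request is sketched below: seed the random generator explicitly and wrap each iteration so that a failing assertion is rethrown together with the seed and the generated parameters. This is only an illustration. `clampFraction` is a hypothetical stand-in for the method under test, and the value ranges are assumptions chosen to keep the arithmetic free of overflow.

```java
import java.util.Random;

final class RandomParamsSketch {

    // Hypothetical stand-in for the calculation under test: clamp frac * total into [min, max].
    static long clampFraction(long total, float frac, long min, long max) {
        return Math.min(max, Math.max(min, (long) (frac * total)));
    }

    // Runs randomized checks; on failure, rethrows with the seed and all parameters attached.
    static void runRandomChecks(long seed, int iterations) {
        Random ran = new Random(seed);
        for (int i = 0; i < iterations; i++) {
            float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
            long min = 1 + (long) (ran.nextDouble() * (1L << 30));
            long max = min + (long) (ran.nextDouble() * (1L << 30));
            long total = max + 1 + (long) (ran.nextDouble() * (1L << 30));
            try {
                long result = clampFraction(total, frac, min, max);
                if (result < min || result > max) {
                    throw new AssertionError("result " + result + " outside [min, max]");
                }
            } catch (AssertionError e) {
                // Attach everything needed to reproduce the failing case.
                throw new AssertionError(String.format(
                    "seed=%d, iteration=%d, frac=%s, min=%d, max=%d, total=%d",
                    seed, i, frac, min, max, total), e);
            }
        }
    }

    public static void main(String[] args) {
        runRandomChecks(42L, 1_000);
        System.out.println("all random checks passed");
    }
}
```

Logging the seed alone would already make a failure reproducible; including the concrete parameters saves a re-run when reading a CI log.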

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r111997260

          — Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java —
          @@ -0,0 +1,272 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.taskexecutor;
          +
          +import org.apache.flink.configuration.Configuration;
          +import org.apache.flink.configuration.TaskManagerOptions;
          +import org.apache.flink.core.memory.MemoryType;
          +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
          +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
          +import org.apache.flink.runtime.util.EnvironmentInformation;
          +import org.junit.Test;
          +import org.junit.runner.RunWith;
          +import org.powermock.api.mockito.PowerMockito;
          +import org.powermock.core.classloader.annotations.PrepareForTest;
          +import org.powermock.modules.junit4.PowerMockRunner;
          +
          +import java.net.InetAddress;
          +import java.util.Random;
          +
          +import static org.junit.Assert.assertEquals;
          +import static org.junit.Assert.assertTrue;
          +import static org.mockito.Mockito.mock;
          +import static org.powermock.api.mockito.PowerMockito.when;
          +
          +/**
          + * Unit test for {@link TaskManagerServices}.
          + */
          +@RunWith(PowerMockRunner.class)
          +@PrepareForTest(EnvironmentInformation.class)
          +public class TaskManagerServicesTest {
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using old
          + * configurations via {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufOld() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
          + TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          +
          + // test integer overflow in the memory size
          + int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
          + assertEquals(2L << 32, TaskManagerServices.calculateNetworkBuf(2L << 33, config));
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using new
          + * configurations via {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN} and
          + * {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}.
          + */
          + @Test
          + public void calculateNetworkBufNew() throws Exception {
          + Configuration config = new Configuration();
          +
          + // (1) defaults
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 20)))),
          + TaskManagerServices.calculateNetworkBuf((64L << 20 + 1), config));
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (defaultFrac * (10L << 30)))),
          + TaskManagerServices.calculateNetworkBuf((10L << 30), config));
          +
          + calculateNetworkBufNew(config);
          + }
          +
          + /**
          + * Helper to test {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} with the
          + * new configuration parameters.
          + *
          + * @param config configuration object
          + */
          + private static void calculateNetworkBufNew(final Configuration config) {
          + // (2) fixed size memory
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
          +
          + // note: actual network buffer memory size is independent of the totalJavaMemorySize
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(10L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(64L << 20, config));
          + assertEquals(1 << 20, TaskManagerServices.calculateNetworkBuf(1L << 30, config));
          +
          + // (3) random fraction, min, and max values
          + Random ran = new Random();
          + for (int i = 0; i < 1_000; ++i){
          + float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
          + config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
          +
          + long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
          +
          + long max = Math.max(min, ran.nextLong());
          + config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
          +
          + long javaMem = Math.max(max + 1, ran.nextLong());
          +
          + final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
          + assertTrue(networkBufMem >= min);
          + assertTrue(networkBufMem <= max);
          + if (networkBufMem > min && networkBufMem < max) {
          + assertEquals((long) (javaMem * frac), networkBufMem);
          + }
          + }
          + }
          +
          + /**
          + * Test for {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} using mixed
          + * old/new configurations.
          + */
          + @SuppressWarnings("deprecation")
          + @Test
          + public void calculateNetworkBufMixed() throws Exception {
          + Configuration config = new Configuration();
          + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
          +
          + final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
          + final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
          + final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
          +
          + // old + 1 new parameter = new:
          + Configuration config1 = config.clone();
          + config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
          + assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 20)))),
          — End diff –

          This seems overly complicated. If we used a set of well-defined values instead of the defaults, we could get rid of the min/max calls, no? This applies to the whole test.
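As a rough illustration of the suggestion: with fixed inputs, each expected value can be written as a literal, so the assertions no longer repeat the min/max logic they are meant to verify. The sketch below assumes a hypothetical `clampFraction` stand-in for the method under test and made-up sizes. A fraction of 0.5 is chosen so the float arithmetic is exact for these values.

```java
final class FixedValuesSketch {

    static final long MB = 1L << 20;

    // Hypothetical stand-in for the calculation under test: clamp frac * total into [min, max].
    static long clampFraction(long total, float frac, long min, long max) {
        return Math.min(max, Math.max(min, (long) (frac * total)));
    }

    static void check(long actual, long expected) {
        if (actual != expected) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
    }

    public static void main(String[] args) {
        // fraction result lies between the bounds
        check(clampFraction(100 * MB, 0.5f, 10 * MB, 80 * MB), 50 * MB);
        // fraction result falls below the minimum
        check(clampFraction(10 * MB, 0.5f, 10 * MB, 80 * MB), 10 * MB);
        // fraction result exceeds the maximum
        check(clampFraction(400 * MB, 0.5f, 10 * MB, 80 * MB), 80 * MB);
        System.out.println("fixed-value checks passed");
    }
}
```

Each case exercises exactly one branch of the clamp, which is harder to guarantee when the expectations are derived from the configured defaults.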

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r111994207

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java —
          @@ -376,6 +392,169 @@ private static NetworkEnvironment createNetworkEnvironment(
          }

          /**
          + * Calculates the amount of memory used for network buffers based on the total memory to use and
          + * the according configuration parameters.
          + *
          + * The following configuration parameters are involved:
          + * <ul>
          + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
          + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
          + * <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
          + * <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
          + * </ul>.
          + *
          + * @param totalJavaMemorySize
          + * overall available memory to use (heap and off-heap, in bytes)
          + * @param config
          + * configuration object
          + *
          + * @return memory to use for network buffers (in bytes)
          + */
          + public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) {
          — End diff –

          how about a slightly longer method name?

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r111969982

          — Diff: docs/setup/config.md —
          @@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated

           ## Background

          +
           ### Configuring the Network Buffers

          -If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
          +If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
          — End diff –

          As far as I know, those things only happen if you leave a space at the end of the line or similar. Here, the lines are properly joined in the generated HTML.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3721#discussion_r111966911

          — Diff: docs/setup/config.md —
          @@ -602,26 +612,66 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated

           ## Background

          +
           ### Configuring the Network Buffers

          -If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
          +If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
          — End diff –

          We usually don't do manual line breaks in the documentation; otherwise if you resize the window funky things start to happen.

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3721

          I've merged the two PRs that this one builds upon; could you rebase this one?

          githubbot ASF GitHub Bot added a comment -

          GitHub user NicoK opened a pull request:

          https://github.com/apache/flink/pull/3721

          FLINK-4545 replace the network buffers parameter

          (based on #3708 and #3713)

          Instead, allow the configuration with the following three new (more flexible) parameters:

          • `taskmanager.network.memory.fraction`: fraction of JVM memory to use for network buffers (default: 0.1)
          • `taskmanager.network.memory.min`: minimum memory size for network buffers (default: 64 MB)
          • `taskmanager.network.memory.max`: maximum memory size for network buffers (default: 1 GB)
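
A minimal sketch of how these three parameters could combine, assuming the network memory is the configured fraction of the total memory, clamped to the range between min and max (this mirrors the `Math.min(max, Math.max(min, (long) (frac * total)))` expression used in the PR's unit tests). The class and method names are hypothetical and are not Flink's API:

```java
/** Hypothetical helper for illustration; not Flink's TaskManagerServices. */
final class NetworkMemorySketch {
    static final float DEFAULT_FRACTION = 0.1f;      // taskmanager.network.memory.fraction
    static final long DEFAULT_MIN_BYTES = 64L << 20; // taskmanager.network.memory.min (64 MB)
    static final long DEFAULT_MAX_BYTES = 1L << 30;  // taskmanager.network.memory.max (1 GB)

    /** Takes the given fraction of the total memory and clamps it to [minBytes, maxBytes]. */
    static long networkBufferBytes(long totalBytes, float fraction, long minBytes, long maxBytes) {
        if (minBytes > maxBytes) {
            // Assumed sanity check; the actual configuration checks may differ.
            throw new IllegalArgumentException("min must not exceed max");
        }
        long byFraction = (long) (fraction * totalBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        // Small JVM: the fraction falls below min, so min is used.
        System.out.println(networkBufferBytes(100 * mb, DEFAULT_FRACTION, DEFAULT_MIN_BYTES, DEFAULT_MAX_BYTES));
        // Large JVM: the fraction exceeds max, so max is used.
        System.out.println(networkBufferBytes(100L << 30, DEFAULT_FRACTION, DEFAULT_MIN_BYTES, DEFAULT_MAX_BYTES));
        // min == max gives a fixed size that is independent of the total memory.
        System.out.println(networkBufferBytes(10L << 30, DEFAULT_FRACTION, mb, mb));
    }
}
```

Setting min and max to the same value pins the result, which is how a fixed amount of network memory can be expressed with these parameters.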

          Note that I needed to adapt two unit tests which would have been killed on Travis CI because these defaults result in ~150MB memory being used for network buffers which apparently was too much there.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/NicoK/flink flink-4545

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3721.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3721


          commit e61f7bc4debce332c421cb645ff1025b4d03d8d0
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-11T09:26:29Z

          FLINK-6292 fix transfer.sh upload by using https

          Seems the upload via http is not supported anymore.

          commit 362ceec0823b179719449d0ed244c591dfcf51f4
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-12T09:09:03Z

          FLINK-6299 make all IT cases extend from TestLogger

          This way, currently executed tests and their failures are properly logged.

          commit 973099ef55701fe63951639d37b4f01765b06a01
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-06T12:41:52Z

          FLINK-4545 replace the network buffers parameter

          Instead, allow the configuration with the following three new (more flexible)
          parameters:

          • "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
          • "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
          • "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)
          1. Please enter the commit message for your changes. Lines starting

          commit 09a981189b59ac13bd39000cc77913c0b03289fd
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-11T12:20:40Z

          [hotfix] fix typo in error message

          commit 0960a809c8da51b9787f3f726945716933051fc3
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-11T13:29:41Z

          [hotfix] fix typo in taskmanager.sh usage string

          commit 298bb69451a1405df774451de11eb5684534c956
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-06T15:58:14Z

          FLINK-4545 adapt taskmanager.sh to take network buffers memory into account

          commit ea2fb24f4a6eb18cc3f8d3ebd83a49c0f1386a8a
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-10T09:43:50Z

          FLINK-4545 add configuration checks for the new network buffer memory config

          commit 5133d250c4dba4a5e72baad95c841d2b03cb49ea
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-10T16:22:10Z

          FLINK-4545 add unit tests using the new network configuration parameters and methods

          commit a24a548e6ff7e36581f7f7457099656362ca3974
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-11T16:52:56Z

          FLINK-4545 add unit tests for heap size calculation in shell scripts

          These verify that the results are the same as in the calculation done by Java.

          commit d55153d559bf110a931b5de849df812038ba4a7a
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-12T16:11:37Z

          FLINK-4545 update the docs with the changed network buffer parameter

          Also update the descriptions of taskmanager.memory.fraction not being relative
          to the full size of taskmanager.heap.mb but that network buffer memory is
          subtracted before!

          commit c48beb0d67e8ef847ef845835e342d4a49127e7d
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-04-12T16:25:27Z

          FLINK-4545 fix some tests being killed on Travis CI

          Due to the increased defaults for network buffer memory use, some builds on
          Travis CI fail with unit tests being killed. This affects

          • RocksDbBackendEventTimeWindowCheckpointingITCase and
          • HBaseConnectorITCase

          We fix this by limiting the maximum amount of network buffer memory to 80MB
          (current defaults would yield 150MB, previously 64MB were used).


          zjwang zhijiang added a comment -

          Stephan Ewen, yes, there are two cases. The first is where all task managers have the same fixed size, as in mini-cluster mode; there, the three parameters (fraction, min, max) that Nico mentioned all work for network memory.

          The second is computing the size of the container, as in YARN mode. There we do need a configuration parameter for network memory; maybe the `taskmanager.network.memory.max` parameter that Nico mentioned above can still work for that. Is my understanding right?

          StephanEwen Stephan Ewen added a comment -

          zhijiang The logic outlined above is tailored towards the case where you have a container or JVM of a certain size and want to configure how much memory goes to what component.

          In the case where you actually want to compute the size of the container (as in the fine-grained resource configuration code), we probably need a configuration parameter for the network memory to add to each container. What do you think?

          zjwang zhijiang added a comment -

          Stephan Ewen, thank you for such detailed information!
          I agree with the points above and understand the considerations and future plans.
          The current constant global buffer amount is replaced by a flexible range between a minimum and a maximum, which is especially helpful for current batch jobs and for better recovery in streaming in the future. I hope to see the changes soon and to participate in the related parts!

          StephanEwen Stephan Ewen added a comment -

          There are a few reasons why I think the GlobalBufferPool would be helpful:

          • It is hard to know up front exactly how many network buffers will be needed. A TaskManager may get different tasks after a recovery (with more channels). In batch jobs, tasks are deployed lazily and a TaskManager may get more tasks over time.
          • Having more network memory is generally quite beneficial for batch jobs. But how much more? Usually as much as the system can spare.
          • Also on the streaming side, we plan to make use of spare buffers to help with better recovery in the future. The change now would make sure that this is possible without changing the memory configuration again.
          • Finally, on-demand buffer allocation can help, but it also has a downside: a system that reserves/allocates memory up front is more predictable later.
          zjwang zhijiang added a comment - - edited

          Nico Kruber, Stephan Ewen, thank you for your replies!

          As far as I know, we have already introduced two other parameters that let users control the core buffer amount in LocalBufferPool: taskmanager.net.memory.buffers-per-channel (default: 2) and taskmanager.net.memory.extra-buffers-per-gate (default: 8). These are easy for users to understand.

          In my opinion, the JobManager or ResourceManager can decide the total buffer amount in NetworkBufferPool based on two conditions:

          • The number of tasks that will be deployed to this TaskManager.
          • The core number of buffers in LocalBufferPool, possibly also taking the ResultPartitionType into account.

          So the framework can calculate the total buffer amount and the corresponding memory usage before starting the TaskManager. The buffer amount would be set on the TaskManager, and the memory usage would be included in the total resource request. This seems consistent.
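
One way to read this proposal, as a rough sketch: assume each gate needs its channel count times buffers-per-channel, plus extra-buffers-per-gate, and sum that over all gates of the tasks placed on a TaskManager. The formula, the class name, and the 32 KiB segment size used below are assumptions for illustration and are not taken from this thread or from Flink's code:

```java
/** Hypothetical estimate for illustration; the formula is an assumption, not Flink's code. */
final class BufferDemandSketch {
    static final int BUFFERS_PER_CHANNEL = 2;    // default quoted in the comment above
    static final int EXTRA_BUFFERS_PER_GATE = 8; // default quoted in the comment above

    /** Assumed demand of one gate: per-channel buffers plus a per-gate extra. */
    static long buffersForGate(int channels) {
        return (long) channels * BUFFERS_PER_CHANNEL + EXTRA_BUFFERS_PER_GATE;
    }

    /** Sums the assumed demand over all gates of the tasks on one TaskManager. */
    static long totalBuffers(int[] channelsPerGate) {
        long total = 0;
        for (int channels : channelsPerGate) {
            total += buffersForGate(channels);
        }
        return total;
    }

    /** Converts a buffer count into bytes for a given memory segment size. */
    static long memoryBytes(long buffers, long segmentSizeBytes) {
        return buffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        int[] channelsPerGate = {4, 10}; // two gates with 4 and 10 channels
        long buffers = totalBuffers(channelsPerGate);
        System.out.println(buffers + " buffers, " + memoryBytes(buffers, 32 * 1024) + " bytes");
    }
}
```

An estimate of this kind only works if the set of tasks per TaskManager is known before it starts, which is the point debated in the replies.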

          Have you considered not exposing the global network memory parameters to users and allowing only the LocalBufferPool parameters? Otherwise users have to consider both global and local parameters, and may also need to know the total number of tasks in the TaskManager, which can easily lead to conflicts. The fewer parameters users need to know, the better.

          Maybe you have other concerns that I have not covered yet. I would welcome your advice and am very willing to work on this if needed.

          NicoK Nico Kruber added a comment -

          We did some thinking and would probably add the following three new configuration parameters (with the given defaults) to finally replace the taskmanager.network.numberOfBuffers parameter:

          • taskmanager.network.memory.fraction (default: 0.1): fraction of JVM memory to use for network buffers (by reducing taskmanager.memory.fraction from 0.7 to 0.6)
          • taskmanager.network.memory.min (default: 64MB): minimum memory size for network buffers
          • taskmanager.network.memory.max (default: 1GB): maximum memory size for network buffers

          A fixed size may be achieved by setting the latter two to the same value. taskmanager.network.numberOfBuffers will be marked deprecated and used only if none of the three new parameters is given, e.g. because old config files are being used.
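
The fallback rule can be sketched as follows, assuming a plain key/value map stands in for the configuration and that the legacy size is the buffer count times the memory segment size. The helper names and the map-based lookup are hypothetical, not Flink's API; only the four configuration keys come from this discussion:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not Flink's configuration code. */
final class NetworkConfigFallbackSketch {
    static final String FRACTION = "taskmanager.network.memory.fraction";
    static final String MIN = "taskmanager.network.memory.min";
    static final String MAX = "taskmanager.network.memory.max";
    static final String LEGACY_NUM_BUFFERS = "taskmanager.network.numberOfBuffers";

    /** The deprecated key is honoured only if none of the three new keys is set. */
    static boolean usesLegacyBufferCount(Map<String, String> conf) {
        boolean anyNewKey = conf.containsKey(FRACTION) || conf.containsKey(MIN) || conf.containsKey(MAX);
        return !anyNewKey && conf.containsKey(LEGACY_NUM_BUFFERS);
    }

    /** Assumed legacy sizing: number of buffers times the memory segment size. */
    static long legacyNetworkBytes(Map<String, String> conf, long segmentSizeBytes) {
        return Long.parseLong(conf.get(LEGACY_NUM_BUFFERS)) * segmentSizeBytes;
    }

    public static void main(String[] args) {
        Map<String, String> oldConf = new HashMap<>();
        oldConf.put(LEGACY_NUM_BUFFERS, "2048");
        System.out.println(usesLegacyBufferCount(oldConf));          // old config file only
        System.out.println(legacyNetworkBytes(oldConf, 32 * 1024));  // assuming 32 KiB segments

        oldConf.put(FRACTION, "0.1"); // one new key is enough to switch to the new parameters
        System.out.println(usesLegacyBufferCount(oldConf));
    }
}
```

With this precedence, a configuration that mixes the old key with any new key is treated as a new-style configuration, which matches the "old + 1 new parameter = new" case in the PR's mixed-configuration test.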

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3467

          @zhijiangW Yes, let's discuss this when the feature is complete.
          Our thinking so far is:

          • One can specify an absolute amount of network memory (similar to how one can specify an absolute amount of managed memory for batch)
          • If no absolute amount is specified, a relative fraction of the JVM heap will be pre-allocated as network buffers.
          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3467

          @NicoK, thank you for the explanation; I have already traced the code in your local branch. I look forward to your further commits for the global pool.

          @StephanEwen, thanks for the further elaboration. From my understanding, each task can decide the core number of buffers in its `LocalBufferPool` based on its input and output channels and the configuration, and the maximum number of buffers based on the `ResultPartitionType`. All the `LocalBufferPool`s together affect the total number of buffers in the `NetworkBufferPool`, so we may need to consider the maximum memory usage.

          My concern is that the memory usage of the `NetworkBufferPool` should be considered before the `TaskManager` starts, and this memory should be added to the total resources of the `TaskManager`.
          I am willing to do that as part of my current work on [Fine-grained Resource Configuration](https://issues.apache.org/jira/browse/FLINK-5131) after this feature is complete.

          StephanEwen Stephan Ewen added a comment - - edited

          Greg Hogan Sorry, I just saw that I never answered your comment.
          The line of thought for the fix is as follows:
          The management of buffers involves (1) making sure that you have enough and (2) making sure you don't have too much where too much hurts.

          (1) is not too hard: we can either allocate buffers on demand or simply allocate more by default (using a clever heuristic that depends on the JVM size). The latter may still require an increase in network buffers in a few cases.

          (2) is different for batch and streaming workloads:

          • Batch always benefits from more network memory, as this causes better smoothing over short term back pressure.
          • The exception in batch is when you eat so much memory that the disk caches suffer, as you mentioned. We are not taking this into account currently, as we assume there is a total "Flink budget" of memory (frequently simply the JVM heap) and we only work in distributing memory within that budget.
          • Streaming must not have too much data in flight, as in flight data needs to be aligned during checkpoints.
          • More memory may still help to keep data around after sending, for replays (relevant later)

          The fix will introduce a new type of data exchange PIPELINED_BOUNDED which will be used by streaming and make sure not too much data can be in flight in a single input gate or output gate.

          There will probably be more refinements over time, but this seems a simple way to address most of the problem. Since many users are affected by this, we want to get the simple fix in first, and then improve on that.

          StephanEwen Stephan Ewen added a comment -

          Part two fixed in 11e2aa6dcdbf42992cda57a5b50d5c29b4facf2d

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3480

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3480

          Did another review - looks good to me!
          Merging...

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/3480

          added the requested changes and successfully rebased on the newest master due to conflicts

          StephanEwen Stephan Ewen added a comment -

          Part one merged in 8b49ee5aa2e17b1787764c3265e1ebda47d89840

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3467

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3467#discussion_r105143643

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
          @@ -265,11 +281,15 @@ public String toString() {
          // ------------------------------------------------------------------------

          private void returnMemorySegment(MemorySegment segment) {
          + assert Thread.holdsLock(availableMemorySegments);
          --- End diff --

          Using `synchronized` again would impact performance, whereas assertions only cost anything when they are enabled, which is the case in our unit tests (see https://maven.apache.org/surefire/maven-surefire-plugin/test-mojo.html#enableAssertions).
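
A minimal sketch of the pattern under discussion, assuming a hypothetical `SegmentPool` class (not Flink's actual `LocalBufferPool`): the public method takes the lock once, and the private helper only documents and checks that precondition with an assertion. The check runs only when the JVM is started with `-ea`; otherwise it is skipped entirely.

```java
import java.util.ArrayDeque;

/** Hypothetical pool for illustration only; not Flink's LocalBufferPool. */
final class SegmentPool {
    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>();

    /** Public entry point: acquires the lock once, then delegates. */
    void recycle(byte[] segment) {
        synchronized (availableSegments) {
            returnSegment(segment);
        }
    }

    /** Must only be called while holding the lock on availableSegments. */
    private void returnSegment(byte[] segment) {
        // Evaluated only when assertions are enabled (-ea); no cost otherwise.
        assert Thread.holdsLock(availableSegments);
        availableSegments.add(segment);
    }

    /** Number of segments currently available in the pool. */
    int available() {
        synchronized (availableSegments) {
            return availableSegments.size();
        }
    }
}
```

Re-entering `synchronized (availableSegments)` inside the helper would also be correct, since Java monitors are reentrant, but it would pay the locking cost on every call rather than only in test runs.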

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3480

          Code looks overall very nice!

          What would be good is to make the "magic constants" configurable. I don't expect that anyone usually tweaks those (and eventually they will disappear and flow control will auto-tune the network), but having them configurable is always good as a workaround in case something unexpected happens.

          I would follow the same path as the `partitionRequestInitialBackoff` and `partitionRequestMaxBackoff` values in the `NetworkEnvironment`.

          githubbot ASF GitHub Bot added a comment -

          Github user wenlong88 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3467#discussion_r105081191

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---
          @@ -265,11 +281,15 @@ public String toString() {
          // ------------------------------------------------------------------------

          private void returnMemorySegment(MemorySegment segment) {
          + assert Thread.holdsLock(availableMemorySegments);
          --- End diff --

          Hi, I have a question about the assert, because I found that assertions are disabled in Java by default. Why not use an explicit `synchronized(availableMemorySegments)`, which may be the more common usage?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3467

          I think this is a good change, merging this...

          @zhijiangW The management of the buffers will change in follow-up PRs, first adjusting the local pools, then the global pool. Managing buffers in a global pool can help when caching data, such as for batch jobs. But we can take suggestions for follow-up improvements as a separate thread, after this improvement is in.

          githubbot ASF GitHub Bot added a comment -

          GitHub user NicoK opened a pull request:

          https://github.com/apache/flink/pull/3480

          FLINK-4545 use size-restricted LocalBufferPool instances for network communication

          Note: this PR is based on #3467 and is PR 2 of 3 in a series to get rid of the network buffer parameter.

          With this PR, the number of buffers a `LocalBufferPool` has to offer will be limited to `2 * <number of channels> + 8` for both input and output connections. This reduces buffer bloat in our network stack without tying us too much to specific jobs and their connections: the total number of network buffers can now be arbitrarily large again without affecting, for example, the delay of checkpoint barriers travelling through all TMs.

          Eventually, this will lead to the network buffer parameter being removed (which was the initial goal). Even before that, in a simple scenario like the following, with a parallelism of 2 and thus running on 6 TMs, we were able to reduce the 75th percentile of checkpoint delays by about 60%, from 38ms to 16ms (median at 7 for both).

          ```java
          final StreamExecutionEnvironment env = getStreamExecutionEnvironment(params);
          env.disableOperatorChaining();

          env.enableCheckpointing(1_000L);

          DataStreamSource<Tuple2<Long, Long>> source1 = env.addSource(new LongSource());

          source1.slotSharingGroup("source")
              .keyBy(1)
              .map(new IdentityMapFunction<Tuple2<Long, Long>>())
              .slotSharingGroup("map")
              .keyBy(1)
              .addSink(new DiscardingSink<Tuple2<Long, Long>>())
              .slotSharingGroup("sink");
          ```

          By adding random delays (0-1ms every 1000 keys) to the `IdentityMapFunction`, the median even improves from 5026ms to 293ms.

          Neither scenario affects the throughput of the program, but for real programs the reduction in delay may differ, since there actual state may need to be stored and other components take part as well.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/NicoK/flink flink-4545

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3480.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3480


          commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-10T13:36:37Z

          FLINK-4545 remove (unused) persistent partition type

          commit 11557c004450bcbbe680f1575f0e41d164424eae
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-10T15:11:08Z

          [docs] improve some documentation around network buffers

          commit cd999061d04ae803c79473241ac1f9b39c1f2731
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-10T15:12:19Z

          [hotfix][network] add some assertions documenting on which locks we rely

          commit 8f529bb3f42916c816c5091228569952917ad9b5
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-03-01T13:33:44Z

          FLINK-4545 remove fixed-size BufferPool instances

          These were unused except for unit tests and will be replaced with bounded
          BufferPool instances.

          commit 91cea2917e9453f9de5c02472d99d4fc0d090dda
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-03-06T11:36:02Z

          FLINK-4545 remove JobVertex#connectNewDataSetAsInput variant without partition type

          This removes
          JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern)
          and requires the developer to call
          JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType)
          instead and think about the partition type to add.

          commit 83d1404b106b558679e4c9ef16123fbc6b5eac72
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-03-06T11:37:56Z

          FLINK-4545 remove unused IntermediateDataSet constructors

          These were implying a default result partition type which we want the developer
          to actively decide upon.

          commit e9d41b6b613a7bac5c489102977e16e4c6c4bb86
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-10T13:53:09Z

          FLINK-4545 add a bounded result partition type

          This can be used to limit the number of network buffers used for this partition.

          (borrows the appropriate parts of a commit previously sketched for
          FLINK-5088 to implement bounded network queue lengths)

          commit b57f0652a768645a5712d376d0e4b438f35cfa6c
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-10T17:22:55Z

          FLINK-4545 allow LocalBufferPool to use a limited number of buffers

          commit d1d8b18bba967c6fb8f3934aa4cf1cfc8a2c1106
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-20T16:12:54Z

          FLINK-4545 also make the ResultPartitionType available at the InputGate

          This way, we know what kind of result partition is consumed by the input gate.

          commit d23fdf9d80dea5d46bfe2f7597f4d5e1295cae7b
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-13T18:24:45Z

          FLINK-4545 try to set an upper bound on the LocalBufferPool if restricted

          Use "2 * <number of channels> + 8" from the following considerations:

          • 1 buffer for in-flight data in the subpartition/input channel
          • 1 buffer for parallel serialization
          • + some extra buffers

          Also re-introduce some tests for bounded buffer pools similar to the fixed-size
          buffer pool tests before.
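
The bound from this commit message can be written out as a small helper. This is only a sketch: the class and constant names are hypothetical, and only the expression `2 * <number of channels> + 8` and its stated rationale come from the text above.

```java
/** Hypothetical helper; only the formula itself is taken from the commit message. */
final class BufferBound {
    // 1 buffer for in-flight data + 1 buffer for parallel serialization, per channel
    static final int BUFFERS_PER_CHANNEL = 2;
    // "some extra buffers" shared by the whole pool
    static final int EXTRA_BUFFERS = 8;

    /** Upper bound on the number of buffers of a restricted pool. */
    static int maxBuffers(int numberOfChannels) {
        if (numberOfChannels < 0) {
            throw new IllegalArgumentException("numberOfChannels must be >= 0");
        }
        // Exact arithmetic so that an overflow fails loudly instead of wrapping.
        return Math.addExact(
                Math.multiplyExact(BUFFERS_PER_CHANNEL, numberOfChannels), EXTRA_BUFFERS);
    }
}
```

For example, a pool serving 4 channels would be capped at 16 buffers, independent of how large the global pool is.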

          commit 37eed7bc59b6899e3d7bdd4b1a3dac87e5f04406
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-24T12:41:11Z

          FLINK-4545 re-implement NetworkBufferPool#redistributeBuffers

          This version also takes the bounded network buffers into account.
          The distribution is not strictly uniform anymore though:

          • for every buffer pool, we determine the maximum number of buffers it can take
            from the available number - let's call this its 'capacity'
          • then, each of them will get roughly available * capacity / totalCapacity
            buffers on top of the required number of buffers
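
The proportional scheme described in this commit message might look roughly as follows. This is a simplified sketch under stated assumptions, not the actual `NetworkBufferPool#redistributeBuffers` code: the class, method, and parameter names are hypothetical, each pool is assumed to have `maxSize >= required`, and buffers lost to integer rounding are simply left unassigned here.

```java
/** Simplified, hypothetical sketch of capacity-proportional redistribution. */
final class Redistribution {
    /**
     * @param required buffers every pool must receive
     * @param maxSize  upper bound per pool (Integer.MAX_VALUE for unbounded pools)
     * @param total    total number of buffers in the global pool
     * @return the resulting size of each pool
     */
    static int[] redistribute(int[] required, int[] maxSize, int total) {
        long totalRequired = 0;
        for (int r : required) {
            totalRequired += r;
        }
        if (totalRequired > total) {
            throw new IllegalStateException("Insufficient number of buffers");
        }
        long available = total - totalRequired;

        // capacity = how many of the available buffers a pool could take at most
        long[] capacity = new long[required.length];
        long totalCapacity = 0;
        for (int i = 0; i < required.length; i++) {
            capacity[i] = Math.min(available, (long) maxSize[i] - required[i]);
            totalCapacity += capacity[i];
        }

        // Never hand out more than the pools can absorb in total.
        long toDistribute = Math.min(available, totalCapacity);
        int[] sizes = new int[required.length];
        for (int i = 0; i < required.length; i++) {
            long extra = totalCapacity == 0 ? 0 : toDistribute * capacity[i] / totalCapacity;
            sizes[i] = (int) (required[i] + extra);
        }
        return sizes;
    }
}
```

With two pools that each require 2 buffers, one bounded at 4 and one unbounded, and 14 buffers in total, the bounded pool ends up with 3 buffers and the unbounded one with 10. The one buffer lost to rounding shows why the distribution is only "roughly" proportional.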

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/3467

          Hi @zhijiangW,
          actually, the solution I am working on is to replace the network buffers parameter with something like "max memory in percent" and "min MB to use". So that this does not create buffer bloat in our network stack, I have started to implement limited `LocalBufferPool` instances which tune their size based on the actual number of outgoing and incoming channels. It is actually not much more complicated than this, and I have already started on it in my local branch at https://github.com/NicoK/flink/tree/flink-4545 - expect a new PR within the week with more details.
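
One way to read the "max memory in percent" plus "min MB to use" idea is sketched below. Everything here is an assumption made for illustration: the class and parameter names, the way the two settings are combined (take the fraction, but never less than the minimum), and the 32 KiB buffer size used in the example. None of it is taken from the PR.

```java
/** Hypothetical sizing helper; the combination rule is an assumption, not the PR's logic. */
final class NetworkMemory {
    /** Network memory in bytes: a fraction of the total, but at least minBytes. */
    static long networkMemoryBytes(long totalMemoryBytes, double maxFraction, long minBytes) {
        if (totalMemoryBytes <= 0 || minBytes < 0 || maxFraction <= 0.0 || maxFraction > 1.0) {
            throw new IllegalArgumentException("invalid memory settings");
        }
        long byFraction = (long) (totalMemoryBytes * maxFraction);
        // Never exceed the total memory, even if the minimum is larger than it.
        return Math.min(totalMemoryBytes, Math.max(minBytes, byFraction));
    }

    /** Number of whole buffers that fit into the given amount of network memory. */
    static long numberOfBuffers(long networkMemoryBytes, int bufferSizeBytes) {
        if (bufferSizeBytes <= 0) {
            throw new IllegalArgumentException("bufferSizeBytes must be > 0");
        }
        return networkMemoryBytes / bufferSizeBytes;
    }
}
```

With 100 MiB of total memory, a fraction of 0.1 and a minimum of 64 MiB, the minimum wins, which gives 2048 buffers of 32 KiB each.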

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3467

          Hi @NicoK, I am interested in this issue and I like the way this PR asserts that the lock is held.

          It is really necessary for the framework to manage network buffers, because it is difficult for users to set the exact number of buffers. Our current simple solution is to extend the `ResourceProfile` by adding the total number of input and output edges for each `Execution`. The `ResourceManager` then calculates the buffer amounts based on that and overwrites the parameter value in the `TaskManager` configuration.

          From what @StephanEwen mentioned before, I know a little about this issue. Could you share detailed designs or plans for it, if you have any, so that I can learn from them and track the progress in time? Thank you!

          githubbot ASF GitHub Bot added a comment -

          GitHub user NicoK opened a pull request:

          https://github.com/apache/flink/pull/3467

          FLINK-4545 preparations for removing the network buffers parameter

          This PR includes some preparations for following PRs that ultimately lead to removing the network buffer parameter that was hard to tune.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/NicoK/flink flink-4545-prep

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3467.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3467


          commit dfea1bac97dbbf30a2e049618cc41fdca53ea6d3
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-10T13:36:37Z

          FLINK-4545 remove (unused) persistent partition type

          commit 11557c004450bcbbe680f1575f0e41d164424eae
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-10T15:11:08Z

          [docs] improve some documentation around network buffers

          commit cd999061d04ae803c79473241ac1f9b39c1f2731
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-02-10T15:12:19Z

          [hotfix][network] add some assertions documenting on which locks we rely

          commit 8f529bb3f42916c816c5091228569952917ad9b5
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-03-01T13:33:44Z

          FLINK-4545 remove fixed-size BufferPool instances

          These were unused except for unit tests and will be replaced with bounded
          BufferPool instances.


          greghogan Greg Hogan added a comment -

          In 1.2 we now expose metrics for the number of allocated and in use network buffers. This is now in the web UI but wasn't properly populating the values.

          Is there an exact "best" number of network buffers to allocate? Flink may run with a minimal allocation but this may cause additional backpressure while waiting for buffers to be processed and become free. We recommend something similar by advising users to maximize the TaskManager memory allocation, which may be better for streaming than batch since after spilling to disk the merge-sort makes use of the filesystem's read-ahead cache.

          +1 to dynamic scaling, and also if/when memory buffers are dynamically allocated between operators.

          StephanEwen Stephan Ewen added a comment -

          For a YARN-job-at-a-time setup, one could pre-compute that.
          For more dynamic setups it's a little more tricky, if you do not know the job or the number of TaskManagers up front.

          Also, the formula needs to take a "buffers *= numShuffles" into account.
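
As a worked example, the formula from the issue description (`#slots-per-TM^2 * #TMs * 4`) combined with the `numShuffles` factor suggested here can be sketched like this. The class and method names are hypothetical, and requiring at least one shuffle is an assumption of this sketch.

```java
/** Hypothetical calculator for the documented rule of thumb, scaled by the number of shuffles. */
final class BufferEstimate {
    static long estimate(int slotsPerTaskManager, int numTaskManagers, int numShuffles) {
        if (slotsPerTaskManager < 1 || numTaskManagers < 1 || numShuffles < 1) {
            throw new IllegalArgumentException("all arguments must be >= 1");
        }
        // #slots-per-TM^2 * #TMs * 4, computed in long to avoid int overflow
        long perShuffle = (long) slotsPerTaskManager * slotsPerTaskManager * numTaskManagers * 4;
        // "buffers *= numShuffles"
        return perShuffle * numShuffles;
    }
}
```

For 4 slots per TaskManager and 10 TaskManagers this gives 640 buffers for a single shuffle and 1280 for two, which shows how quickly a statically configured value can fall behind a changed job.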

          jgrier Jamie Grier added a comment -

          Big +1!

          In general I would love to see this improved. In my experience this is the "one thing" that people run into with Flink: whereas everything else "just works", this one parameter they have to set/tune, and it's very confusing to newcomers.

          The equation to get this right is complex and the "correct" setting changes based on how they deploy the job, what parallelism they use, how many TMs, etc, etc.

          It also often happens that things are working and then a user changes their job a bit (adding a keyBy for instance) and then it stops working and they have a hard time understanding why.

          Is there a way we can set this parameter automatically in a majority of use cases? If folks are running single jobs directly on YARN for instance it seems we should have all the information necessary to set this parameter auto-magically or at least fail-fast and tell the user what the parameter should be set to.


            People

            • Assignee:
              NicoK Nico Kruber
              Reporter:
              zhenzhongxu Zhenzhong Xu
            • Votes:
              1
              Watchers:
              13

              Dates

              • Created:
                Updated:
                Resolved:

                Development