Details

    • Type: Sub-task
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.4.0
    • Fix Version/s: None
    • Component/s: Core, Network
    • Labels:
      None

      Description

      In order to send Flink buffers through netty into the network, we need the buffers to use off-heap memory. Otherwise, a hidden copy happens in the NIO stack.
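      The following minimal Java sketch shows the two kinds of buffers involved, using only `java.nio.ByteBuffer`. The class name `BufferKinds` and the 32 KiB size are assumptions for illustration. The hidden copy itself happens inside the JDK. When a non-direct (heap) buffer is written to a socket channel, the JDK's internal NIO implementation typically copies it into a temporary direct buffer first. That is an implementation detail, not a documented API guarantee.

```java
import java.nio.ByteBuffer;

class BufferKinds {
    // Assumed buffer size for illustration only (32 KiB).
    static final int BUFFER_SIZE = 32 * 1024;

    // Heap buffer: backed by a byte[] on the Java heap. Writing it to a socket
    // channel typically makes the JDK copy it into a temporary direct buffer.
    static ByteBuffer heapBuffer() {
        return ByteBuffer.allocate(BUFFER_SIZE);
    }

    // Direct (off-heap) buffer: native memory that the OS can read without that extra copy.
    static ByteBuffer directBuffer() {
        return ByteBuffer.allocateDirect(BUFFER_SIZE);
    }

    public static void main(String[] args) {
        ByteBuffer heap = heapBuffer();
        ByteBuffer direct = directBuffer();
        System.out.println("heap:   isDirect=" + heap.isDirect() + ", hasArray=" + heap.hasArray());
        System.out.println("direct: isDirect=" + direct.isDirect() + ", capacity=" + direct.capacity());
    }
}
```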


          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/4481

          FYI: I just rebased this PR onto current `master` to make it mergeable and to support further extensions.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/4481

          by cherry-picking the commits from #4506, plus some fixes for code which was changed in the wrong way previously, the failing yarn tests should now be fixed

          NicoK Nico Kruber added a comment -

          After changing to off-heap network buffers, we are hit by FLINK-7400 and thus failing yarn cluster environments.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/4481

          ok, one test fixed, the other is not so simple but maybe @tillrohrmann can help with it:

          Inside `ContaineredTaskManagerParameters#create()`, we calculate the amount of off-heap space that we need, and for yarn we use exactly this amount to set the `-XX:MaxDirectMemorySize` JVM property, without leaving room for other components and libraries. So far, this worked for the network buffers when memory as a whole was set to off-heap or on-heap and the flink-reserved memory was not completely used. Now, however, if memory is set to on-heap, the `-XX:MaxDirectMemorySize` limit is too tight. I'm unsure about the solutions:
          1) remove the `-XX:MaxDirectMemorySize` setting and let the JVM adjust automatically, or
          2) add some "sane" default to our off-heap usage?

          The same may apply to Mesos if `ResourceProfile(cpuCores, heapMemoryInMB, directMemoryInMB, nativeMemoryInMB)` is used. At the moment, only the other constructors are used, which leads to solution 1.
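          To make the two options concrete, here is a hedged sketch of how a container's direct memory limit could be derived. Everything in it is hypothetical: the class `DirectMemoryLimit`, the method `maxDirectMemoryFlag`, and the 128 MB headroom are not Flink code. The sketch only contrasts option 1, which emits no flag and keeps the JVM default, with option 2, which adds a fixed headroom on top of the computed off-heap amount. By contrast, `ContaineredTaskManagerParameters#create()` uses exactly the computed amount.

```java
import java.util.Optional;

class DirectMemoryLimit {
    // Hypothetical headroom (option 2) reserved for netty, NIO and third-party libraries.
    static final long DEFAULT_HEADROOM_MB = 128;

    /**
     * Returns the JVM flag to pass to the container, or empty if the limit
     * should be left to the JVM's default (option 1).
     */
    static Optional<String> maxDirectMemoryFlag(long networkBufMB, long offHeapManagedMB, boolean setLimit) {
        if (!setLimit) {
            return Optional.empty();
        }
        long limitMB = networkBufMB + offHeapManagedMB + DEFAULT_HEADROOM_MB;
        return Optional.of("-XX:MaxDirectMemorySize=" + limitMB + "m");
    }

    public static void main(String[] args) {
        // Managed memory on-heap: only the (now always off-heap) network buffers count.
        System.out.println(maxDirectMemoryFlag(1024, 0, true).orElse("<JVM default>"));
        System.out.println(maxDirectMemoryFlag(1024, 0, false).orElse("<JVM default>"));
    }
}
```

          The fixed headroom is the simplest form of a "sane" default. A fraction of the total container memory would be an equally plausible choice.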

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/4481

          actually, I need to fix the test failures in `ContaineredTaskManagerParametersTest` and some failure in the `flink-yarn-tests` first...

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4481#discussion_r131673257

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java —
          @@ -274,7 +260,7 @@ private void redistributeBuffers() throws IOException

          { return; }
          • /**
              • End diff –

          Ok, then just leave it

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4481#discussion_r131672655

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java —
          @@ -274,7 +260,7 @@ private void redistributeBuffers() throws IOException

          { return; }
          • /**
              • End diff –

          Actually, this was intentional since this was marked as a "dangling javadoc" by IntelliJ (it is just a longer inline-comment). I don't have too strong feelings about it though so we could either keep it or revert it.
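          For context, here is a small illustrative example; the class and method are made up. IntelliJ flags a Javadoc comment that is not attached to a class, field, or method declaration as "dangling". A long explanatory comment inside a method body is therefore written as a plain block comment.

```java
class CommentStyles {
    // Hypothetical helper, only here to host the comment below.
    static int halve(int numBuffers) {
        if (numBuffers == 0) {
            return 0;
        }

        /*
         * A longer inline explanation inside a method body, written as a
         * plain block comment: had it started with two asterisks, it would
         * be a Javadoc comment attached to no declaration, which IntelliJ
         * reports as a "dangling Javadoc comment".
         */
        return numBuffers / 2;
    }

    public static void main(String[] args) {
        System.out.println(halve(10));
    }
}
```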

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/4481#discussion_r131648828

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java —
          @@ -274,7 +260,7 @@ private void redistributeBuffers() throws IOException

          { return; }
          • /**
              • End diff –

          Revert please.

          githubbot ASF GitHub Bot added a comment -

          GitHub user NicoK opened a pull request:

          https://github.com/apache/flink/pull/4481

          FLINK-7316 [network] always use off-heap network buffers

            What is the purpose of the change

          Currently, network buffers may be on-heap or off-heap, depending on the Flink memory settings. As a step towards passing our own (off-heap) buffers through netty to avoid unnecessary buffer copies, we make network buffers always off-heap.
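          Here is a minimal sketch of a pool that only hands out off-heap buffers, assuming plain `ByteBuffer.allocateDirect` memory. `SimpleDirectBufferPool` and its methods are hypothetical. They are not the API of Flink's `NetworkBufferPool`, which manages `MemorySegment`s. The sketch shows that no on-heap code path remains: the pool allocates every buffer once, up front, as direct memory and then recycles it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

class SimpleDirectBufferPool {
    private final ArrayDeque<ByteBuffer> available = new ArrayDeque<>();
    private final int bufferSize;

    SimpleDirectBufferPool(int numBuffers, int bufferSize) {
        this.bufferSize = bufferSize;
        // Allocate all buffers eagerly as direct (off-heap) memory; there is no heap option.
        for (int i = 0; i < numBuffers; i++) {
            available.add(ByteBuffer.allocateDirect(bufferSize));
        }
    }

    /** Returns a cleared buffer, or null if the pool is exhausted. */
    synchronized ByteBuffer request() {
        ByteBuffer buf = available.poll();
        if (buf != null) {
            buf.clear();
        }
        return buf;
    }

    /** Hands a buffer back to the pool for reuse. */
    synchronized void recycle(ByteBuffer buf) {
        if (!buf.isDirect() || buf.capacity() != bufferSize) {
            throw new IllegalArgumentException("buffer does not belong to this pool");
        }
        available.add(buf);
    }

    synchronized int availableBuffers() {
        return available.size();
    }

    public static void main(String[] args) {
        SimpleDirectBufferPool pool = new SimpleDirectBufferPool(4, 32 * 1024);
        ByteBuffer b = pool.request();
        b.putInt(42);
        pool.recycle(b);
        System.out.println("available=" + pool.availableBuffers());
    }
}
```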

            Brief change log
          • always use off-heap buffers for the `NetworkBufferPool`
          • move `memoryType` from `NetworkEnvironmentConfiguration` to `TaskManagerServicesConfiguration`
          • adapt heap size calculations in bash scripts and Java source code
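          The heap size adaptation can be sketched as follows. Because network buffers are now always off-heap, their share must always be subtracted from the total memory when computing the JVM heap size, regardless of the configured memory type. The fraction/min/max values below are assumptions modelled on Flink's `taskmanager.network.memory.*` settings, so check your version's defaults. `HeapSizeSketch` and its methods are hypothetical and do not reproduce the actual bash or Java implementation.

```java
class HeapSizeSketch {
    // Assumed values for illustration; verify against your Flink configuration.
    static final double NETWORK_FRACTION = 0.1;
    static final long NETWORK_MIN_MB = 64;
    static final long NETWORK_MAX_MB = 1024;

    /** Network buffer memory: a fraction of the total, clamped to [min, max]. */
    static long networkMemoryMB(long totalMB) {
        long fromFraction = (long) (totalMB * NETWORK_FRACTION);
        return Math.max(NETWORK_MIN_MB, Math.min(NETWORK_MAX_MB, fromFraction));
    }

    /**
     * JVM heap size: network buffers are always off-heap now and are always
     * subtracted; off-heap managed memory is subtracted only if configured.
     */
    static long heapSizeMB(long totalMB, long offHeapManagedMB) {
        return totalMB - networkMemoryMB(totalMB) - offHeapManagedMB;
    }

    public static void main(String[] args) {
        long total = 4096;
        System.out.println("network=" + networkMemoryMB(total) + "m, heap=" + heapSizeMB(total, 0) + "m");
    }
}
```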
            Verifying this change

          This change is already covered by existing tests, such as: `TaskManagerServicesTest` for the heap size calculations; tests under `flink/runtime/io/network` for most other aspects of the direct use of network buffers, especially `flink/runtime/io/network/buffer`; and all integration tests with a full stack and non-local communication.

          Actually, we even increase the test coverage, since most network buffer tests so far only covered on-heap buffers, which no longer exist. These tests now cover the only remaining option: off-heap network buffers.

            Does this pull request potentially affect one of the following parts:
          • Dependencies (does it add or upgrade a dependency): (no)
          • The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
          • The serializers: (no)
          • The runtime per-record code paths (performance sensitive): (yes - as in the network communication part)
          • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes - memory settings, but they effectively do not change except for the network buffers being off-heap now)
            Documentation
          • Does this pull request introduce a new feature? (no)
          • If yes, how is the feature documented? (docs, JavaDocs)

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/NicoK/flink flink-7316

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/4481.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #4481


          commit d87206435cabf3bf29560083b639077b103708b8
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-07-31T10:06:14Z

          [hotfix] fix some typos

          commit 4a46e615f38c3dde41d465ec3e26426dbf14df80
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-02T09:34:54Z

          FLINK-7310 [core] always use the HybridMemorySegment

          Since we'd like to use our own off-heap buffers for network communication, we
          cannot use HeapMemorySegment anymore and need to rely on HybridMemorySegment.
          We thus drop any code that loads the HeapMemorySegment (it is still available
          if needed) in favour of the HybridMemorySegment which is able to work on both
          heap and off-heap memory.

          For the performance penalty of this change compared to using HeapMemorySegment
          alone, see this interesting blog article (from 2015):
          https://flink.apache.org/news/2015/09/16/off-heap-memory.html
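          The following simplified stand-in illustrates the idea of one segment class serving both kinds of memory. It swaps in a different technique from Flink's. Flink's `HybridMemorySegment` accesses memory through `sun.misc.Unsafe` with either a heap array or an off-heap address. This hypothetical `SimpleHybridSegment` instead delegates to absolute `ByteBuffer` accessors, which behave the same way for heap and direct buffers.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical, simplified stand-in: one final class serving heap and off-heap memory. */
final class SimpleHybridSegment {
    private final ByteBuffer memory;

    SimpleHybridSegment(ByteBuffer memory) {
        // Use native byte order, as raw memory access would.
        this.memory = memory.duplicate().order(ByteOrder.nativeOrder());
    }

    static SimpleHybridSegment onHeap(int size) {
        return new SimpleHybridSegment(ByteBuffer.allocate(size));
    }

    static SimpleHybridSegment offHeap(int size) {
        return new SimpleHybridSegment(ByteBuffer.allocateDirect(size));
    }

    boolean isOffHeap() {
        return memory.isDirect();
    }

    int size() {
        return memory.capacity();
    }

    // Absolute get/put: the same code path for both kinds of backing memory.
    int getInt(int index) {
        return memory.getInt(index);
    }

    void putInt(int index, int value) {
        memory.putInt(index, value);
    }

    public static void main(String[] args) {
        for (SimpleHybridSegment seg : new SimpleHybridSegment[] {onHeap(64), offHeap(64)}) {
            seg.putInt(8, 7);
            System.out.println("offHeap=" + seg.isOffHeap() + ", value=" + seg.getInt(8));
        }
    }
}
```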

          commit 1d838c82cef412a8ec143308e20a4d0d7882f3e8
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-02T09:35:16Z

          [hotfix][tests] add missing test descriptions

          commit 70b0985a62082766498e847f7a4f25e84b6c1f06
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-02T09:27:49Z

          [hotfix][core] add additional final methods in final classes

          This applies the scheme of HeapMemorySegment to HybridMemorySegment where core
          methods are also marked "final" to be more future-proof.

          commit bedf14708b7aba88761f05c19abaf7f26d16dd20
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-04T13:15:32Z

          FLINK-7312 [checkstyle] remove trailing whitespace

          commit 67e37971a4f8d5c40290e7a9c8ae2e6a2e1deb68
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-04T13:20:28Z

          FLINK-7312 [checkstyle] organise imports

          commit d8657c8f02ca16af1f9e08621a8f73adb5d26959
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-04T13:24:16Z

          FLINK-7312 [checkstyle] add, adapt and improve comments

          commit 654b599569e3a3e5ac063d253311b67652a33c1d
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-04T13:26:40Z

          FLINK-7312 [checkstyle] remove redundant "public" keyword in interfaces

          commit 7117de1adeefd624ae958370d9614162d18bd9ed
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-04T13:27:36Z

          FLINK-7312 [checkstyle] ignore some spurious warnings

          commit dd150af85551f64c7f3a260a013d59b7d773f94a
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-04T13:35:15Z

          FLINK-7312 [checkstyle] enable checkstyle for `flink/core/memory/*`

          We deliberately ignore redundant modifiers for now since we want `final`
          modifiers on `final` classes for increased future-proofness.

          commit adbfea59e0618c0212a820d724d677b97955c3c6
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-01T11:24:00Z

          FLINK-7316 [network] always use off-heap network buffers

          This is another step towards using our own (off-heap) buffers for network
          communication that we pass through netty in order to avoid unnecessary buffer
          copies.

          commit 139fdfc166ee465ce61d8f582cbbbf8c890ecccc
          Author: Nico Kruber <nico@data-artisans.com>
          Date: 2017-08-04T13:59:48Z

          FLINK-7316 [docs] add a note about network buffers always being off-heap



            People

            • Assignee:
              NicoK Nico Kruber
              Reporter:
              NicoK Nico Kruber
            • Votes:
              0
              Watchers:
              4

              Dates

              • Created:
                Updated:
