FLINK-6337: Remove the buffer provider from PartitionRequestServerHandler

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Network
    • Labels:
      None

      Description

      Currently, PartitionRequestServerHandler creates a LocalBufferPool when the channel is registered. This LocalBufferPool is only used to obtain the segment size when creating the read view in SpillableSubpartition; the buffers in the pool are never used, so they waste buffer resources of the global pool.

      We would like to remove the LocalBufferPool from PartitionRequestServerHandler. The LocalBufferPool in ResultPartition can provide the segment size for creating the subpartition view instead.
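
The cost described above can be illustrated with a minimal accounting sketch. It assumes simplified, hypothetical classes (`GlobalPoolSketch`, `HandlerPoolWasteDemo`) rather than Flink's real network classes, and the buffer counts (2048 total, 100 channels, 1 buffer per handler pool) are assumptions chosen only for illustration.

```java
/** Hypothetical stand-in for a global network buffer pool; not Flink's real class. */
final class GlobalPoolSketch {
    private final int totalBuffers;
    private int reservedBuffers;

    GlobalPoolSketch(int totalBuffers) {
        this.totalBuffers = totalBuffers;
    }

    /** Reserves buffers for a new local pool, as a handler would on channel registration. */
    void reserveLocalPool(int requiredBuffers) {
        if (requiredBuffers > availableBuffers()) {
            throw new IllegalStateException("Insufficient number of network buffers.");
        }
        reservedBuffers += requiredBuffers;
    }

    /** Buffers that are still free for other local pools. */
    int availableBuffers() {
        return totalBuffers - reservedBuffers;
    }
}

final class HandlerPoolWasteDemo {
    /** Buffers left for other consumers after every channel's handler reserved its pool. */
    static int availableAfterRegistration(int totalBuffers, int channels, int buffersPerHandlerPool) {
        GlobalPoolSketch global = new GlobalPoolSketch(totalBuffers);
        for (int i = 0; i < channels; i++) {
            global.reserveLocalPool(buffersPerHandlerPool);
        }
        return global.availableBuffers();
    }

    public static void main(String[] args) {
        // Assumed numbers, for illustration only.
        System.out.println(availableAfterRegistration(2048, 100, 1)); // handler pools present
        System.out.println(availableAfterRegistration(2048, 100, 0)); // handler pools removed
    }
}
```

In this model the reserved buffers are never requested by anyone, so every registered channel lowers the number of buffers available to the rest of the system without any benefit.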

          Activity

          zjwang zhijiang added a comment -

          Ufuk Celebi, do you think it makes sense to remove the buffer provider from the server handler? I wonder whether there are future plans that would conflict with it.
          If not, I will submit a pull request for it. Thank you!

          uce Ufuk Celebi added a comment -

          Hey zhijiang! I think the buffer pool is required for spillable result partitions (when creating SpillableSubpartitionView). When they are consumed, a buffer provider needs to be given to the partition to copy the buffers to. I think this is not implemented in an efficient way at the moment (ideally the copy should not be necessary), but the proper implementation would require some more involved refactorings.

          zjwang zhijiang added a comment - - edited

          Hey Ufuk Celebi, thank you for the quick response!

          Currently the buffer provider only provides the segment size for SpillableSubpartitionView, and the actual buffers that would be spilled to disk are provided by the LocalBufferPool of the ResultPartition. If future plans require SpillableSubpartitionView to have another buffer provider for copying the buffers from the ResultPartition, I think this buffer provider can be supplied by the result partition manager or other components instead of the server handler, because the handler cannot determine the result partition type. The current approach also wastes many available buffers from the global buffer pool. If there are no specific plans or time for this refactoring, removing it from the handler would bring benefits without affecting the current behavior.

          The motivation is that our users are confused about why the total number of available buffers is lower than the configured amount.

          uce Ufuk Celebi added a comment -

          Just double checked this and I think you are right. We currently use the buffer provider only for the buffer size which doesn't make sense. +1 to remove. I agree that the server handler is not a good place for it anyways (I think that was a work around in the initial version that is not needed any more). Could you make this refactoring an independent pull request so that we can review it easily?

          zjwang zhijiang added a comment -

          Yeah, I got your point. Thank you for checking and for the advice, my friend!

          githubbot ASF GitHub Bot added a comment -

          GitHub user zhijiangW opened a pull request:

          https://github.com/apache/flink/pull/3785

          FLINK-6337[network] Remove the buffer provider from PartitionRequestServerHandler

          Currently, `PartitionRequestServerHandler` creates a `LocalBufferPool` when the channel is registered. This `LocalBufferPool` is only used to obtain the segment size when creating the read view in `SpillableSubpartition`; the buffers in the pool are never used, so they waste buffer resources of the global pool.

          We would like to remove the `LocalBufferPool` from `PartitionRequestServerHandler`. The `LocalBufferPool` in `ResultPartition` can provide the segment size for creating the subpartition view instead.

          This modification does not affect the current behavior and saves buffer resources.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zhijiangW/flink FLINK-6337

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3785.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3785


          commit 306f32414eabce5522ec2a8883aa27285e6aa3e4
          Author: Zhijiang <wangzhijiang999@aliyun.com>
          Date: 2017-04-26T08:18:54Z

          FLINK-6337[network] Remove the buffer provider from PartitionRequestServerHandler


          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3785

          Hey @uce, I have submitted the pull request for the issue we confirmed before.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3785#discussion_r113690389

          --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
          @@ -68,7 +68,6 @@ public void testSuccessfulProgramAfterFailure() {

                   try {
                       runKMeans(cluster.getLeaderRPCPort());
          -            fail("This program execution should have failed.");
          --- End diff --

          Why did you remove this line? I think it's important to keep this line here.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3785#discussion_r113690239

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---
          @@ -351,15 +351,15 @@ public void destroyBufferPool() {
               /**
                * Returns the requested subpartition.
                */
          -    public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
          +    public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
                   int refCnt = pendingReferences.get();

                   checkState(refCnt != -1, "Partition released.");
                   checkState(refCnt > 0, "Partition not pinned.");

                   checkElementIndex(index, subpartitions.length, "Subpartition not found.");

          -        ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider, availabilityListener);
          +        ResultSubpartitionView readView = subpartitions[index].createReadView(bufferPool, availabilityListener);
          --- End diff --

          I think we can completely remove the buffer provider from the `createReadView` method:

          • In `SpillableSubpartition#createReadView` we can use the segment size of the buffer pool of the spillable subpartition itself (`parent.getBufferProvider().getMemorySegmentSize()`).
          • In `PipelinedSubpartition#createReadView` we don't use the argument anyways.
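
The first suggestion can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual Flink code: all types ending in `Sketch` are hypothetical, simplified stand-ins, and the segment size of 32768 bytes is an assumed value. Only the call chain `parent.getBufferProvider().getMemorySegmentSize()` is taken from the comment above.

```java
/** Hypothetical stand-in for a buffer provider; only the segment size matters here. */
interface BufferProviderSketch {
    int getMemorySegmentSize();
}

/** Hypothetical stand-in for a result partition that owns its own buffer pool. */
final class ResultPartitionSketch {
    private final BufferProviderSketch bufferPool;

    ResultPartitionSketch(BufferProviderSketch bufferPool) {
        this.bufferPool = bufferPool;
    }

    BufferProviderSketch getBufferProvider() {
        return bufferPool;
    }
}

/** Hypothetical stand-in for a read view that is sized by the segment size. */
final class ReadViewSketch {
    final int segmentSize;

    ReadViewSketch(int segmentSize) {
        this.segmentSize = segmentSize;
    }
}

/** Hypothetical stand-in for a spillable subpartition with a reference to its parent. */
final class SpillableSubpartitionSketch {
    private final ResultPartitionSketch parent;

    SpillableSubpartitionSketch(ResultPartitionSketch parent) {
        this.parent = parent;
    }

    /** No buffer provider argument: the segment size comes from the parent's own pool. */
    ReadViewSketch createReadView() {
        return new ReadViewSketch(parent.getBufferProvider().getMemorySegmentSize());
    }
}

final class CreateReadViewDemo {
    public static void main(String[] args) {
        // The lambda plays the role of the partition's pool; 32768 is an assumed segment size.
        ResultPartitionSketch partition = new ResultPartitionSketch(() -> 32768);
        ReadViewSketch view = new SpillableSubpartitionSketch(partition).createReadView();
        System.out.println(view.segmentSize);
    }
}
```

With this shape the caller no longer has to supply a buffer provider at all, so the server handler has no reason to hold a pool of its own.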
          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3785#discussion_r113703927

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---
          @@ -351,15 +351,15 @@ public void destroyBufferPool() {
               /**
                * Returns the requested subpartition.
                */
          -    public ResultSubpartitionView createSubpartitionView(int index, BufferProvider bufferProvider, BufferAvailabilityListener availabilityListener) throws IOException {
          +    public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
                   int refCnt = pendingReferences.get();

                   checkState(refCnt != -1, "Partition released.");
                   checkState(refCnt > 0, "Partition not pinned.");

                   checkElementIndex(index, subpartitions.length, "Subpartition not found.");

          -        ResultSubpartitionView readView = subpartitions[index].createReadView(bufferProvider, availabilityListener);
          +        ResultSubpartitionView readView = subpartitions[index].createReadView(bufferPool, availabilityListener);
          --- End diff --

          Thank you for the efficient review!
          I agree with your suggestion; it makes the change more complete.
          I will update the code in another commit and verify the tests again.

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3785#discussion_r113706078

          --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
          @@ -68,7 +68,6 @@ public void testSuccessfulProgramAfterFailure() {

                   try {
                       runKMeans(cluster.getLeaderRPCPort());
          -            fail("This program execution should have failed.");
          --- End diff --

          This test fails if the line is not removed.

          I guess the number of buffers was previously insufficient for this case because some were taken by the buffer provider in the handler. After removing the buffer provider from the handler, there are enough buffers, so `runKMeans` executes successfully and never enters the catch block or fails.

          I am not familiar with this test and am not sure whether my understanding is right.

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3785

          Hi @uce, I have submitted the modifications to `createReadView`, and the tests and ITs have passed on my private Travis. I checked the failed test `HistoryServerTest.testFullArchiveLifecycle` on this Travis build, and it seems unrelated to my pull request. I ran it separately several times and it passed every time.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3785#discussion_r113904383

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
          @@ -68,7 +68,11 @@ public static void shutdown() {

               @Override
               ResultSubpartition createSubpartition() {
          -        return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager);
          +        ResultPartition parent = mock(ResultPartition.class);
          --- End diff --

          Could you make this a static helper method?
          ```java
          private static ResultPartition createMockPartition() { ... }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3785#discussion_r113905412

          --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
          @@ -68,7 +68,6 @@ public void testSuccessfulProgramAfterFailure() {

                   try {
                       runKMeans(cluster.getLeaderRPCPort());
          -            fail("This program execution should have failed.");
          --- End diff --

          I think that the idea of the test is to check that another program can be executed after one failed. The commit message introducing the test says "This test validates that task slots in co-location constraints are properly freed in the presence of failures." By removing this line, we are not testing what we actually wanted to test anymore. We should keep the line but instead decrease the number of configured buffers, for example 640 works instead of 840. That way we keep the behaviour: successful job, failed job, successful job.
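
The intended test shape (successful job, failed job, successful job) can be sketched in plain Java. This is a simplified model, not the real integration test: `runJob` and `successFailureSuccess` are hypothetical helpers, and the per-job buffer requirements (100 and 700) are assumptions. Only the configured values 640 and 840 come from the comment above.

```java
final class FailureThenSuccessSketch {
    /** Hypothetical job: fails when it needs more buffers than are configured. */
    static void runJob(int requiredBuffers, int configuredBuffers) {
        if (requiredBuffers > configuredBuffers) {
            throw new IllegalStateException("Insufficient number of network buffers.");
        }
    }

    /** Returns true only if the middle job fails and the jobs around it succeed. */
    static boolean successFailureSuccess(int configuredBuffers, int smallJob, int largeJob) {
        runJob(smallJob, configuredBuffers); // 1) must succeed

        try {
            runJob(largeJob, configuredBuffers); // 2) must fail
            // Reaching this line plays the role of fail("This program execution should have failed.").
            return false;
        } catch (IllegalStateException expected) {
            // The failure is the scenario under test.
        }

        runJob(smallJob, configuredBuffers); // 3) must succeed again after the failure
        return true;
    }

    public static void main(String[] args) {
        // Assumed requirements: the small job needs 100 buffers, the large job needs 700.
        System.out.println(successFailureSuccess(640, 100, 700)); // scenario is exercised
        System.out.println(successFailureSuccess(840, 100, 700)); // large job no longer fails
    }
}
```

In this model, lowering the configured number of buffers restores the failing middle step, so the guard after the second job stays meaningful instead of being deleted.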

          Show
          githubbot ASF GitHub Bot added a comment - Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3785#discussion_r113905412 — Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java — @@ -68,7 +68,6 @@ public void testSuccessfulProgramAfterFailure() { try { runKMeans(cluster.getLeaderRPCPort()); fail("This program execution should have failed."); End diff – I think that the idea of the test is to check that another program can be executed after one failed. The commit message introducing the test says "This test validates that task slots in co-location constraints are properly freed in the presence of failures." By removing this line, we are not testing what we actually wanted to test anymore. We should keep the line but instead decrease the number of configured buffers, for example 640 works instead of 840. That way we keep the behaviour: successful job, failed job, successful job.
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3785#discussion_r113945018

          --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
          @@ -68,7 +68,6 @@ public void testSuccessfulProgramAfterFailure() {

                   try {
                       runKMeans(cluster.getLeaderRPCPort());
          -            fail("This program execution should have failed.");
          --- End diff --

          Yes, I misunderstood this test; you got the key point.

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3785#discussion_r113946358

          --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
          @@ -68,7 +68,11 @@ public static void shutdown() {

               @Override
               ResultSubpartition createSubpartition() {
          -        return new SpillableSubpartition(0, mock(ResultPartition.class), ioManager);
          +        ResultPartition parent = mock(ResultPartition.class);
          --- End diff --

          I guess you want to reuse this mocked `ResultPartition` in all the internal tests. I notice that this test already has an overridden `createSubpartition` method, so I could change its return type to `SpillableSubpartition` instead. What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user zhijiangW commented on the issue:

          https://github.com/apache/flink/pull/3785

          Hi @uce, I have submitted the modifications for the two issues above. Any comments are welcome!

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/3785

          Thanks! Looks good to merge now 👍 I've rebased it and pushed it to Travis here: https://travis-ci.org/uce/flink/builds/227891373

          As soon as it succeeds, I'm going to merge this PR.

          uce Ufuk Celebi added a comment -

          Fixed in 464d6f5 (master).

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3785


            People

            • Assignee:
              zjwang zhijiang
              Reporter:
              zjwang zhijiang