Flink / FLINK-5169

Make consumption of input channels fair

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.1.4
    • Component/s: Network
    • Labels: None

      Description

      The input channels on the receiver side of the network stack queue incoming data and notify the input gate about available data. These notifications currently determine the order in which input channels are consumed, which can lead to unfair consumption patterns where faster channels are favored over slower ones.
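To make the problem concrete, here is a minimal Java sketch of notification-ordered consumption. It is a hypothetical model, not Flink code: `NotificationOrderedGate`, `onBuffer`, `next`, and `drain` are illustrative names, buffers are plain strings, and everything runs on one thread. Every arriving buffer queues its channel once more, so consumption strictly follows buffer arrival order and a channel that delivers a burst is drained before a slower channel gets a turn.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of notification-ordered consumption (not Flink code).
class NotificationOrderedGate {
    private final Map<String, ArrayDeque<String>> buffersByChannel = new HashMap<>();
    // One entry per buffer notification, so a channel can appear many times.
    private final ArrayDeque<String> channelsWithData = new ArrayDeque<>();

    // Called for every buffer a channel receives.
    void onBuffer(String channel, String buffer) {
        buffersByChannel.computeIfAbsent(channel, c -> new ArrayDeque<>()).add(buffer);
        channelsWithData.add(channel);
    }

    // Consumes one buffer from whichever channel was notified first.
    String next() {
        String channel = channelsWithData.poll();
        return channel == null ? null : buffersByChannel.get(channel).poll();
    }

    // Consumes everything and returns the consumption order.
    List<String> drain() {
        List<String> order = new ArrayList<>();
        for (String buffer = next(); buffer != null; buffer = next()) {
            order.add(buffer);
        }
        return order;
    }
}
```

If channel `A` receives `a1`, `a2`, `a3` before channel `B` receives `b1`, `drain()` returns `[a1, a2, a3, b1]`: `b1` waits behind the entire backlog of `A`.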


          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2967

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2967

          Good catches! Merging...

          githubbot ASF GitHub Bot added a comment -

          GitHub user BorisOsipov opened a pull request:

          https://github.com/apache/flink/pull/2967

          FLINK-5169 Fix String.format() wildcards

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [ ] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          • small string format fix

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/BorisOsipov/flink StringFormatIssues

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2967.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2967


          commit 2980a99161c9a4a1d35662299b7eb61f12c6b4e8
          Author: Boris Osipov <boris_osipov@epam.com>
          Date: 2016-12-08T13:58:21Z

          FLINK-5169 Make consumption of input channels fair

          • small string format fix
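The PR text does not show which format strings were changed. As a hedged illustration of the kind of bug the title describes, assume (this is not confirmed by the PR) that SLF4J-style `{}` placeholders were passed to `String.format()`. `java.util.Formatter` only substitutes `%` conversions such as `%s` and ignores surplus arguments, so a `{}` template comes back unchanged. `FormatPlaceholders` and its methods are hypothetical names.

```java
// Illustrative comparison of placeholder styles; not the actual Flink change.
class FormatPlaceholders {
    // Bug: "{}" is an SLF4J placeholder, not a java.util.Formatter specifier.
    // String.format() leaves it as literal text and ignores the extra argument.
    static String slf4jStyle(int channelIndex) {
        return String.format("Channel {} has no data.", channelIndex);
    }

    // Fix: use a Formatter conversion such as %s.
    static String formatStyle(int channelIndex) {
        return String.format("Channel %s has no data.", channelIndex);
    }

    public static void main(String[] args) {
        System.out.println(slf4jStyle(3));  // Channel {} has no data.
        System.out.println(formatStyle(3)); // Channel 3 has no data.
    }
}
```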

          uce Ufuk Celebi added a comment -

          Fixed in 6cfce17, 8d97eaa, 5ebd7c8, 2bf8722 (release-1.1) and f728129, d3ac0ad, c0cdc5c, 2fcef5e (master).

          githubbot ASF GitHub Bot added a comment -

          Github user uce closed the pull request at:

          https://github.com/apache/flink/pull/2882

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2882

          Thanks for the review. I'll address your comments and merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2882#discussion_r89805572

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
          @@ -518,7 +518,8 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId)
                       partitionId);
               }

          -    private void queueChannel(InputChannel channel) {
          +    @VisibleForTesting
          +    void queueChannel(InputChannel channel) {
          --- End diff ---

          I think we can undo this change

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2882#discussion_r89810526

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java ---
          @@ -53,10 +52,10 @@
               private BufferPool bufferPool;

               PartitionRequestServerHandler(
          -            ResultPartitionProvider partitionProvider,
          -            TaskEventDispatcher taskEventDispatcher,
          -            PartitionRequestQueue outboundQueue,
          -            NetworkBufferPool networkBufferPool) {
          --- End diff ---

          I think this formatting change made the readability a little bit worse.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2882#discussion_r89806640

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
          @@ -52,12 +51,10 @@
          --- End diff ---

          Remove the unused `LOG`.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2882#discussion_r89757176

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
          @@ -495,6 +518,27 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId)
                       partitionId);
               }

          +    private void queueChannel(InputChannel channel) {
          +        int availableChannels;
          +
          +        synchronized (inputChannelsWithData) {
          +            availableChannels = inputChannelsWithData.size();
          +
          +            inputChannelsWithData.add(channel);
          +
          +            if (availableChannels == 0) {
          +                inputChannelsWithData.notify();
          --- End diff ---

          This should be a `notifyAll()`.
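The reasoning behind `notifyAll()` can be shown with a generic Java monitor sketch. It is hypothetical, not the merged `SingleInputGate` code: `ChannelQueue` and `takeChannel` are illustrative names, and channels are modeled as strings. Because waiters are only signalled when the queue goes from empty to non-empty, `notify()` is unsafe if more than one thread can ever wait: queueing two channels in a row wakes a single waiter, the second `add` sees a non-empty queue and signals nobody, and the other waiter can keep sleeping although a channel is available.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of the reviewed queueChannel() pattern, using notifyAll().
class ChannelQueue {
    private final ArrayDeque<String> inputChannelsWithData = new ArrayDeque<>();

    void queueChannel(String channel) {
        synchronized (inputChannelsWithData) {
            int availableChannels = inputChannelsWithData.size();
            inputChannelsWithData.add(channel);

            // Waiters are only signalled on the empty -> non-empty transition.
            // notifyAll() wakes every waiter; notify() would wake just one, and
            // the next add (size > 0) would not signal the others.
            if (availableChannels == 0) {
                inputChannelsWithData.notifyAll();
            }
        }
    }

    String takeChannel() throws InterruptedException {
        synchronized (inputChannelsWithData) {
            // Re-check the condition after every wakeup, including spurious ones.
            while (inputChannelsWithData.isEmpty()) {
                inputChannelsWithData.wait();
            }
            return inputChannelsWithData.poll();
        }
    }
}
```

The `while` loop around `wait()` is the standard guard for monitor conditions; together with `notifyAll()` it keeps every waiter correct regardless of which thread wins the monitor first.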

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/2882

          FLINK-5169 [network] Make consumption of InputChannels fair

          I worked on this together with @StephanEwen. The changes touched many tests, so I split them into two commits, one for the main change and one for the adjusted tests. This way, I hope the main changes are easier to review.

          Changes

          The goal of the changes was to make the consumption of input channels fairer. For batch jobs, this unfairness was not a real issue, but for streaming jobs it hurts checkpointing badly: checkpoint barriers flow inline with the data, so a barrier on a channel that is being starved is delayed by the same unfairness.

            InputGate

          The input gates are now notified whenever a channel becomes non-empty, and whenever they consume a buffer from a channel, they check whether that channel needs to be re-queued. For this, channels now return a `BufferAndAvailability` instead of just a `Buffer`.

          The same logic is applied one level up in the `UnionInputGate`, which is notified when one of its `InputGate`s becomes non-empty.

              Before
          • Channel notifies gate on each available `Buffer` instance
          • Each notification queues the channel in `inputChannelsWithData`, possibly queueing each channel multiple times (this is where the unfairness happened)
              Now
          • Channel notifies gate only when it becomes non-empty
          • After consuming from a channel, the input gate itself re-queues the channel if it has more data available
          • Each channel is queued at most once in `inputChannelsWithData` (this is where it becomes fairer)
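The "Now" behavior can be sketched in Java under simplifying assumptions. This is not the merged Flink code: `SketchChannel`, `FairGate`, and their methods are hypothetical, and `BufferAndAvailability` is a stand-in that only borrows its name from the PR. Buffers are strings and everything runs on one thread, so the real gate's locking and waiting are omitted. A channel notifies the gate only on the empty-to-non-empty transition, the gate holds each non-empty channel at most once, and after taking a buffer it re-queues the channel at the tail if more data is available, which yields round-robin consumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: a buffer plus whether the channel still has data.
final class BufferAndAvailability {
    final String buffer;
    final boolean moreAvailable;

    BufferAndAvailability(String buffer, boolean moreAvailable) {
        this.buffer = buffer;
        this.moreAvailable = moreAvailable;
    }
}

// Hypothetical channel that notifies its gate only on becoming non-empty.
class SketchChannel {
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private final FairGate gate;

    SketchChannel(FairGate gate) {
        this.gate = gate;
    }

    void onBuffer(String buffer) {
        boolean wasEmpty = buffers.isEmpty();
        buffers.add(buffer);
        if (wasEmpty) {
            gate.notifyChannelNonEmpty(this);
        }
    }

    BufferAndAvailability getNextBuffer() {
        String buffer = buffers.poll();
        return new BufferAndAvailability(buffer, !buffers.isEmpty());
    }
}

// Hypothetical gate: each non-empty channel is queued at most once.
class FairGate {
    private final ArrayDeque<SketchChannel> inputChannelsWithData = new ArrayDeque<>();

    void notifyChannelNonEmpty(SketchChannel channel) {
        inputChannelsWithData.add(channel);
    }

    String getNextBuffer() {
        SketchChannel channel = inputChannelsWithData.poll();
        if (channel == null) {
            return null;
        }
        BufferAndAvailability next = channel.getNextBuffer();
        if (next.moreAvailable) {
            inputChannelsWithData.add(channel); // back of the queue: round-robin
        }
        return next.buffer;
    }

    List<String> drain() {
        List<String> order = new ArrayList<>();
        for (String buffer = getNextBuffer(); buffer != null; buffer = getNextBuffer()) {
            order.add(buffer);
        }
        return order;
    }
}
```

If channel `A` receives `a1`, `a2`, `a3` before channel `B` receives `b1`, `drain()` returns `[a1, b1, a2, a3]`: `B` gets its turn after a single buffer from `A` instead of after the whole backlog of `A`.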
            ResultSubpartition

          In order to implement this change reliably, we had to change the partition consumption interfaces. When creating a subpartition reader, we now instantiate the reader with a buffer availability listener, which is notified about available buffers.

          This gets rid of some "look ahead magic" in the `LocalInputChannel` and replaces it with straightforward notifications. Furthermore, we no longer need the checkNotNull-registerListener loops when consuming data.

          A shortcoming is that in some places we have to be very aware that notifications may already happen when the reader is created (see `LocalInputChannel#checkAndWaitForSubpartitionView` and `PartitionRequestQueue#notifyReaderNonEmpty`). This is not nice, but all in all it is an improvement over the previous state. We can refactor this further to smooth things out.
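The listener-based reader and the creation-time subtlety can be sketched in Java. This is a hypothetical, single-threaded model, not Flink's API: `AvailabilityListener`, `Subpartition`, `createReader`, and `Reader` are illustrative names. The subpartition pushes a notification on the empty-to-non-empty transition, so the consumer needs no polling or listener re-registration loop. If buffers are already queued when the reader is created, the listener fires inside `createReader`, before the caller holds the returned reader, so the caller must be ready for a notification about a reader it has not registered yet.

```java
import java.util.ArrayDeque;

// Hypothetical callback standing in for the buffer availability listener.
interface AvailabilityListener {
    void notifyDataAvailable();
}

// Hypothetical single-threaded subpartition with a single reader.
class Subpartition {
    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private AvailabilityListener listener;

    void add(String buffer) {
        boolean wasEmpty = buffers.isEmpty();
        buffers.add(buffer);
        // Push a notification only on the empty -> non-empty transition.
        if (wasEmpty && listener != null) {
            listener.notifyDataAvailable();
        }
    }

    // The listener is supplied when the reader is created. If data is already
    // queued, the listener fires here, before the caller has the returned reader.
    Reader createReader(AvailabilityListener listener) {
        this.listener = listener;
        if (!buffers.isEmpty()) {
            listener.notifyDataAvailable();
        }
        return new Reader();
    }

    class Reader {
        String poll() {
            return buffers.poll();
        }
    }
}
```

For example, adding `b1` before any reader exists and then calling `createReader` triggers the listener once, during `createReader`. After the reader drains `b1`, the next `add` notifies again, while an `add` to an already non-empty subpartition does not.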

          The unused `IOMode` for spillable partition types has been removed. I fear that these partition types are quite fragile at the moment and should most probably be revisited in a larger refactoring.

          I would like to have this in for the upcoming 1.1.4 release. It is quite a big change for a bug fix release, but in large scale setups this unfairness interplays badly with the checkpoint barriers.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink 5169-fairconsumers-1.1

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2882.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2882


          commit 2aad264e6d9273d15b0e04527f0f2593d2951e8c
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-11-27T17:15:40Z

          FLINK-5169 [network] Add tests for channel consumption

          commit 212d6ac7c86a4426b6e4418ae5f9d1d4680ec05b
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2016-11-28T08:59:29Z

          FLINK-5169 [network] Make consumption of InputChannels fair

          commit 564b96d72b1536f4ab171c723e3ae0ab2e3c935c
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-11-28T08:59:58Z

          FLINK-5169 [network] Adjust tests to new consumer logic



            People

            • Assignee: Ufuk Celebi (uce)
            • Reporter: Ufuk Celebi (uce)
            • Votes: 0
            • Watchers: 4
