Flink / FLINK-4894

Don't block on buffer request after broadcastEvent

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.1.4
    • Component/s: None
    • Labels: None

      Description

      After broadcasting an event (like the checkpoint barrier), the record writer might block on a buffer request although that buffer will only be needed on the next write on that channel.

      Instead of assuming that each serializer has a buffer set, we can change the logic in the writer to request the buffer when it requires one.
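      The lazy-request idea can be sketched with a minimal toy model (hypothetical names such as `LazyBufferSketch` and `poolRequests`; this is not Flink's actual RecordWriter or buffer-pool API):

```java
// Minimal sketch of lazy buffer requests (hypothetical names, not the
// actual Flink API): the writer asks the pool for a buffer only when an
// emit actually needs one, never right after flushing on broadcastEvent.
class LazyBufferSketch {
    int poolRequests = 0;      // counts (potentially blocking) pool requests
    byte[] current = null;     // the serializer's current buffer, if any

    private byte[] requestFromPool() {   // stands in for the blocking pool call
        poolRequests++;
        return new byte[32];
    }

    void emit() {
        if (current == null) {           // lazy: request only when this emit needs it
            current = requestFromPool();
        }
        // ... serialize the record into 'current' ...
    }

    void broadcastEvent() {
        current = null;  // flush the current buffer, but do NOT request a new one
    }
}
```

      With the old eager behavior, `broadcastEvent` would have been followed immediately by a pool request that can block; here the request is deferred to the next `emit` on that channel.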


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/2690

          FLINK-4894 [network] Don't request buffer after writing to partition

          After emitting a record via the RecordWriter, we eagerly requested a new buffer for the next emit on that channel (although it's not clear that we will immediately need it). With this change, we request that buffer lazily when an emit call requires it.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink 4894-late_request_buffer

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2690.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2690


          commit a4dbe0c8db4bd8fee1a508ae212f80fa7c8bb824
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-10-24T16:01:28Z

          FLINK-4894 [network] Don't request buffer after writing to partition

          After emitting a record via the RecordWriter, we eagerly requested
          a new buffer for the next emit on that channel (although it's not clear
          that we will immediately need it). With this change, we request that
          buffer lazily when an emit call requires it.


          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2690

          PS: I would like to apply this to the `release-1.1` branch as well.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2690#discussion_r85199848

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java —
          @@ -108,15 +108,25 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter

               synchronized (serializer) {
                   SerializationResult result = serializer.addRecord(record);
          +
                   while (result.isFullBuffer()) {
                       Buffer buffer = serializer.getCurrentBuffer();

                       if (buffer != null) {
          -                writeBuffer(buffer, targetChannel, serializer);
          +                writeAndClearBuffer(buffer, targetChannel, serializer);
          +
          +                // If this was a full record, we are done. Not breaking
          +                // out of the loop at this point will lead to another
          +                // buffer request before breaking out (that would not be
          +                // a problem per se, but it can lead to stalls in the
          +                // pipeline).
          +                if (result.isFullRecord()) {
          +                    break;
          End diff –

          Can this only happen when the end of the record and the end of the buffer are exactly aligned?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2690

          This looks very good.

          To double check: Do we have tests for the following situations?

          • Records larger than buffers
          • Records such that they exactly line up with buffer size (full record and full buffer always coincide)

          If not, those would be important to have.

          Otherwise, +1 to merge this for both 1.2.0 and 1.1.4.
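          The two scenarios can be pinned down with a bit of arithmetic (a hypothetical sketch for illustration, not Flink's actual test code):

```java
// Hypothetical sketch of the two test scenarios, not Flink's test code:
// a record of recordLen bytes spans ceil(recordLen / capacity) buffers, and
// full record and full buffer coincide exactly when capacity divides recordLen.
class SpanningSketch {
    // Number of buffers a single record occupies (ceiling division).
    static int buffersNeeded(int recordLen, int capacity) {
        return (recordLen + capacity - 1) / capacity;
    }

    // True iff the record's end lands exactly on a buffer boundary.
    static boolean exactlyAligned(int recordLen, int capacity) {
        return recordLen % capacity == 0;
    }
}
```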

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2690#discussion_r85299971

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java —
          @@ -108,15 +108,25 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter

               synchronized (serializer) {
                   SerializationResult result = serializer.addRecord(record);
          +
                   while (result.isFullBuffer()) {
                       Buffer buffer = serializer.getCurrentBuffer();

                       if (buffer != null) {
          -                writeBuffer(buffer, targetChannel, serializer);
          +                writeAndClearBuffer(buffer, targetChannel, serializer);
          +
          +                // If this was a full record, we are done. Not breaking
          +                // out of the loop at this point will lead to another
          +                // buffer request before breaking out (that would not be
          +                // a problem per se, but it can lead to stalls in the
          +                // pipeline).
          +                if (result.isFullRecord()) {
          +                    break;
          End diff –

          Yes, exactly.
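          The alignment case under discussion can be illustrated with a small sketch (simplified and with hypothetical names; Flink's actual SerializationResult distinguishes the same three states):

```java
// Sketch of the serialization-result states involved (simplified, hypothetical
// names): a result can be a full record, a full buffer, or both at once when
// the record end and the buffer end align exactly.
enum Result {
    PARTIAL_RECORD(false, true),          // record continues into the next buffer
    FULL_RECORD(true, false),             // record done, buffer has room left
    FULL_RECORD_FULL_BUFFER(true, true);  // exact alignment: both at once

    final boolean fullRecord, fullBuffer;
    Result(boolean fullRecord, boolean fullBuffer) {
        this.fullRecord = fullRecord;
        this.fullBuffer = fullBuffer;
    }
}

class AlignmentSketch {
    // Classify a record of 'recordLen' bytes written at offset 'pos' into a
    // buffer of 'capacity' bytes (hypothetical helper for illustration).
    static Result addRecord(int pos, int recordLen, int capacity) {
        int end = pos + recordLen;
        if (end < capacity)  return Result.FULL_RECORD;
        if (end == capacity) return Result.FULL_RECORD_FULL_BUFFER;
        return Result.PARTIAL_RECORD;
    }
}
```

          Only the FULL_RECORD_FULL_BUFFER state triggers the new break: the record is complete, so requesting another buffer before leaving the loop would be wasted (and potentially blocking) work.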

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2690

          That is tested in the record serializer tests and now also in the added writer test.

          In general, the record writer and serializer are very tightly coupled and confusing at times. I think it would be very good to do some refactorings here – at a later time though.

          uce Ufuk Celebi added a comment -

          Fixed in 529534f (release-1.1), cbdb784 (master).

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2690

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/2716

          Fix for FLINK-4894 [network] Don't request buffer after writing to partition

          This re-opens #2690.

          After merging, we noticed that the tests were broken by the changed `hasData` check. This is reverted here, and `clearCurrentBuffer` has been updated to correctly reset the position and limit.

          I'm mainly re-opening this to wait for Travis. In release-1.1 it's already fixed. If someone has spare time and wants to look at the changes again, feel free to do so.
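          The position/limit reset mentioned above can be illustrated with plain java.nio (a hedged sketch with hypothetical names, not the actual serializer code):

```java
import java.nio.ByteBuffer;

// Hedged sketch (hypothetical names, not the actual Flink serializer): clearing
// the current buffer must also reset the write position and limit, otherwise a
// hasData()-style check would keep reporting stale data.
class ClearSketch {
    private final ByteBuffer buf = ByteBuffer.allocate(32);

    void write(byte b) { buf.put(b); }

    boolean hasData() { return buf.position() > 0; }

    void clearCurrentBuffer() {
        buf.clear();  // resets position to 0 and limit to capacity
    }
}
```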

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink fix_record_serializer

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2716.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2716


          commit aaee4a9d889b76cd3d9e63912c5481ed1ce89140
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-10-24T16:01:28Z

          FLINK-4894 [network] Don't request buffer after writing to partition

          After emitting a record via the RecordWriter, we eagerly requested
          a new buffer for the next emit on that channel (although it's not clear
          that we will immediately need it). With this change, we request that
          buffer lazily when an emit call requires it.

          This closes #2690.

          commit 124ab2caf467dca882d4924beba7586d5f8cfbee
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2016-10-28T09:31:44Z

          FLINK-4894 [network] Fix hasData() and correctly clear buffer state


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/2716

          +1
          Merging this...

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2716

          StephanEwen Stephan Ewen added a comment -

          Merged additional patch in 3e85fc6138b555a6a5595fcba73a024b2dd5c179


            People

            • Assignee: uce Ufuk Celebi
            • Reporter: uce Ufuk Celebi
            • Votes: 0
            • Watchers: 3
