Flink / FLINK-5059

only serialise events once in RecordWriter#broadcastEvent

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Network
    • Labels:
      None

      Description

Currently, org.apache.flink.runtime.io.network.api.writer.RecordWriter#broadcastEvent serialises the event once per target channel. Instead, it could serialise the event only once, reuse the serialised form for every channel, and thus save CPU cycles and buffer memory.
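The intended optimisation can be sketched with simplified, hypothetical stand-ins for Flink's reference-counted network buffers (`RefCountedBuffer` and `BroadcastSketch` below are illustrations, not Flink classes): serialise once, then `retain()` the resulting buffer once per channel so every consumer can `recycle()` its reference independently.

```java
// Hypothetical, simplified stand-ins for Flink's reference-counted network
// buffers: illustration only, not Flink's actual Buffer/RecordWriter classes.
import java.util.List;

final class RefCountedBuffer {
    private final byte[] data;  // the serialised event payload
    private int refCount = 1;   // starts out owned by its creator

    RefCountedBuffer(byte[] data) {
        this.data = data;
    }

    synchronized RefCountedBuffer retain() {
        refCount++;
        return this;
    }

    synchronized void recycle() {
        refCount--;  // at zero, the memory could be returned to a pool
    }

    synchronized boolean isRecycled() {
        return refCount == 0;
    }
}

final class BroadcastSketch {
    /**
     * Serialises the event payload once and hands the very same buffer to
     * every channel, retaining it once per channel so that each consumer
     * can recycle its reference independently.
     */
    static RefCountedBuffer broadcast(byte[] serialisedEvent, List<List<RefCountedBuffer>> channels) {
        RefCountedBuffer eventBuffer = new RefCountedBuffer(serialisedEvent);
        try {
            for (List<RefCountedBuffer> channel : channels) {
                channel.add(eventBuffer.retain());
            }
        } finally {
            // drop the creator's reference; the buffer stays alive until
            // every channel has recycled its own reference
            eventBuffer.recycle();
        }
        return eventBuffer;
    }
}
```

The buffer is freed only after the last channel recycles its reference, which is the same lifecycle the PR discussion below revolves around.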


          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/2805

Please adjust the PR title to the following format: FLINK-5059 <issue title/description>; this allows the mirroring of comments made here to JIRA.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2805#discussion_r87965585

Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

```diff
@@ -586,7 +588,18 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
 				// yet be created
 				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
 				for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-					output.writeEventToAllChannels(message);
+					final Buffer eventBuffer = EventSerializer.toBuffer(message);
```
(End diff)

          That was the case before and I could have adapted `ResultPartitionWriter#writeEventToAllChannels()` accordingly. The question is, however, whether we want `ResultPartitionWriter` to be aware of the difference between events and buffers or offer a cleaner API that is based on buffers only...

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2805#discussion_r87967007

Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

```diff
@@ -586,7 +588,18 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
 				// yet be created
 				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
 				for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-					output.writeEventToAllChannels(message);
+					final Buffer eventBuffer = EventSerializer.toBuffer(message);
```
(End diff)

In any case, the entire try/finally block could be moved into the ResultPartitionWriter, correct?

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2805#discussion_r87968119

Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

```diff
@@ -586,7 +588,18 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws
 				// yet be created
 				final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
 				for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
-					output.writeEventToAllChannels(message);
+					final Buffer eventBuffer = EventSerializer.toBuffer(message);
```
(End diff)

Yes, that's right. Let's go with `writeBufferToAllChannels(Buffer)` instead.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2805#discussion_r90777407

Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java

```diff
@@ -215,6 +189,31 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 		}
 	}
 
+	private <T> void addBufferToOutputList(RecordDeserializer<DeserializationDelegate<T>> recordDeserializer,
+			NonReusingDeserializationDelegate<T> delegate, Buffer buffer,
```
(End diff)

          In test scope, this is not a problem, but in main scope the style guidelines (and check style plugin) do not allow leading whitespace. Personally, I don't think it's good practice to differentiate between tests and main code. You might consider removing the leading space here, too (for example one argument per line).

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2805#discussion_r90778003

Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java

```diff
@@ -421,22 +423,19 @@ private ResultPartitionWriter createCollectingPartitionWriter(
 			@Override
 			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 				Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
-				Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
-				queues[targetChannel].add(new BufferOrEvent(buffer, targetChannel));
+				if (buffer.isBuffer()) {
+					Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
+					queues[targetChannel].add(new BufferOrEvent(buffer, targetChannel));
+				} else {
+					// is event:
+					AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+					Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
```
(End diff)

          Was missing before, too, but we should recycle the buffer after creating the event.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2805#discussion_r90777420

Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java

```diff
@@ -215,6 +189,31 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 		}
 	}
 
+	private <T> void addBufferToOutputList(RecordDeserializer<DeserializationDelegate<T>> recordDeserializer,
```
(End diff)

Not that it was commented before, but you might want to add a high-level comment about what happens here.

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2805#discussion_r90777958

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

```diff
@@ -131,35 +132,30 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
 	}
 
 	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = serializers[targetChannel];
-
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					writeAndClearBuffer(buffer, targetChannel, serializer);
-				} else if (serializer.hasData()) {
-					throw new IllegalStateException("No buffer, but serializer has buffered data.");
-				}
-
-				targetPartition.writeEvent(event, targetChannel);
-			}
-		}
-	}
+		final Buffer eventBuffer = EventSerializer.toBuffer(event);
+		try {
+			for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+				RecordSerializer<T> serializer = serializers[targetChannel];
-	public void sendEndOfSuperstep() throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = serializers[targetChannel];
+				synchronized (serializer) {
+					Buffer buffer = serializer.getCurrentBuffer();
+					if (buffer != null) {
+						writeAndClearBuffer(buffer, targetChannel, serializer);
+					} else if (serializer.hasData()) {
+						// sanity check
+						throw new IllegalStateException("No buffer, but serializer has buffered data.");
+					}
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					writeAndClearBuffer(buffer, targetChannel, serializer);
+					// retain the buffer so that it can be recycled by each channel of targetPartition
+					eventBuffer.retain();
```
(End diff)

It would be good to add a special RecordWriterTest that ensures that the reference counting logic works.

```java
@Test
public void testBroadcastEventBufferReferenceCounting() throws Exception {
	Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);

	// Partial mocking of static method...
	PowerMockito.stub(PowerMockito.method(EventSerializer.class, "toBuffer")).toReturn(buffer);

	@SuppressWarnings("unchecked")
	ArrayDeque<BufferOrEvent>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };

	ResultPartitionWriter partition = createCollectingPartitionWriter(queues, new TestInfiniteBufferProvider());
	RecordWriter<?> writer = new RecordWriter<>(partition);

	writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);

	// Verify added to all queues
	assertEquals(1, queues[0].size());
	assertEquals(1, queues[1].size());

	assertTrue(buffer.isRecycled());
}
```

You have to adjust `createCollectingPartitionWriter` to correctly recycle event buffers and replace the `PrepareForTest` class annotation with `@PrepareForTest({ResultPartitionWriter.class, EventSerializer.class})`.

Not changed in this PR, but to work correctly this relies on the `ResultPartition` recycling the buffer if the `add` call fails. It might make sense to add a special test (to `ResultPartitionTest` or `RecordWriterTest`) ensuring that this actually happens, to guard against future behaviour changes in `ResultPartition`. A possibly better behaviour would be to let the `RecordWriter` recycle the buffer if an exception occurs. This should be addressed in a different PR, though.
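The recycle-on-failure behaviour suggested here can be sketched with hypothetical stand-ins (`LeakSafeBroadcast`, `Buf`, and `Channel` below are illustrations, not Flink's API): the writer releases the reference it retained for a channel whose `add` throws, and always drops its own reference in a `finally` block, so a failed add cannot leak the buffer.

```java
// Hypothetical sketch of the recycle-on-failure behaviour discussed above.
// Buf and Channel are simplified stand-ins, not Flink classes.
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

final class LeakSafeBroadcast {

    /** Simplified stand-in for a reference-counted network buffer. */
    static final class Buf {
        final AtomicInteger refs = new AtomicInteger(1); // owned by its creator

        Buf retain() {
            refs.incrementAndGet();
            return this;
        }

        void recycle() {
            refs.decrementAndGet();
        }

        boolean isRecycled() {
            return refs.get() == 0;
        }
    }

    /** A channel that takes ownership of one reference on a successful add. */
    interface Channel {
        void add(Buf buffer) throws IOException;
    }

    static void broadcast(Buf eventBuffer, Channel[] channels) throws IOException {
        try {
            for (Channel channel : channels) {
                eventBuffer.retain();
                try {
                    channel.add(eventBuffer);
                } catch (IOException e) {
                    // the failed channel never took ownership, so release
                    // the reference that was retained for it
                    eventBuffer.recycle();
                    throw e;
                }
            }
        } finally {
            // drop the writer's own reference in all cases
            eventBuffer.recycle();
        }
    }
}
```

With this shape, channels that did receive the buffer still own their references and recycle them as usual, while the writer never leaves a dangling reference behind after an exception.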

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2805#discussion_r90777962

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java

```diff
@@ -71,21 +68,26 @@ public void writeBuffer(Buffer buffer, int targetChannel) throws IOException
 		partition.add(buffer, targetChannel);
 	}
 
-	public void writeEvent(AbstractEvent event, int targetChannel) throws IOException {
-		partition.add(EventSerializer.toBuffer(event), targetChannel);
-	}
-
-	public void writeEventToAllChannels(AbstractEvent event) throws IOException {
-		for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
-			Buffer buffer = EventSerializer.toBuffer(event);
-			partition.add(buffer, i);
-		}
-	}
-
-	public void writeEndOfSuperstep() throws IOException {
-		for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
-			Buffer buffer = EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE);
-			partition.add(buffer, i);
+	/**
+	 * Writes the given buffer to all available target channels.
+	 *
+	 * The buffer is taken over and used for each of the channels.
+	 * It will be recycled afterwards.
+	 *
+	 * @param eventBuffer the buffer to write
+	 * @throws IOException
+	 */
+	public void writeBufferToAllChannels(final Buffer eventBuffer) throws IOException {
+		try {
+			for (int targetChannel = 0; targetChannel < partition.getNumberOfSubpartitions(); targetChannel++) {
+				// retain the buffer so that it can be recycled by each channel of targetPartition
+				eventBuffer.retain();
```
(End diff)

          Similar comments as in the `RecordWriter` apply here.

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2805#discussion_r91306180

Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

```diff
@@ -131,35 +132,30 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
 	}
 
 	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = serializers[targetChannel];
-
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					writeAndClearBuffer(buffer, targetChannel, serializer);
-				} else if (serializer.hasData()) {
-					throw new IllegalStateException("No buffer, but serializer has buffered data.");
-				}
-
-				targetPartition.writeEvent(event, targetChannel);
-			}
-		}
-	}
+		final Buffer eventBuffer = EventSerializer.toBuffer(event);
+		try {
+			for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+				RecordSerializer<T> serializer = serializers[targetChannel];
-	public void sendEndOfSuperstep() throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = serializers[targetChannel];
+				synchronized (serializer) {
+					Buffer buffer = serializer.getCurrentBuffer();
+					if (buffer != null) {
+						writeAndClearBuffer(buffer, targetChannel, serializer);
+					} else if (serializer.hasData()) {
+						// sanity check
+						throw new IllegalStateException("No buffer, but serializer has buffered data.");
+					}
-			synchronized (serializer) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					writeAndClearBuffer(buffer, targetChannel, serializer);
+					// retain the buffer so that it can be recycled by each channel of targetPartition
+					eventBuffer.retain();
```
(End diff)

          done, thanks

          I opened FLINK-5277 for the ResultPartition#add test: https://issues.apache.org/jira/browse/FLINK-5277

          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on the issue:

          https://github.com/apache/flink/pull/2805

          Thanks for addressing the comments, Nico. Going to merge this with the next batch.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2805

          uce Ufuk Celebi added a comment -

          Fixed in 9cff8c9 (master).


People

• Assignee: Nico Kruber (NicoK)
• Reporter: Nico Kruber (NicoK)
• Votes: 0
• Watchers: 4
