FLINK-22424

Writing to already released buffers potentially causing data corruption during job failover/cancellation

      Description

      I modified the code so that memory segments are no longer reused: on recycling, the segment is always freed. With that change I observed a problem similar to the one reported in FLINK-21181, but even more severe:

      Caused by: java.lang.RuntimeException: segment has been freed
      	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
      	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
      	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
      	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
      	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
      	at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:477)
      	at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:468)
      	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
      	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
      	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1395)
      	... 11 more
      Caused by: java.lang.IllegalStateException: segment has been freed
      	at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:483)
      	at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:1398)
      	at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:100)
      	at org.apache.flink.runtime.io.network.buffer.BufferBuilder.appendAndCommit(BufferBuilder.java:82)
      	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:250)
      	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
      	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
      	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
      	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
      	... 24 more
      

      This also happens during cancellation/job failover. The write fails because it targets a buffer that has already been freed. Without my changes, this code would silently write data to a buffer that has already been recycled/returned to the pool. If someone else picked up this buffer in the meantime, it could easily lead to data corruption.
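
      To illustrate the difference between the two behaviours, here is a minimal, self-contained sketch. It does not use Flink's classes: `Segment`, `Pool` and the `freeOnRecycle` flag are hypothetical stand-ins, assumed only for illustration. With reuse, a write through a stale reference succeeds and overwrites the new owner's data; with free-on-recycle, the same write fails immediately.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class UseAfterRecycleDemo {

    /** Hypothetical stand-in for a memory segment (not Flink's MemorySegment). */
    static final class Segment {
        private byte[] data = new byte[4];

        void put(int index, byte value) {
            if (data == null) {
                throw new IllegalStateException("segment has been freed");
            }
            data[index] = value;
        }

        byte get(int index) {
            if (data == null) {
                throw new IllegalStateException("segment has been freed");
            }
            return data[index];
        }

        void free() {
            data = null;
        }
    }

    /** Hypothetical pool that either reuses recycled segments or frees them. */
    static final class Pool {
        private final Deque<Segment> available = new ArrayDeque<>();
        private final boolean freeOnRecycle;

        Pool(boolean freeOnRecycle) {
            this.freeOnRecycle = freeOnRecycle;
        }

        Segment request() {
            Segment pooled = available.poll();
            return pooled != null ? pooled : new Segment();
        }

        void recycle(Segment segment) {
            if (freeOnRecycle) {
                segment.free();          // any later access fails fast
            } else {
                available.add(segment);  // segment is handed out again later
            }
        }
    }

    /** Returns true if a stale writer overwrote the data of the segment's new owner. */
    public static boolean staleWriteCorruptsReusedSegment() {
        Pool pool = new Pool(false);
        Segment stale = pool.request();
        pool.recycle(stale);             // released while the writer still holds a reference
        Segment fresh = pool.request();  // new owner receives the very same segment
        fresh.put(0, (byte) 1);
        stale.put(0, (byte) 42);         // succeeds silently
        return fresh.get(0) == 42;
    }

    /** Returns true if the stale write is rejected once the segment was freed. */
    public static boolean staleWriteFailsFastWhenFreed() {
        Pool pool = new Pool(true);
        Segment stale = pool.request();
        pool.recycle(stale);
        try {
            stale.put(0, (byte) 42);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("corrupted with reuse: " + staleWriteCorruptsReusedSegment());
        System.out.println("fails fast when freed: " + staleWriteFailsFastWhenFreed());
    }
}
```

      Freeing on recycle does not fix the underlying bug; it only turns a silent overwrite into a visible exception, which is what made the problem observable here.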

      As far as I can tell, the exact cause is that the buffer the timer attempts to write to has been released from `ResultSubpartition#onConsumedSubpartition`. This closes the `BufferConsumer` (which recycles/frees the underlying memory segment) while the matching `BufferBuilder` is still being used...
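
      The suspected sequence can be sketched as follows. This is a simplified, single-threaded model under stated assumptions, not Flink's implementation: `SketchSegment`, `SketchBuilder` and `SketchConsumer` are hypothetical names, and the explicit `close()` call stands in for the release triggered when the subpartition is consumed.

```java
public class BuilderConsumerLifecycleSketch {

    /** Hypothetical segment shared by the writing and the reading side. */
    static final class SketchSegment {
        private final byte[] bytes = new byte[8];
        private boolean freed;

        void put(int index, byte value) {
            if (freed) {
                throw new IllegalStateException("segment has been freed");
            }
            bytes[index] = value;
        }

        void free() {
            freed = true;
        }
    }

    /** Hypothetical writing side: appends into the shared segment. */
    static final class SketchBuilder {
        private final SketchSegment segment;
        private int position;

        SketchBuilder(SketchSegment segment) {
            this.segment = segment;
        }

        void append(byte value) {
            segment.put(position++, value);
        }

        SketchConsumer createConsumer() {
            return new SketchConsumer(segment);
        }
    }

    /** Hypothetical reading side: closing it frees the shared segment. */
    static final class SketchConsumer implements AutoCloseable {
        private final SketchSegment segment;

        SketchConsumer(SketchSegment segment) {
            this.segment = segment;
        }

        @Override
        public void close() {
            segment.free();
        }
    }

    /** Returns the error message seen by the builder, or "no error". */
    public static String appendAfterConsumerClose() {
        SketchBuilder builder = new SketchBuilder(new SketchSegment());
        SketchConsumer consumer = builder.createConsumer();

        builder.append((byte) 1); // normal write while both sides are alive
        consumer.close();         // stands in for the release on cancellation/failover

        try {
            builder.append((byte) 2); // e.g. a timer callback that still emits records
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(appendAfterConsumerClose());
    }
}
```

      In this model the consumer's `close()` ends the segment's lifetime without the builder being told, which mirrors the ownership problem described above.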

              People

              • Assignee: Piotr Nowojski (pnowojski)
              • Reporter: Piotr Nowojski (pnowojski)