Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19748

KeyGroupRangeOffsets#KeyGroupOffsetsIterator should skip key groups that don't have a defined offset

    XMLWordPrintableJSON

    Details

      Description

      Currently, on commit the UnboundedFeedbackLogger only calls startNewKeyGroup on the raw keyed stream for key groups that actually have logged messages:
      https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L102

      This means that it might skip some key groups, if a key group doesn't have any logged messages.

      This doesn't conform with the expected usage of Flink's KeyedStateCheckpointOutputStream, where it expects that for ALL key groups within the range, startNewKeyGroup needs to be invoked.
      The reason for this is that underneath, calling startNewKeyGroup would also record the starting stream offset position for the key group.
      However, when iterating through a raw keyed stream, the key group offsets iterator KeyGroupRangeOffsets#KeyGroupOffsetsIterator doesn't take into account that some key groups weren't written and therefore do not have offsets defined, and the streams will be seeked to incorrect positions.

      Ultimately, if some key groups were skipped while writing to the raw keyed stream, the following error will be thrown on restore:

      java.lang.Exception: Exception while creating StreamOperatorStateContext.
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: java.io.IOException: position out of bounds
      	at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
      	... 9 more
      Caused by: java.io.IOException: position out of bounds
      	at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
      	... 10 more
      

      Solution

      We change the KeyGroupRangeOffsets#KeyGroupOffsetsIterator in Flink to skip key groups that don't have a defined offset (i.e. startNewKeyGroup wasn't called for these key groups).

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                tzulitai Tzu-Li (Gordon) Tai
                Reporter:
                tzulitai Tzu-Li (Gordon) Tai
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: