Flink / FLINK-19741

InternalTimeServiceManager fails to restore due to corrupt reads if there are other users of raw keyed state streams

    Description

      Diagnosis

      Currently, when restoring an InternalTimeServiceManager, we always attempt to read from the provided raw keyed state streams (using InternalTimerServiceSerializationProxy):
      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L117

      This is incorrect, since we don't write with the InternalTimerServiceSerializationProxy if the timers do not require legacy synchronous snapshots:
      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L192
      (Legacy synchronous snapshots are currently required only when the RocksDB backend is used together with heap timers.)

      Therefore, the InternalTimeServiceManager can fail to be created on restore due to corrupt reads in the case where:

      • a checkpoint was taken where useLegacySynchronousSnapshots is false (hence nothing was written, and the time service manager does not use the raw keyed stream)
      • the raw keyed stream is used elsewhere (e.g. in the Flink application's user code)
      • on restore from the checkpoint, InternalTimeServiceManagerImpl.create() attempts to read from the raw keyed stream with the InternalTimerServiceSerializationProxy.
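
      The failure mode can be illustrated without Flink. The following is a minimal sketch, not Flink code: the class and method names are hypothetical, and it assumes only what the stack trace shows, namely that the reader starts by calling DataInputStream.readUTF. Bytes written in some other format are misread as a string length, and the read runs off the end of the stream.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical stand-alone illustration; none of these names exist in Flink.
class RawKeyedStreamMisread {

    // Stand-in for user code that writes its own format to the raw keyed stream.
    static byte[] writeUserState() {
        // A single int; its bytes are 00 40 00 01.
        return ByteBuffer.allocate(4).putInt(0x00400001).array();
    }

    // Stand-in for a reader that expects a length-prefixed UTF string first.
    // Returns true if the read succeeded, false if it hit the end of the stream.
    static boolean readAsTimerState(byte[] raw) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            // The first two bytes (0x0040) are taken as a 64-byte string length,
            // but only two bytes remain in the stream.
            in.readUTF();
            return true;
        } catch (EOFException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read succeeded: " + readAsTimerState(writeUserState()));
    }
}
```

      Running main prints "read succeeded: false", which matches the EOFException raised from readUTF in the trace below.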

      Full error stack trace (with Flink 1.11.1):

      2020-10-21 13:16:51
      java.lang.Exception: Exception while creating StreamOperatorStateContext.
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.io.EOFException
      	at java.io.DataInputStream.readFully(DataInputStream.java:197)
      	at java.io.DataInputStream.readUTF(DataInputStream.java:609)
      	at java.io.DataInputStream.readUTF(DataInputStream.java:564)
      	at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.read(InternalTimerServiceSerializationProxy.java:110)
      	at org.apache.flink.core.io.PostVersionedIOReadableWritable.read(PostVersionedIOReadableWritable.java:76)
      	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:217)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:234)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
      	... 9 more
      

      Reproducing

      • Have an application with any operator that uses and writes to raw keyed state streams
      • Use either the heap backend with any timer factory, or the RocksDB backend with RocksDB timers (in both cases legacy synchronous snapshots are not required)
      • Take a savepoint or wait for a checkpoint, and trigger a restore

      Proposed Fix

      The fix would be to also respect the useLegacySynchronousSnapshots flag in:
      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L231
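
      The shape of the proposed guard can be sketched as follows. This is a hedged illustration under assumptions, not the actual patch: the class, the restore method, and the use of a single readUTF call as the "timer state" format are all hypothetical. Only the flag name useLegacySynchronousSnapshots comes from the description above.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical stand-alone illustration; none of these names exist in Flink.
class GuardedTimerRestore {

    // Reads timer state only if it was written in the first place.
    // Returns the restored value, or empty if the raw stream was left untouched.
    static Optional<String> restore(boolean useLegacySynchronousSnapshots, byte[] rawKeyedState) {
        if (!useLegacySynchronousSnapshots) {
            // Nothing was written by the timer manager on snapshot, so whatever
            // is in the raw keyed stream belongs to another user. Do not read it.
            return Optional.empty();
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(rawKeyedState))) {
            return Optional.of(in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Bytes in a foreign format: reading them as UTF would fail with EOF.
        byte[] userBytes = {0x00, 0x40, 0x00, 0x01};
        System.out.println(restore(false, userBytes).isPresent());
    }
}
```

      With the flag set to false, the foreign bytes are never read and main prints "false" instead of failing. The read path is symmetric with the write path: state is read on restore only under the same condition under which it is written on snapshot.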

            People

              tzulitai Tzu-Li (Gordon) Tai