Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Fixed
1.9.3, 1.10.2, 1.11.2
Description
Diagnosis
Currently, when restoring an InternalTimeServiceManager, we always attempt to read from the provided raw keyed state streams (using InternalTimerServiceSerializationProxy):
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L117
This is incorrect, since nothing is written with the InternalTimerServiceSerializationProxy unless the timers require legacy synchronous snapshots:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L192
(Currently, legacy synchronous snapshots are only required when the RocksDB backend is used with heap timers.)
Therefore, creating the InternalTimeServiceManager on restore can fail due to corrupt reads when all of the following hold:
- a checkpoint was taken with useLegacySynchronousSnapshots set to false (so nothing was written, and the time service manager does not use the raw keyed stream)
- the raw keyed stream is used elsewhere (e.g. in the Flink application's user code)
- on restore from the checkpoint, InternalTimeServiceManagerImpl.create() attempts to read from the raw keyed stream with the InternalTimerServiceSerializationProxy.
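To make the failure mode concrete, here is a minimal, self-contained Java simulation that uses plain java.io streams instead of Flink classes. The class RawKeyedStreamMismatchDemo, its methods, and the byte layout (an int count of timer services followed by their names) are hypothetical stand-ins, not Flink's actual InternalTimerServiceSerializationProxy format. As in the diagnosis, the snapshot side writes timer metadata only when the flag is set, while the restore side always reads it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class RawKeyedStreamMismatchDemo {

    // Hypothetical snapshot of one key group's raw keyed state stream.
    // Timer metadata is written only if useLegacySynchronousSnapshots is true;
    // user code always appends its own string afterwards.
    static byte[] snapshot(boolean useLegacySynchronousSnapshots, String userData) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (useLegacySynchronousSnapshots) {
                out.writeInt(1);               // made-up layout: number of timer services
                out.writeUTF("window-timers"); // made-up layout: timer service name
            }
            out.writeUTF(userData);            // user-owned raw keyed state
        }
        return bytes.toByteArray();
    }

    // Buggy restore: always reads timer metadata first, ignoring the flag,
    // then lets "user code" read its string from whatever bytes remain.
    static String restoreIgnoringFlag(byte[] keyGroupBytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(keyGroupBytes));
        int numTimerServices = in.readInt();
        for (int i = 0; i < numTimerServices; i++) {
            in.readUTF(); // timer service name (discarded in this sketch)
        }
        return in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        // Legacy snapshot: metadata was written, so the unconditional read lines up.
        System.out.println(restoreIgnoringFlag(snapshot(true, "hello")));

        // Non-legacy snapshot: the user's bytes are misread as timer metadata.
        try {
            restoreIgnoringFlag(snapshot(false, "hello"));
        } catch (EOFException e) {
            System.out.println("restore failed: " + e);
        }
    }
}
```

With the flag off, the encoded "hello" is misinterpreted as a timer-service count and a name length, and readUTF runs past the end of the stream. In this toy model the EOFException is raised from readFully inside readUTF, the same shape as the stack trace in this issue, although the real byte layout differs.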
Full error stack trace (with Flink 1.11.1):
2020-10-21 13:16:51
java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readFully(DataInputStream.java:197)
	at java.io.DataInputStream.readUTF(DataInputStream.java:609)
	at java.io.DataInputStream.readUTF(DataInputStream.java:564)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy.read(InternalTimerServiceSerializationProxy.java:110)
	at org.apache.flink.core.io.PostVersionedIOReadableWritable.read(PostVersionedIOReadableWritable.java:76)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.restoreStateForKeyGroup(InternalTimeServiceManager.java:217)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:234)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
	... 9 more
Reproducing
- Have an application with any operator that uses and writes to raw keyed state streams
- Use either the heap backend (with any timer factory) or the RocksDB backend with RocksDB timers
- Take a savepoint or wait for a checkpoint, and trigger a restore
Proposed Fix
The fix would be to also respect the useLegacySynchronousSnapshots flag on the restore path, in:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java#L231
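A minimal sketch of the proposed fix, again as a plain java.io toy model rather than Flink code: the restore path checks the same flag as the snapshot path before reading any timer metadata. FlagAwareRestoreSketch, its methods, and the byte layout are hypothetical. The sketch assumes the flag has the same value when the snapshot is written and when it is restored; it does not model restoring with a different state backend or timer factory than the one that took the snapshot.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlagAwareRestoreSketch {

    // Hypothetical key-group layout: optional timer metadata, then user data.
    static byte[] snapshot(boolean useLegacySynchronousSnapshots, String userData) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (useLegacySynchronousSnapshots) {
                out.writeInt(1);               // made-up: number of timer services
                out.writeUTF("window-timers"); // made-up: timer service name
            }
            out.writeUTF(userData);            // user-owned raw keyed state
        }
        return bytes.toByteArray();
    }

    // Fixed restore: reads timer metadata only under the same condition that wrote it,
    // so bytes owned by other users of the raw keyed stream are left untouched.
    static String restore(byte[] keyGroupBytes, boolean useLegacySynchronousSnapshots) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(keyGroupBytes));
        if (useLegacySynchronousSnapshots) {
            int numTimerServices = in.readInt();
            for (int i = 0; i < numTimerServices; i++) {
                in.readUTF(); // timer service name (discarded in this sketch)
            }
        }
        return in.readUTF(); // user code reads its own data
    }

    public static void main(String[] args) throws IOException {
        for (boolean legacy : new boolean[] {true, false}) {
            System.out.println("legacy=" + legacy + " -> " + restore(snapshot(legacy, "hello"), legacy));
        }
    }
}
```

Keying both the write and the read off one flag keeps them symmetric, so in both cases the user's "hello" is read back intact.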
Issue Links
- causes
  FLINK-19692 Can't restore feedback channel from savepoint (Closed)
- links to