Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-16576

State inconsistency on restore with memory state backends

    XMLWordPrintableJSON

Details

    Description

      I occasionally see a few state inconsistencies with the TopSpeedWindowing example in Flink. Restore would fail with either of these causes, but only for the memory state backends and only with some combinations of parallelism I took the savepoint with and parallelism I restore the job with:

      java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, endKeyGroup=95} does not contain key group 97 

      or

      java.lang.NullPointerException
      	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280) 

      or

      java.io.IOException: Corrupt stream, found tag: 8
      	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217) 

       

      I managed to make it reproducible in a test that I quickly hacked together in https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java (please checkout the whole repository since I had to change some dependencies).

      In a bit more detail, this is what I discovered before, also with a manual savepoint on S3:

      Savepoint that was taken with parallelism 2 (p=2) and shows the restore failure in three different ways (all running in Flink 1.10.0; but I also see it in Flink 1.9):

      • first of all, if I try to restore with p=2, everything is fine
      • if I restore with p=4 I get an exception like the one mentioned above:
        2020-03-11 15:53:35,149 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4) (2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED.
        java.lang.Exception: Exception while creating StreamOperatorStateContext.
        	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        	at java.lang.Thread.run(Thread.java:748)
        Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from any of the 1 provided restore options.
        	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
        	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
        	... 9 more
        Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
        	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
        	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
        	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
        	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        	... 11 more
        Caused by: java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, endKeyGroup=95} does not contain key group 97
        	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
        	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
        	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
        	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
        	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
        	at org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
        	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
        	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
        	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
        	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
        	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
        	... 15 more
        
      • if I restore with p=3, I get the following exception instead:
        2020-03-11 21:40:28,390 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print to Std. Out (3/3) (2fb8acde321f6cbfec56c300153d6dea) switched from RUNNING to FAILED.
        java.lang.Exception: Exception while creating StreamOperatorStateContext.
        	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
        	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
        	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
        	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        	at java.lang.Thread.run(Thread.java:748)
        Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/3) from any of the 1 provided restore options.
        	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
        	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
        	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
        	... 9 more
        Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
        	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
        	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
        	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
        	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
        	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
        	... 11 more
        Caused by: java.lang.NullPointerException
        	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
        	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
        	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
        	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
        	... 15 more
        

      While the latter error somewhat indicates that the savepoint may be corrupt (or that the reader/deserializer messed up), it remains a mystery to me why I can successfully restore with p=2.

      • occasionally, for p=4, I also see this exception instead:
      10:23:12,046 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Exception while restoring keyed state backend for EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from alternative (1/1), will retry while more alternatives are available.
      org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
      	at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: java.io.IOException: Corrupt stream, found tag: 8
      	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
      	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
      	at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
      	at org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
      	at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
      	at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
      	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:295)
      	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:256)
      	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:155)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
      	... 15 more

       

      I narrowed the cause down to the state access inside DeltaTrigger: Removing the cleanup in org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger#clear() did not change anything but removing state access in its onElement (pictured below) seems to resolve the bug:

      @Override
      public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
         ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
         if (lastElementState.value() == null) {
            lastElementState.update(element);
            return TriggerResult.CONTINUE;
         }
         if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
            lastElementState.update(element);
            return TriggerResult.FIRE;
         }
         return TriggerResult.CONTINUE;
      } 

      Attachments

        Issue Links

          Activity

            People

              klion26 Congxian Qiu
              nkruber Nico Kruber
              Votes:
              0 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m