Flink / FLINK-19330

Recovery with async operations fails due to uninitialized runtimeContext

    Details

      Description

      In Flink 1.11, the AbstractStreamOperator's runtimeContext is not fully initialized while

       AbstractStreamOperator#initializeState()

      is executing. In particular, the KeyedStateStore is set only after initializeState has finished.
      See:
      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L258,L259
      This behaviour changed between Flink 1.10 and Flink 1.11.
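
      The ordering problem can be illustrated with a small stand-alone model. This is only a sketch: SketchRuntimeContext, SketchOperator and the two run methods are hypothetical names invented for illustration and are not Flink classes. The null check uses java.util.Objects.requireNonNull as a stand-in for the precondition that appears in the trace.

```java
import java.util.Objects;

public class InitOrderSketch {

    /** Hypothetical stand-in for a runtime context; NOT Flink's StreamingRuntimeContext. */
    public static class SketchRuntimeContext {
        private Object keyedStateStore; // stays null until the "framework" sets it

        public void setKeyedStateStore(Object store) {
            this.keyedStateStore = store;
        }

        public Object getState(String name) {
            // Null check modelled on the message seen in the reported trace.
            Objects.requireNonNull(keyedStateStore,
                "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
            return name + "@" + keyedStateStore;
        }
    }

    /** Hypothetical operator whose initializeState hook touches keyed state. */
    public static class SketchOperator {
        public final SketchRuntimeContext runtimeContext = new SketchRuntimeContext();

        public void initializeState() {
            runtimeContext.getState("async-operations");
        }
    }

    /** Models the ordering described above: hook first, keyed state store second. */
    public static String runWithLateStore(SketchOperator op) {
        try {
            op.initializeState();
            op.runtimeContext.setKeyedStateStore("store");
            return "ok";
        } catch (NullPointerException e) {
            return "NPE: " + e.getMessage();
        }
    }

    /** Models the ordering the operator expects: keyed state store first, hook second. */
    public static String runWithEarlyStore(SketchOperator op) {
        op.runtimeContext.setKeyedStateStore("store");
        op.initializeState();
        return "ok";
    }

    public static void main(String[] args) {
        System.out.println(runWithLateStore(new SketchOperator()));
        System.out.println(runWithEarlyStore(new SketchOperator()));
    }
}
```

      In this model the only difference between the failing and the succeeding run is whether the store is assigned before or after the hook is called.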

      StateFun's FunctionGroupOperator performs its initialization logic in initializeState, and that logic requires an already initialized runtimeContext.

      This situation causes the following failure after recovery:

      java.lang.RuntimeException: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
      	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.lambda$applyToAllKeys$0(AbstractKeyedStateBackend.java:256) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_265]
      	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[?:1.8.0_265]
      	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647) ~[?:1.8.0_265]
      	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.applyToAllKeys(AbstractKeyedStateBackend.java:249) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.fireExpiredAsyncOperations(AsyncOperationFailureNotifier.java:42) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.initializeState(FunctionGroupOperator.java:160) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
      Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
      	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:223) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:188) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	at org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateAccessor(FlinkState.java:69) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindValue(FlinkStateBinder.java:48) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:30) ~[statefun-flink-distribution.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:46) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:73) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:50) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.functions.Reductions.enqueue(Reductions.java:148) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.functions.Reductions.enqueueAsyncOperationAfterRestore(Reductions.java:154) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.process(AsyncOperationFailureNotifier.java:66) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.statefun.flink.core.functions.AsyncOperationFailureNotifier.process(AsyncOperationFailureNotifier.java:30) ~[statefun-flink-core.jar:2.3-SNAPSHOT]
      	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.lambda$applyToAllKeys$0(AbstractKeyedStateBackend.java:252) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
      	... 16 more
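
      The trace shows keyed state being accessed from inside initializeState. One possible way to sidestep the ordering is to remember during initializeState that a restore took place and to run the keyed-state work in a later lifecycle step. The sketch below assumes that such a later step (called open() here) runs after the keyed state store is set. The method name OperatorChain.initializeStateAndOpenOperators in the trace hints at that order, but this report does not confirm it, and the sketch is not necessarily the fix adopted for this issue. All class and method names are hypothetical; nothing here is Flink or StateFun API.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredInitSketch {

    /** Hypothetical operator; method names echo an operator lifecycle but are not Flink API. */
    public static class SketchOperator {
        private Object keyedStateStore; // assumed to be set after initializeState
        private boolean restored;
        public final List<String> log = new ArrayList<>();

        public void initializeState(boolean isRestored) {
            // Only record that a restore happened; do not touch keyed state yet.
            this.restored = isRestored;
            log.add("initializeState");
        }

        public void setKeyedStateStore(Object store) {
            this.keyedStateStore = store;
            log.add("keyedStateStoreSet");
        }

        public void open() {
            if (restored) {
                if (keyedStateStore == null) {
                    throw new IllegalStateException("keyed state store not set yet");
                }
                // Placeholder for the work that needs keyed state after a restore.
                log.add("fireExpiredAsyncOperations");
            }
            log.add("open");
        }
    }

    /** Drives the assumed lifecycle order: initializeState, store assignment, open. */
    public static List<String> runLifecycle(boolean isRestored) {
        SketchOperator op = new SketchOperator();
        op.initializeState(isRestored);
        op.setKeyedStateStore("store");
        op.open();
        return op.log;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle(true));
        System.out.println(runLifecycle(false));
    }
}
```

      The design choice in this sketch is that initializeState only captures a flag, so it no longer depends on when the keyed state store becomes available.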
      

              People

              • Assignee: Igal Shilman
              • Reporter: Igal Shilman