Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19866

FunctionsStateBootstrapOperator.createStateAccessor fails due to uninitialized runtimeContext

    XMLWordPrintableJSON

Details

    Description

      It has bugs similar to FLINK-19330

      In Flink 1.11.2, statefun-flink-state-processor 2.2.0, the AbstractStreamOperator's runtimeContext is not fully initialized when executing
      AbstractStreamOperator#intializeState()
      in particular KeyedStateStore is set after intializeState was finished.
      See:
      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L258,L259
      This behaviour was changed from Flink 1.10->Flink 1.11.

      StateFun's FunctionsStateBootstrapOperator performs its initialization logic at initalizeState, and it requires an already initialized runtimeContext to create stateAccessor.

      This situation causes the following failure: 

      Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation. at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:223) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:188) at org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateAccessor(FlinkState.java:69) at org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindValue(FlinkStateBinder.java:48) at org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:30) at org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:46) at org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapFunctionRegistry.bindState(StateBootstrapFunctionRegistry.java:120) at org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapFunctionRegistry.initialize(StateBootstrapFunctionRegistry.java:103) at org.apache.flink.statefun.flink.state.processor.operator.StateBootstrapper.<init>(StateBootstrapper.java:39) at org.apache.flink.statefun.flink.state.processor.operator.FunctionsStateBootstrapOperator.initializeState(FunctionsStateBootstrapOperator.java:67) at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) at org.apache.flink.state.api.output.BoundedStreamTask.init(BoundedStreamTask.java:85) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:457) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) at org.apache.flink.state.api.output.BoundedOneInputStreamTaskRunner.mapPartition(BoundedOneInputStreamTaskRunner.java:76) at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:504) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:748)

       

       

       

       

      Attachments

        Issue Links

          Activity

            People

              molin wang
              molin wang
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: