FLINK-18452

Fix StateMigrationException because RetractableTopNFunction#ComparatorWrapper might be incompatible


    Details

      Description

      We found that SQL jobs using the "Top-N" functionality of the blink planner cannot restore their state, because the state serializers are reported as incompatible even though they are in fact compatible.

      The error log is shown below:

      taskmanager.log

      2020-06-30 09:19:32.089 [Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid,  quantity]) (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[appkey, serverid], orderBy=[quantity DESC], select=[appkey, serverid, oid, quantity]) (1/1) (bd4d2e4327efac57dc70e220b8de460b) switched from RUNNING to FAILED.
      java.lang.RuntimeException: Error while getting state
              at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
              at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
              at org.apache.flink.table.runtime.operators.rank.RetractableTopNFunction.open(RetractableTopNFunction.java:115)
              at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
              at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
              at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:57)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:990)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
              at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
              at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
              at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
              at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
              at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
              at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:72)
              at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
              at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
              at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
              at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
              ... 13 more

       
      After debugging, we traced the failure to the compatibility check of the type serializers.
       
      In short, during checkpointing Flink snapshots the SortedMapSerializer by creating a SortedMapSerializerSnapshot object, which encapsulates the original comparator (here called StreamExecSortComparator$579).
       
      On restore, the snapshot is read back normally. However, when the RetractableTopNFunction instance is constructed, Flink passes in another Comparator (here called StreamExecSortComparator$626), which is later used in the ValueStateDescriptor that acts as the key into the state store.
       
      Here is the problem: when the restored job accesses state (getState) through that ValueStateDescriptor, the state backend first checks whether the comparator in the state descriptor is compatible with the one in the snapshot. This check eventually reaches the equals method of the RetractableTopNFunction.ComparatorWrapper class.
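
      The flow described above can be modelled in a few lines. This is only a sketch under stated assumptions: NamedComparator and SortedMapSnapshotModel are hypothetical stand-ins, not Flink classes, and the result strings are illustrative labels. It shows how a compatibility decision that delegates to the comparator's equals method rejects a comparator that orders records identically but carries a different generated name.

```java
import java.util.Comparator;

// Simplified stand-ins for illustration only; these are NOT Flink's classes.
final class NamedComparator implements Comparator<Long> {
    final String className; // e.g. "StreamExecSortComparator$579"

    NamedComparator(String className) {
        this.className = className;
    }

    @Override
    public int compare(Long a, Long b) {
        return Long.compare(b, a); // descending, like "quantity DESC"
    }

    // Equality tied to the generated class name, as described in this report.
    @Override
    public boolean equals(Object o) {
        return o instanceof NamedComparator
                && className.equals(((NamedComparator) o).className);
    }

    @Override
    public int hashCode() {
        return className.hashCode();
    }
}

final class SortedMapSnapshotModel {
    // Comparator read back from the checkpoint.
    private final Comparator<Long> restored;

    SortedMapSnapshotModel(Comparator<Long> restored) {
        this.restored = restored;
    }

    // Models the decision taken when state is first accessed after restore.
    String resolveCompatibility(Comparator<Long> fromNewDescriptor) {
        return restored.equals(fromNewDescriptor) ? "COMPATIBLE_AS_IS" : "INCOMPATIBLE";
    }
}
```

      In this model, a snapshot holding StreamExecSortComparator$579 reports INCOMPATIBLE for StreamExecSortComparator$626, although both comparators sort in the same order.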
       
      The equals method contains the following snippet:

      return generatedRecordComparator.getClassName().equals(oGeneratedComparator.getClassName()) &&
            generatedRecordComparator.getCode().equals(oGeneratedComparator.getCode()) &&
            Arrays.equals(generatedRecordComparator.getReferences(), oGeneratedComparator.getReferences());
      

      After debugging, we found that the class name of the comparator in the snapshot is StreamExecSortComparator$579, while the class name of the comparator provided by the new job is StreamExecSortComparator$626. Hence this method always returns false, even though the two comparators are in fact compatible (they behave identically). In addition, because the code of each comparator is generated independently, the corresponding variables in the two comparators are very likely to differ as well (isNullA$581 vs isNullA$682).
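
      To illustrate why both the class name and the code text differ, here is a toy generator that mimics the naming scheme seen above. It is a hypothetical sketch: NameCounterDemo, its counter and the generated code line are assumptions for illustration and do not reproduce the planner's code generation. The stripSuffixes helper only demonstrates that the two outputs share the same structure; it is not proposed as a fix.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical mini "code generator"; it only mimics the "$<number>" naming scheme.
final class NameCounterDemo {
    // Shared counter from which every generated name draws a fresh suffix.
    private static final AtomicInteger COUNTER = new AtomicInteger(579);

    // Returns {className, code} for one generated comparator.
    static String[] generateComparator() {
        String className = "StreamExecSortComparator$" + COUNTER.getAndIncrement();
        String isNullA = "isNullA$" + COUNTER.getAndIncrement();
        String code = "boolean " + isNullA + " = a.isNullAt(2);";
        return new String[] {className, code};
    }

    // Replaces every numeric "$n" suffix with "$N" so only the structure remains.
    static String stripSuffixes(String s) {
        return s.replaceAll("\\$\\d+", "\\$N");
    }

    public static void main(String[] args) {
        String[] first = generateComparator();
        String[] second = generateComparator();
        // Names and code differ between the two runs ...
        System.out.println(first[0].equals(second[0]));
        System.out.println(first[1].equals(second[1]));
        // ... although the structure is the same once the suffixes are ignored.
        System.out.println(stripSuffixes(first[1]).equals(stripSuffixes(second[1])));
    }
}
```

      Two consecutive calls yield different class names and different code strings, so an equals method that compares getClassName() and getCode() textually cannot succeed across independent generations.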
       
      Hence we believe that the implementation of the equals method is seriously flawed and should be fixed in a later release.
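
      One possible direction, sketched here as an assumption rather than as the fix adopted for this issue, is to base equality on the sort specification instead of on generated names and code. SortSpecKey and its fields are hypothetical names introduced for illustration.

```java
import java.util.Arrays;

// Hypothetical sort specification; class and field names are assumptions.
final class SortSpecKey {
    private final int[] keyIndices;    // positions of the ORDER BY fields
    private final boolean[] ascending; // sort direction per field
    private final boolean[] nullsLast; // null ordering per field

    SortSpecKey(int[] keyIndices, boolean[] ascending, boolean[] nullsLast) {
        // Defensive copies keep the key immutable.
        this.keyIndices = keyIndices.clone();
        this.ascending = ascending.clone();
        this.nullsLast = nullsLast.clone();
    }

    // Two keys are equal when they describe the same ordering,
    // regardless of how or when the comparator code was generated.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SortSpecKey)) {
            return false;
        }
        SortSpecKey other = (SortSpecKey) o;
        return Arrays.equals(keyIndices, other.keyIndices)
                && Arrays.equals(ascending, other.ascending)
                && Arrays.equals(nullsLast, other.nullsLast);
    }

    @Override
    public int hashCode() {
        int result = Arrays.hashCode(keyIndices);
        result = 31 * result + Arrays.hashCode(ascending);
        result = 31 * result + Arrays.hashCode(nullsLast);
        return result;
    }
}
```

      With such a key, two comparators built for the same ORDER BY clause compare as equal across job restarts, while a change in sort direction or key fields is still detected as a real incompatibility.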

              People

              • Assignee:
                jark Jark Wu
                Reporter:
                kyledong Weike Dong