Flink / FLINK-11083

CRowSerializerConfigSnapshot is not instantiable



    Description

      We encountered the following exception when restarting a job from a savepoint in our production environment:

      2018-12-04 20:28:25,091 INFO  10595 org.apache.flink.runtime.taskmanager.Task                   :917  - _OurCustomOperator_ -> select: () -> to: Tuple2 -> Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING to FAILED.
      java.lang.Exception: Exception while creating StreamOperatorStateContext.
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) from any of the 1 provided restore options.
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242)
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140)
      	... 5 more
      Caused by: java.lang.RuntimeException: The class 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' is not instantiable: The class has no (implicit) public nullary constructor, i.e. a constructor without arguments.
      	at org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412)
      	at org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
      	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211)
      	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218)
      	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64)
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
      	... 7 more
      
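The trace shows where the restore stops: while reading the operator state meta info, `TypeSerializerSerializationUtil` appears to read the config snapshot's class name from the savepoint and hand it to `InstantiationUtil.instantiate`, which must create the object reflectively before `read` can be called on it. The Scala sketch below is a simplified model of that pattern under that assumption, not Flink's actual code; `ConfigSnapshot`, `GoodSnapshot`, `NeedsArgSnapshot` and `SnapshotRestoreSketch` are hypothetical names. It shows how a snapshot class without a public nullary constructor can be written out but not read back.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for a serializer config snapshot (not a Flink type).
trait ConfigSnapshot {
  def write(out: DataOutputStream): Unit
  def read(in: DataInputStream): Unit
}

// Has an implicit public nullary constructor, so it can be restored reflectively.
class GoodSnapshot extends ConfigSnapshot {
  var payload: Int = 0
  def write(out: DataOutputStream): Unit = out.writeInt(payload)
  def read(in: DataInputStream): Unit = { payload = in.readInt() }
}

// Only has a one-argument constructor, so reflective instantiation fails.
class NeedsArgSnapshot(var payload: Int) extends ConfigSnapshot {
  def write(out: DataOutputStream): Unit = out.writeInt(payload)
  def read(in: DataInputStream): Unit = { payload = in.readInt() }
}

object SnapshotRestoreSketch {
  // Write the class name first, then the snapshot's own fields.
  def writeSnapshot(snapshot: ConfigSnapshot): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeUTF(snapshot.getClass.getName)
    snapshot.write(out)
    out.flush()
    bytes.toByteArray
  }

  // Read the class name, create an instance through its public nullary
  // constructor, then let the instance read its own fields.
  // getConstructor() throws NoSuchMethodException if no such constructor exists.
  def readSnapshot(data: Array[Byte]): ConfigSnapshot = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val clazz = Class.forName(in.readUTF())
    val snapshot = clazz.getConstructor().newInstance().asInstanceOf[ConfigSnapshot]
    snapshot.read(in)
    snapshot
  }
}
```

In the reported failure, the class named in the savepoint is `CRowSerializer$CRowSerializerConfigSnapshot`, and the instantiation step is the one that fails.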

      I added the following tests to CRowSerializerTest to confirm that this is a bug:

        @Test
        def testDefaultConstructor(): Unit = {
          new CRowSerializer.CRowSerializerConfigSnapshot()
      
          // The Scala call above compiles, but this reflective instantiation fails the test
          val serializerConfigSnapshotClass =
           Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot")
          InstantiationUtil.instantiate(serializerConfigSnapshotClass)
        }
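Because `new CRowSerializer.CRowSerializerConfigSnapshot()` compiles while reflective instantiation fails, one plausible explanation (an assumption, not confirmed by this report) is that the class's primary constructor takes Scala varargs. Scala lets a caller omit varargs arguments, but the compiled constructor still takes a single `Seq` parameter, so no Java-level nullary constructor exists. The sketch below checks this with plain Java reflection on two hypothetical classes: `VarargsOnly` should report no public nullary constructor, while `ArrayWithNullary`, which adds an auxiliary `def this()`, should report one. `hasPublicNullaryConstructor` only approximates the constructor part of `InstantiationUtil.checkForInstantiation`.

```scala
// Hypothetical classes; they only mimic the constructor shape suspected in CRowSerializer.

// Varargs primary constructor only. `new VarargsOnly()` compiles in Scala,
// but the bytecode constructor takes one Seq parameter.
class VarargsOnly(parts: String*)

// Array-based primary constructor plus an explicit auxiliary nullary constructor.
class ArrayWithNullary(parts: Array[String]) {
  def this() = this(Array.empty[String])
}

object NullaryConstructorCheck {
  // True if Java reflection finds a public constructor without parameters.
  def hasPublicNullaryConstructor(clazz: Class[_]): Boolean =
    try {
      clazz.getConstructor()
      true
    } catch {
      case _: NoSuchMethodException => false
    }

  def main(args: Array[String]): Unit = {
    new VarargsOnly() // compiles: Scala passes an empty Seq
    for (c <- Seq[Class[_]](classOf[VarargsOnly], classOf[ArrayWithNullary]))
      println(s"${c.getSimpleName}: public nullary constructor = ${hasPublicNullaryConstructor(c)}")
  }
}
```

If this is the cause, a change along the lines of `ArrayWithNullary` (an explicit `def this()` delegating to the primary constructor) would give `InstantiationUtil` a constructor it can call; the actual fix may look different.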
      
        @Test
        def testStateRestore(): Unit = {
      
          class IKeyedProcessFunction extends KeyedProcessFunction[Integer, Integer, Integer] {
            var state: ListState[CRow] = _
            override def open(parameters: Configuration): Unit = {
              val stateDesc = new ListStateDescriptor[CRow]("CRow",
                new CRowTypeInfo(new RowTypeInfo(Types.INT)))
              state = getRuntimeContext.getListState(stateDesc)
            }
            override def processElement(value: Integer,
                ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context,
                out: Collector[Integer]): Unit = {
              state.add(new CRow(Row.of(value), true))
            }
          }
      
          val operator = new KeyedProcessOperator[Integer, Integer, Integer](new IKeyedProcessFunction)
      
          var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
            operator,
            new KeySelector[Integer, Integer] {
              override def getKey(value: Integer): Integer = -1
            },
            Types.INT, 1, 1, 0)
          testHarness.setup()
          testHarness.open()
          testHarness.processElement(new StreamRecord[Integer](1, 1L))
          testHarness.processElement(new StreamRecord[Integer](2, 1L))
          testHarness.processElement(new StreamRecord[Integer](3, 1L))
      
          assertEquals(1, numKeyedStateEntries(operator))
      
          val snapshot = testHarness.snapshot(0L, 0L)
          testHarness.close()
      
          testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer](
            operator,
            new KeySelector[Integer, Integer] {
              override def getKey(value: Integer): Integer = -1
            },
            Types.INT, 1, 1, 0)
          testHarness.setup()
      
          // This throws the same exception as our production application.
          testHarness.initializeState(snapshot)
      
          testHarness.open()
      
          assertEquals(1, numKeyedStateEntries(operator))
      
          testHarness.close()
        }
      

          People

            boshu Zheng (kisimple)
            Votes: 0
            Watchers: 5


            Time Tracking

              Original estimate: not specified
              Remaining: 0h
              Logged: 10m
