Flink / FLINK-29347

Failed to restore from list state with empty protobuf object

Details

    Description

      I use a protobuf-generated class in a union list state.
      When my Flink job restores from a checkpoint, I get the following exception:

      Caused by: java.lang.RuntimeException: Could not create class com.MY_PROTOBUF_GENERATED_CLASS
      	at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) ~[my-lib-0.1.1-SNAPSHOT.jar:?] 
      	at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[my-lib-0.1.1-SNAPSHOT.jar:?] 
      	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] 
      Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left. 
      	at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[my-lib-0.1.1-SNAPSHOT.jar:?] 
      	at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[my-lib-0.1.1-SNAPSHOT.jar:?] 
      	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] 
      Caused by: java.io.EOFException: No more bytes left. 
      	at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[my-lib-0.1.1-SNAPSHOT.jar:?] 
      	at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[my-lib-0.1.1-SNAPSHOT.jar:?] 
      	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.4.jar:1.14.4] 
      	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292] 
      

       

      I found the cause. When the protobuf serializer serializes an object that was built directly from its builder without assigning a value to any field, the message serializes to a zero-length byte[]. The serializer then writes it into state as just the length prefix '\0' (indicating zero-length data).
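      A minimal Java sketch of the write side, assuming the length prefix is an unsigned variable-length integer (7 bits per byte, high bit set when more bytes follow), which is consistent with the single '\0' byte described above. The class and method names are hypothetical and this is not the chill or Kryo source. An empty protobuf message is modeled as new byte[0], since a message with no fields set serializes to zero bytes.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical stand-in for a length-prefixed write: varint length, then payload.
// Illustrates the idea described above; not chill's or Kryo's actual code.
public class LengthPrefixDemo {

    // Unsigned varint: emit 7 bits at a time, least-significant group first,
    // setting the high bit on every byte except the last.
    public static void writeVarInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    public static byte[] encodeWithLengthPrefix(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, payload.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Empty message: only the length prefix 0 is written.
        System.out.println(Arrays.toString(encodeWithLengthPrefix(new byte[0]))); // prints [0]

        // Non-empty message, e.g. field 1 = 1 encodes as {8, 1}.
        System.out.println(Arrays.toString(encodeWithLengthPrefix(new byte[] {8, 1}))); // prints [2, 8, 1]
    }
}
```

      Only the prefix byte reaches the state, so on restore the reader must then read exactly zero payload bytes.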

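      When restoring from the checkpoint, the protobuf serializer deserializes the data. It reads a length of 0 and calls Kryo's Input#readBytes with a 0-length array, which ends up in readBytes(byte[] bytes, int offset, int count) with count = 0 (see ProtobufSerializer.read at ProtobufSerializer.java:73 in the trace above).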

      The underlying Input implementation is NoFetchingInput, which calls InputStream#read(byte[] bytes, int offset, int count) on the wrapped stream with count = 0.

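      The InputStream implementation is ByteStateHandleInputStream, which returns -1 whenever no data is left in memory, even if count is 0. This conflicts with the InputStream#read(byte[], int, int) contract, which says that if len is zero, no bytes are read and 0 is returned. NoFetchingInput treats the -1 as end of stream and throws the EOFException seen in the trace.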
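      The failing read path can be reproduced without Flink in a small Java sketch. EagerEofByteStream is a hypothetical stand-in that models the ByteStateHandleInputStream behavior described above, and readBytes is a loop modeled on the described NoFetchingInput behavior; neither is Flink's actual code. For simplicity the one-byte length prefix is read with read(), which is enough for a length of 0.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ZeroLengthReadRepro {

    // Hypothetical in-memory stream: returns -1 once all data is consumed,
    // even when the caller asks for 0 bytes.
    static class EagerEofByteStream extends InputStream {
        private final byte[] data;
        private int pos;

        EagerEofByteStream(byte[] data) { this.data = data; }

        @Override
        public int read() {
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }

        @Override
        public int read(byte[] b, int off, int len) {
            if (pos >= data.length) {
                return -1; // problematic branch: also taken for len == 0
            }
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }
    }

    // Loop modeled on the described NoFetchingInput behavior: keep calling
    // read() until `count` bytes arrive; treat -1 as end of stream.
    static void readBytes(InputStream in, byte[] bytes, int offset, int count) throws IOException {
        int bytesRead = 0;
        while (true) {
            int c = in.read(bytes, offset + bytesRead, count - bytesRead);
            if (c == -1) {
                throw new EOFException("No more bytes left.");
            }
            bytesRead += c;
            if (bytesRead == count) {
                return;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // State holds a single 0x00 byte: the length prefix of an empty message.
        InputStream in = new EagerEofByteStream(new byte[] {0});
        int length = in.read(); // 0; the stream is now exhausted
        byte[] payload = new byte[length];
        try {
            readBytes(in, payload, 0, payload.length); // count == 0
            System.out.println("restored empty message");
        } catch (EOFException e) {
            System.out.println("restore failed: " + e.getMessage()); // this branch runs
        }
    }
}
```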

      A simple fix is to add a check before returning -1: when the caller asks for 0 bytes, read should always return 0 instead of -1.
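      A sketch of the proposed check, applied to a hypothetical in-memory stream rather than to Flink's ByteStateHandleInputStream itself. The len == 0 test comes before the end-of-data test, so a zero-length request returns 0 even after all data is consumed, while a non-empty request at the end still returns -1.

```java
import java.io.InputStream;

public class ZeroLengthReadFix {

    // Hypothetical in-memory stream with the proposed zero-length check.
    static class FixedByteStream extends InputStream {
        private final byte[] data;
        private int pos;

        FixedByteStream(byte[] data) { this.data = data; }

        @Override
        public int read() {
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }

        @Override
        public int read(byte[] b, int off, int len) {
            // Standard argument checks (b == null throws NullPointerException here).
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            }
            if (len == 0) {
                return 0; // proposed fix: an empty request is never reported as EOF
            }
            if (pos >= data.length) {
                return -1; // genuine end of data for a non-empty request
            }
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }
    }

    public static void main(String[] args) {
        FixedByteStream in = new FixedByteStream(new byte[] {0});
        int length = in.read(); // 0; the stream is now exhausted
        System.out.println(in.read(new byte[length], 0, length)); // prints 0
        System.out.println(in.read(new byte[1], 0, 1));           // prints -1
    }
}
```

      With this behavior, a loop that stops once bytesRead == count, as described for NoFetchingInput, finishes immediately for count = 0 instead of throwing.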


            People

              shenjiaqi shen
              shenjiaqi shen
              Votes:
              0
              Watchers:
              4
