Flink / FLINK-10493

Macro generated CaseClassSerializer considered harmful

    Description

      The Flink Scala API uses implicits and macros to generate TypeInformation and TypeSerializer objects for types. For Scala tuples and case classes, the macro generates an anonymous subclass of CaseClassSerializer.
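
      As a rough JVM-level analogue (plain Java compiled with javac, not the Flink macro itself), the sketch below builds an anonymous type-information class whose factory method returns an anonymous serializer. It assumes, as the two-level name TestJob$$anon$13$$anon$3 in the log suggests, that the serializer is created inside another anonymous class. SerializerSketch, TypeInfoSketch, PointSketch and TestJobSketch are hypothetical stand-ins, not Flink classes; javac names anonymous classes Outer$N, whereas scalac uses Outer$$anon$N.

```java
// Hypothetical stand-in for Flink's CaseClassSerializer (not the real API).
abstract class SerializerSketch<T> {
    abstract T createInstance(Object[] fields);
}

// Hypothetical stand-in for Flink's TypeInformation (not the real API).
abstract class TypeInfoSketch<T> {
    abstract SerializerSketch<T> createSerializer();
}

class PointSketch {
    final int x;
    final int y;
    PointSketch(int x, int y) { this.x = x; this.y = y; }
}

class TestJobSketch {
    // Roughly the shape of generated code: an anonymous type-info class
    // whose createSerializer() returns an anonymous serializer class.
    static TypeInfoSketch<PointSketch> pointTypeInfo() {
        return new TypeInfoSketch<PointSketch>() {            // javac: TestJobSketch$1
            @Override
            SerializerSketch<PointSketch> createSerializer() {
                return new SerializerSketch<PointSketch>() {   // javac: TestJobSketch$1$1
                    @Override
                    PointSketch createInstance(Object[] fields) {
                        return new PointSketch((Integer) fields[0], (Integer) fields[1]);
                    }
                };
            }
        };
    }

    public static void main(String[] args) {
        Class<?> cls = pointTypeInfo().createSerializer().getClass();
        System.out.println(cls.getName());          // compiler-chosen binary name
        System.out.println(cls.isAnonymousClass()); // no source-level name exists
    }
}
```

      Compiled with javac, main prints TestJobSketch$1$1 and true. Neither number appears in the source; both are chosen by the compiler.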

      The Scala compiler gives the anonymous class a synthetic name (for example com.somewhere.TestJob$$anon$13$$anon$3 in the log below) whose numbering depends on where the macro invocation sits in the source relative to other anonymous classes. If the code changes so that this relative position shifts, the name of the serializer class changes too, even if the overall logic of the code and the type in question stay the same.
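
      The position dependence can be reproduced in plain Java. In the hypothetical sketch below, JobV1 and JobV2 contain the same serializer; JobV2 only adds an unrelated anonymous Runnable earlier in the class. With javac, the serializer compiles to JobV1$1 in the first version and to JobV2$2 in the second. Two class names are used only so both versions fit in one file; in a real redeployment the prefix (e.g. TestJob) stays the same and only the number moves.

```java
// Hypothetical stand-in for a generated serializer base class.
abstract class RecordSerializer {
    abstract String describe();
}

// Version 1: the serializer is the first anonymous class in JobV1.
class JobV1 {
    static RecordSerializer serializer() {
        return new RecordSerializer() {          // javac: JobV1$1
            @Override
            String describe() { return "record serializer"; }
        };
    }
}

// Version 2: identical serializer logic, but an unrelated anonymous class
// now appears earlier in the source, so the serializer's number shifts.
class JobV2 {
    static Runnable unrelatedCallback() {
        return new Runnable() {                  // javac: JobV2$1
            @Override
            public void run() { System.out.println("callback"); }
        };
    }

    static RecordSerializer serializer() {
        return new RecordSerializer() {          // javac: JobV2$2
            @Override
            String describe() { return "record serializer"; }
        };
    }
}

class NamingDemo {
    public static void main(String[] args) {
        System.out.println(JobV1.serializer().getClass().getName());
        System.out.println(JobV2.serializer().getClass().getName());
    }
}
```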

      That results in errors like the one below when the job is restored from a savepoint: the serializer needed to read the savepoint data can no longer be loaded, because its class name has changed.
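
      The failure can be sketched with plain Java serialization, which the stack traces show being used to read the serializer (ObjectInputStream called from TypeSerializerSerializationUtil and InstantiationUtil). The stream records the serializer's binary class name; if resolveClass cannot find that name, reading fails with ClassNotFoundException, the root cause in the log. SavepointSketch and its methods are hypothetical: the overridden resolveClass only simulates a class that disappeared after a code change, and returning null on failure loosely mirrors the "replacing with null" warning in the log.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SavepointSketch {
    // An anonymous, serializable "serializer" standing in for the
    // macro-generated one that is written into savepoint metadata.
    static Serializable makeSerializer() {
        return new Serializable() {
            @Override
            public String toString() { return "anonymous serializer"; }
        };
    }

    // Writes the object with Java serialization; the class name goes into the stream.
    static byte[] write(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the object back while pretending that missingClassName no longer
    // exists (as if a code change renamed the anonymous class).
    // Returns null when the recorded class cannot be resolved.
    static Object restore(byte[] data, String missingClassName) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (desc.getName().equals(missingClassName)) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            return in.readObject();
        } catch (ClassNotFoundException e) {
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

      Restoring succeeds while the recorded name still resolves and returns null once it does not; nothing about the object's data changed, only the class name lookup.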

      At the very least, the documentation should carry a prominent warning about this issue, since minor code changes can leave a job unable to restore its previous state. Ideally, the macro's use of anonymous classes should be deprecated, if possible.
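
      One generic JVM pattern in that direction is to give the serializer a named class: its binary name depends only on its declared name and enclosing class, not on how many anonymous classes surround it. The hypothetical Java sketch below illustrates only that naming property; it is not Flink's actual fix, and StableSerializerBase, StableJob and PointSerializer are illustrative names. In Scala, a class declared at top level or inside an object has the same property.

```java
// Hypothetical stand-in for a serializer base class.
abstract class StableSerializerBase {
    abstract String describe();
}

class StableJob {
    // A named nested class: its binary name is StableJob$PointSerializer
    // no matter how many anonymous classes appear elsewhere in the file.
    static final class PointSerializer extends StableSerializerBase {
        @Override
        String describe() { return "point serializer"; }
    }

    // Adding or removing anonymous classes like this one does not
    // change the name of PointSerializer.
    static Runnable unrelatedCallback() {
        return new Runnable() {
            @Override
            public void run() { System.out.println("callback"); }
        };
    }

    static StableSerializerBase serializer() {
        return new PointSerializer();
    }
}
```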

      WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
      java.io.IOException: Unloadable class for type serializer.
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
      	at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
      	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
      	at java.lang.Thread.run(Unknown Source)
      Caused by: java.io.InvalidClassException: failed to read class descriptor
      	at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
      	at java.io.ObjectInputStream.readClassDesc(Unknown Source)
      	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
      	at java.io.ObjectInputStream.readObject0(Unknown Source)
      	at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
      	at java.io.ObjectInputStream.readSerialData(Unknown Source)
      	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
      	at java.io.ObjectInputStream.readObject0(Unknown Source)
      	at java.io.ObjectInputStream.readObject(Unknown Source)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
      	... 14 more
      Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
      	at java.net.URLClassLoader.findClass(Unknown Source)
      	at java.lang.ClassLoader.loadClass(Unknown Source)
      	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
      	at java.lang.ClassLoader.loadClass(Unknown Source)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Unknown Source)
      	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
      	at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
      	... 24 more
      WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
      java.io.IOException: Unloadable class for type serializer.
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
      	at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
      	at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
      	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
      	at java.lang.Thread.run(Unknown Source)
      Caused by: java.io.InvalidClassException: failed to read class descriptor
      	at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
      	at java.io.ObjectInputStream.readClassDesc(Unknown Source)
      	at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
      	at java.io.ObjectInputStream.readObject0(Unknown Source)
      	at java.io.ObjectInputStream.readObject(Unknown Source)
      	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
      	... 18 more
      Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
      	at java.net.URLClassLoader.findClass(Unknown Source)
      	at java.lang.ClassLoader.loadClass(Unknown Source)
      	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
      	at java.lang.ClassLoader.loadClass(Unknown Source)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Unknown Source)
      	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
      	at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
      	... 24 more
      ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
      java.lang.NullPointerException
      	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
      	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
      	at java.lang.Thread.run(Unknown Source)
      INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED.
      java.lang.IllegalStateException: Could not initialize operator state backend.
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
      	at java.lang.Thread.run(Unknown Source)
      Caused by: java.io.IOException: Unable to restore operator state [_async_wait_operator_state_]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
      	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
      	... 6 more

            People

              igalshilman Igal Shilman
              elevy Elias Levy
              Votes: 0
              Watchers: 5

                Time Tracking
                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 20m