FLINK-5484

Kryo serialization changed between 1.1 and 1.2

      Description

      I think the way that Kryo serializes data changed between 1.1 and 1.2.

      I have a generic Object that is serialized as part of a 1.1 savepoint, and I cannot resume from that savepoint with 1.2:

      org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
      	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
      	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
      	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1486)
      	at com.dataartisans.DidKryoChange.main(DidKryoChange.java:74)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
      	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
      	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
      	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
      	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
      	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
      	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
      	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
      	at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
      	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
      	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1117)
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
      	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
      	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.IllegalStateException: Could not initialize keyed state backend.
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:286)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:199)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:649)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:636)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
      	at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: Unable to find class: f
      	at com.twitter.chill.java.UnmodifiableJavaCollectionSerializer.read(UnmodifiableJavaCollectionSerializer.java:62)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
      	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
      	at org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot.deserialize(AbstractMemStateSnapshot.java:88)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restoreHeapState(HeapKeyedStateBackend.java:448)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restoreOldSavepointKeyedState(HeapKeyedStateBackend.java:406)
      	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:240)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:784)
      	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:277)
      	... 6 more
      Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: f
      	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
      	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
      	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
      	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
      	at com.twitter.chill.java.UnmodifiableJavaCollectionSerializer.read(UnmodifiableJavaCollectionSerializer.java:59)
      	... 14 more
      Caused by: java.lang.ClassNotFoundException: f
      	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      	at java.lang.Class.forName0(Native Method)
      	at java.lang.Class.forName(Class.java:348)
      	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
      	... 18 more
      

      Running the same program with 1.2, triggering a savepoint, and resuming from it works.

          Activity

          StephanEwen Stephan Ewen added a comment -

          Good catch. My first thought is that this may be due to a bump in the Chill dependency, which registers more classes (or in a different order) by default.

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/3152

          FLINK-5484 [serialization] Revert Chill version update

          This PR reverts the Twitter Chill dependency update. The version update breaks backwards compatibility for savepoints that contain user types serialized with Kryo, because Chill adds new default serializers that change the class IDs.

          In Flink 1.1 the default next available class ID was X and registered user types got IDs assigned starting at X. In Flink 1.2, the newly added serializers in Chill got assigned IDs starting at X before the user types are registered, which can lead to user types being deserialized with the wrong serializer.
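
          The ID shift described above can be illustrated with a toy model. This is only a sketch under stated assumptions: it does not use Kryo or Chill, the registry is a plain map that hands out sequential IDs in registration order, and the names `ClassIdShiftDemo`, `DefaultA`, `DefaultB`, `DefaultC`, and `UserEvent` are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ClassIdShiftDemo {

    // Toy registry (assumption, not Kryo's actual code): assigns the next free
    // integer ID in registration order, library defaults first, then user types.
    static Map<Integer, String> buildRegistry(List<String> libraryDefaults, List<String> userTypes) {
        Map<Integer, String> idToType = new LinkedHashMap<>();
        int nextId = 0;
        for (String type : libraryDefaults) {
            idToType.put(nextId++, type);
        }
        for (String type : userTypes) {
            idToType.put(nextId++, type);
        }
        return idToType;
    }

    // Returns the ID assigned to the given type, or -1 if it is not registered.
    static int idOf(Map<Integer, String> registry, String type) {
        for (Map.Entry<Integer, String> entry : registry.entrySet()) {
            if (entry.getValue().equals(type)) {
                return entry.getKey();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> userTypes = Arrays.asList("UserEvent");

        // "Old" library version registers two defaults; "new" version adds a third.
        Map<Integer, String> oldRegistry =
                buildRegistry(Arrays.asList("DefaultA", "DefaultB"), userTypes);
        Map<Integer, String> newRegistry =
                buildRegistry(Arrays.asList("DefaultA", "DefaultB", "DefaultC"), userTypes);

        // The old version writes the user type's ID into the serialized data.
        int writtenId = idOf(oldRegistry, "UserEvent");

        System.out.println("ID written by old version: " + writtenId);
        System.out.println("Old version resolves it to: " + oldRegistry.get(writtenId));
        System.out.println("New version resolves it to: " + newRegistry.get(writtenId));
    }
}
```

          In this model the old version writes ID 2 for `UserEvent`, while the new version maps ID 2 to `DefaultC`, so the stored bytes would be handed to the wrong serializer on restore.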

          I've verified this with a savepoint (the one that triggered this issue) and added a test that checks that the default registration map does not change between versions. Once we have proper serializer versioning, that test will become obsolete.
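
          The idea of a test that pins the default registration map can be sketched as follows. This is not the test added in the PR; it is a minimal illustration that assumes the registrations can be dumped as an ID-to-class-name map and compared against a baseline recorded from an earlier version. The class `RegistrationBaselineCheck`, the method `findChanges`, and the type names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class RegistrationBaselineCheck {

    // Compares a recorded baseline (ID -> type name) with the current registrations
    // and returns one message per ID whose mapping was added, removed, or changed.
    static List<String> findChanges(Map<Integer, String> baseline, Map<Integer, String> current) {
        List<String> changes = new ArrayList<>();
        Set<Integer> allIds = new TreeSet<>(baseline.keySet());
        allIds.addAll(current.keySet());
        for (Integer id : allIds) {
            String expected = baseline.get(id);
            String actual = current.get(id);
            if (!Objects.equals(expected, actual)) {
                changes.add("ID " + id + ": expected " + expected + ", found " + actual);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // Hypothetical baseline recorded from the earlier version.
        Map<Integer, String> baseline = new TreeMap<>();
        baseline.put(0, "DefaultA");
        baseline.put(1, "DefaultB");

        // Hypothetical registrations after a dependency bump added a default.
        Map<Integer, String> current = new TreeMap<>(baseline);
        current.put(2, "DefaultC");

        List<String> changes = findChanges(baseline, current);
        if (changes.isEmpty()) {
            System.out.println("Default registrations unchanged");
        } else {
            changes.forEach(System.out::println);
        }
    }
}
```

          A real test would fail the build whenever the returned list is non-empty, which is what makes an accidental change in default registrations visible before it reaches a release.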

          I would like to merge this to: `release-1.1`, `release-1.2`, and `master`.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink 5484-kryo_1.2

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3152.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3152


          commit 55483b71f36b84ac57d03a9b83e0e9d9b9b98eab
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2017-01-17T18:10:33Z

          FLINK-5484 [serialization] Add test for registered Kryo types

          commit ebd656310ac9e6323fc7b09632c8aef08f06ba48
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2017-01-18T10:27:43Z

          Revert "FLINK-2608 Updated Twitter Chill version."

          This reverts commit 0d3ff88b369fbb1b0a8fb0e8263c9ce0a9da1583.


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3152

          Good fix and test

          +1 to merge this!

          githubbot ASF GitHub Bot added a comment -

          Github user uce closed the pull request at:

          https://github.com/apache/flink/pull/3152

          uce Ufuk Celebi added a comment -

          Fixed in
          931929b (release-1.1),
          55483b7, a7644b1 (release-1.2),
          8fddae8, 586f818 (master).


            People

            • Assignee:
              Unassigned
              Reporter:
              uce Ufuk Celebi