FLINK-6662

ClassNotFoundException: o.a.f.r.j.t.JobSnapshottingSettings recovering job

    Details

      Description

      Running Flink on Mesos on the 1.3-release branch, I'm seeing the following error on appmaster startup:

      2017-05-22 15:32:45.946 [flink-akka.actor.default-dispatcher-17] WARN  o.a.flink.mesos.runtime.clusterframework.MesosJobManager  - Failed to recover job 088027410f1a628e7dfc59dc23df3ded.
      java.lang.Exception: Failed to retrieve the submitted job graph from state handle.
              at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:186)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(JobManager.scala:536)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1.apply(JobManager.scala:533)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1.apply(JobManager.scala:533)
              at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$1.apply$mcV$sp(JobManager.scala:533)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$1.apply(JobManager.scala:529)
              at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$1.apply(JobManager.scala:529)
              at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
              at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
              at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
              at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
              at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
              at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
              at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
              at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.ClassNotFoundException: org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings
              at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
              at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
              at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
              at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
              at java.lang.Class.forName0(Native Method)
              at java.lang.Class.forName(Class.java:348)
              at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
              at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
              at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
              at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
              at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
              at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
              at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
              at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
              at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
              at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
              at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
              at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
              at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
              at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
              at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
              at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
              at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:184)
              ... 15 common frames omitted
      

          Activity

          StephanEwen Stephan Ewen added a comment -

          Are you trying to have Flink 1.3 recover a pending job from an earlier Flink version via a high-availability setup?

          That pattern is not supported: framework version upgrades should go through savepoints, not through ZooKeeper.
          A fair point, though, is that this should be reported as an explicit exception, not simply as a ClassNotFoundException.
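
The failure mode behind the stack trace can be reproduced outside Flink. The following is a minimal sketch, not Flink code: `OldSettings` and `HidingObjectInputStream` are hypothetical names, and the stream simulates a class that was present when the state was written but is missing when it is read back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class MissingClassDemo {

    /** Hypothetical stand-in for a class that exists in the writing version only. */
    static final class OldSettings implements Serializable {
        private static final long serialVersionUID = 1L;
        final long interval = 5000L;
    }

    /** ObjectInputStream that pretends one class name is absent from the classpath. */
    static final class HidingObjectInputStream extends ObjectInputStream {
        private final String hiddenClassName;

        HidingObjectInputStream(InputStream in, String hiddenClassName) throws IOException {
            super(in);
            this.hiddenClassName = hiddenClassName;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Simulate the newer runtime, in which this class no longer exists.
            if (desc.getName().equals(hiddenClassName)) {
                throw new ClassNotFoundException(desc.getName());
            }
            return super.resolveClass(desc);
        }
    }

    /** Java-serializes a value, as the "old" version would when persisting state. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes the bytes while treating hiddenClassName as unavailable. */
    static Object deserializeWithout(byte[] data, String hiddenClassName)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new HidingObjectInputStream(new ByteArrayInputStream(data), hiddenClassName)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new OldSettings());
        try {
            deserializeWithout(data, OldSettings.class.getName());
        } catch (ClassNotFoundException e) {
            System.out.println("Recovery failed, class missing: " + e.getMessage());
        }
    }
}
```

The bare ClassNotFoundException says nothing about a version mismatch, which is why a more explicit error is requested here.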

          StephanEwen Stephan Ewen added a comment -

          Till Rohrmann What do you think about this issue?

          jstehler Jared Stehler added a comment -

          Yup, I was just booting a new Flink 1.3 over an existing 1.2 setup, so the system attempted recovery on startup. I can clear out the ZooKeeper references and redeploy.

          till.rohrmann Till Rohrmann added a comment -

          +1 for improving the exception message. I will add a `ClassNotFoundException` handler and output a better error message.
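
A handler of the kind proposed in this comment might look roughly like the sketch below. It is an illustration under assumptions, not the actual patch: `StateHandle`, `RecoveryException`, and `recover` are hypothetical names, and the IOException wording is invented for the example. Only the ClassNotFoundException hint text follows the wording used in the pull request.

```java
import java.io.IOException;

final class RecoveryErrorDemo {

    /** Hypothetical stand-in for a retrievable state handle; not Flink's interface. */
    interface StateHandle<T> {
        T retrieveState() throws IOException, ClassNotFoundException;
    }

    /** Hypothetical checked exception used only in this sketch. */
    static final class RecoveryException extends Exception {
        private static final long serialVersionUID = 1L;

        RecoveryException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Retrieves state and turns low-level failures into explicit, actionable errors. */
    static <T> T recover(String jobId, String path, StateHandle<T> handle)
            throws RecoveryException {
        // Shared prefix keeps the wording of both branches consistent.
        String prefix =
            "Could not retrieve job " + jobId + " from state handle under " + path + ". ";
        try {
            return handle.retrieveState();
        } catch (ClassNotFoundException cnfe) {
            // A missing class most likely means the state was written by another version.
            throw new RecoveryException(prefix
                + "This indicates that you are trying to recover from state written by an "
                + "older Flink version which is not compatible. "
                + "Try cleaning the state handle store.", cnfe);
        } catch (IOException ioe) {
            // Assumed wording for the I/O case; the source does not show the final text.
            throw new RecoveryException(prefix + "The state handle could not be read.", ioe);
        }
    }

    public static void main(String[] args) {
        try {
            recover("088027410f1a628e7dfc59dc23df3ded", "/jobgraphs/example", () -> {
                throw new ClassNotFoundException(
                    "org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings");
            });
        } catch (RecoveryException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Handling ClassNotFoundException separately from IOException lets the message name the probable cause while still keeping the original exception as the cause for debugging.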

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3972

          FLINK-6662 [errMsg] Improve error message if recovery from RetrievableStateHandles fails

          When recovering state from a ZooKeeperStateHandleStore it can happen that the deserialization
          fails, because one tries to recover state from an old Flink version which is not compatible.
          In this case we should output a better error message such that the user can easily spot the
          problem.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink improveErrorMessages

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3972.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3972


          commit 31d099c4768f1ee8dfbecfd8eddc6f05842425e6
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-23T13:42:38Z

          FLINK-6662 [errMsg] Improve error message if recovery from RetrievableStateHandles fails


          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3972#discussion_r118026253

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
          @@ -376,8 +377,14 @@ private static CompletedCheckpoint retrieveCompletedCheckpoint(Tuple2<Retrievabl
               try {
                   return stateHandlePath.f0.retrieveState();
          -    } catch (Exception e) {
          -        throw new FlinkException("Could not retrieve checkpoint " + checkpointId + ". The state handle seems to be broken.", e);
          +    } catch (ClassNotFoundException cnfe) {
          +        throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " +
          +            stateHandlePath.f1 + ". This indicates that you are trying to recover from state written by an " +
          +            "older Flink version which is not compatible. Try cleaning the state handle store.", cnfe);
          +    } catch (IOException ioe) {
          +        throw new FlinkException("Could not retrieve " + checkpointId + " worker from state handle under " +
          --- End diff ---

          Shouldn't this say `Could not retrieve checkpoint " + checkpointId + " from state handle under`, as in the CNFE case?

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3972#discussion_r118068470

          --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
          @@ -376,8 +377,14 @@ private static CompletedCheckpoint retrieveCompletedCheckpoint(Tuple2<Retrievabl
               try {
                   return stateHandlePath.f0.retrieveState();
          -    } catch (Exception e) {
          -        throw new FlinkException("Could not retrieve checkpoint " + checkpointId + ". The state handle seems to be broken.", e);
          +    } catch (ClassNotFoundException cnfe) {
          +        throw new FlinkException("Could not retrieve checkpoint " + checkpointId + " from state handle under " +
          +            stateHandlePath.f1 + ". This indicates that you are trying to recover from state written by an " +
          +            "older Flink version which is not compatible. Try cleaning the state handle store.", cnfe);
          +    } catch (IOException ioe) {
          +        throw new FlinkException("Could not retrieve " + checkpointId + " worker from state handle under " +
          --- End diff ---

          Yes, a copy & paste error from my side. I will correct it.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3972

          Thanks for the review @zentol.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3972

          till.rohrmann Till Rohrmann added a comment -

          1.4.0: ac6e5c9a03177ad18899e27c8877efb0c9211842
          1.3.0: d552b34470748de803a999c2c4c1557c49b30045


            People

            • Assignee:
              till.rohrmann Till Rohrmann
              Reporter:
              jstehler Jared Stehler