FLINK-22717

JobMasterStopWithSavepointITCase.testRestartCheckpointCoordinatorIfStopWithSavepointFails fails with Adaptive Scheduler


Details

    Description

      While looking into FLINK-22692, I found that AdaptiveScheduler tries to re-deploy executions if the savepoint created by stop-with-savepoint fails. Deployment requires each execution vertex to be in the CREATED or SCHEDULED state, but the vertices are still RUNNING:

      5850 [flink-akka.actor.default-dispatcher-6] WARN  org.apache.flink.runtime.minicluster.MiniCluster [] - Error in MiniCluster. Shutting the MiniCluster down.
      java.lang.IllegalStateException: The vertex must be in CREATED or SCHEDULED state to be deployed. Found state RUNNING
      	at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:546) ~[classes/:?]
      	at org.apache.flink.runtime.executiongraph.ExecutionVertex.deploy(ExecutionVertex.java:427) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.Executing.deploySafely(Executing.java:139) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.Executing.deploy(Executing.java:132) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.Executing.<init>(Executing.java:63) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.Executing$Factory.getState(Executing.java:363) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.Executing$Factory.getState(Executing.java:334) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.transitionToState(AdaptiveScheduler.java:1139) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.goToExecuting(AdaptiveScheduler.java:787) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.StopWithSavepoint.handleSavepointCompletion(StopWithSavepoint.java:106) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.StopWithSavepoint.lambda$null$0(StopWithSavepoint.java:89) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.runIfState(AdaptiveScheduler.java:1093) ~[classes/:?]
      	at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$runIfState$26(AdaptiveScheduler.java:1108) ~[classes/:?]
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_271]
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_271]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[classes/:?]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[classes/:?]
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[classes/:?]
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[classes/:?]
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [scala-library-2.11.12.jar:?]
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [scala-library-2.11.12.jar:?]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [akka-actor_2.11-2.5.21.jar:2.5.21]
      	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [akka-actor_2.11-2.5.21.jar:2.5.21]
      

      (Currently, subtasks do not fail when the checkpoint fails, so the executions are still RUNNING when the scheduler tries to deploy them again.)
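
The failing precondition can be illustrated with a minimal, self-contained sketch. It is not Flink code: `DeployPreconditionSketch`, `SketchExecution`, `State`, and `tryDeploy` are hypothetical names, and the state set is a simplified subset. Only the exception message is taken from the log above. The sketch assumes that deployment is guarded by a check on the current state, as the message suggests.

```java
public class DeployPreconditionSketch {

    // Hypothetical, simplified subset of execution states (not Flink's own enum).
    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING }

    // Hypothetical stand-in for an execution attempt with a guarded deploy().
    static final class SketchExecution {
        private State state;

        SketchExecution(State initial) {
            this.state = initial;
        }

        State getState() {
            return state;
        }

        // Mirrors the precondition reported in the stack trace:
        // deployment is only allowed from CREATED or SCHEDULED.
        void deploy() {
            if (state != State.CREATED && state != State.SCHEDULED) {
                throw new IllegalStateException(
                        "The vertex must be in CREATED or SCHEDULED state to be deployed. "
                                + "Found state " + state);
            }
            state = State.DEPLOYING;
        }
    }

    // Returns null if deploy() succeeds, otherwise the exception message.
    static String tryDeploy(State initial) {
        SketchExecution execution = new SketchExecution(initial);
        try {
            execution.deploy();
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // A fresh execution deploys without error.
        System.out.println("CREATED -> " + tryDeploy(State.CREATED));
        // An execution that kept running after the failed savepoint is rejected.
        System.out.println("RUNNING -> " + tryDeploy(State.RUNNING));
    }
}
```

In this simplified model, re-entering the executing state with tasks that never left RUNNING trips the guard on the first deploy attempt, which matches the `IllegalStateException` raised from the `Executing` constructor in the trace.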

       

      In the test, the MiniCluster shuts down because of this error, and the test then fails with an unrelated assertion error (that the cluster is not running).

      The other test failure (suspendWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished) is also caused by the cluster not running.

       

      cc: @trohrmann, @rmetzger

            People

              Assignee: Unassigned
              Reporter: Roman Khachatryan