Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Duplicate
1.13.0, 1.14.0
None
None
Description
While looking into FLINK-22692, I found that the AdaptiveScheduler tries to re-deploy executions if the savepoint created by stop-with-savepoint fails. Deployment, however, requires each execution vertex to be in the CREATED or SCHEDULED state, whereas here the vertices are still RUNNING:
5850 [flink-akka.actor.default-dispatcher-6] WARN org.apache.flink.runtime.minicluster.MiniCluster [] - Error in MiniCluster. Shutting the MiniCluster down.
java.lang.IllegalStateException: The vertex must be in CREATED or SCHEDULED state to be deployed. Found state RUNNING
    at org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:546) ~[classes/:?]
    at org.apache.flink.runtime.executiongraph.ExecutionVertex.deploy(ExecutionVertex.java:427) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.Executing.deploySafely(Executing.java:139) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.Executing.deploy(Executing.java:132) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.Executing.<init>(Executing.java:63) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.Executing$Factory.getState(Executing.java:363) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.Executing$Factory.getState(Executing.java:334) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.transitionToState(AdaptiveScheduler.java:1139) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.goToExecuting(AdaptiveScheduler.java:787) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.StopWithSavepoint.handleSavepointCompletion(StopWithSavepoint.java:106) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.StopWithSavepoint.lambda$null$0(StopWithSavepoint.java:89) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.runIfState(AdaptiveScheduler.java:1093) ~[classes/:?]
    at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$runIfState$26(AdaptiveScheduler.java:1108) ~[classes/:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_271]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_271]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[classes/:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[classes/:?]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[classes/:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[classes/:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [scala-library-2.11.12.jar:?]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [scala-library-2.11.12.jar:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.11.12.jar:?]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [akka-actor_2.11-2.5.21.jar:2.5.21]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [akka-actor_2.11-2.5.21.jar:2.5.21]
(Currently, subtasks do not fail if the checkpoint fails, so they are still RUNNING when the scheduler transitions back to Executing.)
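The failure mode can be illustrated with a simplified, hypothetical model. The class and method names here (`RedeployAfterFailedSavepoint`, `Vertex`, `enterExecutingNaive`, `enterExecutingGuarded`) are invented for illustration and are not Flink's API; only the precondition and its error message are taken from the stack trace above. The guarded variant is one possible way to avoid the exception and is an assumption, not necessarily how the duplicate issue FLINK-22266 was resolved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the failure; NOT Flink's classes.
// It only mimics the state check performed by Execution.deploy.
public class RedeployAfterFailedSavepoint {

    enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, FAILED }

    static final class Vertex {
        ExecutionState state;

        Vertex(ExecutionState state) {
            this.state = state;
        }

        // Mirrors the precondition from the stack trace: deployment is only
        // legal from CREATED or SCHEDULED.
        void deploy() {
            if (state != ExecutionState.CREATED && state != ExecutionState.SCHEDULED) {
                throw new IllegalStateException(
                        "The vertex must be in CREATED or SCHEDULED state to be deployed. Found state "
                                + state);
            }
            state = ExecutionState.DEPLOYING;
        }
    }

    // Naive transition back to "Executing": deploys every vertex unconditionally.
    // This throws when the savepoint failed but the tasks kept RUNNING.
    static void enterExecutingNaive(List<Vertex> vertices) {
        for (Vertex v : vertices) {
            v.deploy();
        }
    }

    // Guarded variant (assumption): deploy only vertices that are still waiting
    // for deployment; vertices that are already RUNNING are left untouched.
    static int enterExecutingGuarded(List<Vertex> vertices) {
        int deployed = 0;
        for (Vertex v : vertices) {
            if (v.state == ExecutionState.CREATED || v.state == ExecutionState.SCHEDULED) {
                v.deploy();
                deployed++;
            }
        }
        return deployed;
    }

    public static void main(String[] args) {
        // After a failed stop-with-savepoint the subtasks have not failed, so they are RUNNING.
        List<Vertex> vertices = new ArrayList<>();
        vertices.add(new Vertex(ExecutionState.RUNNING));
        vertices.add(new Vertex(ExecutionState.RUNNING));

        try {
            enterExecutingNaive(vertices);
        } catch (IllegalStateException e) {
            System.out.println("naive: " + e.getMessage());
        }

        System.out.println("guarded: deployed " + enterExecutingGuarded(vertices) + " vertices");
    }
}
```

Running `main` prints the same `IllegalStateException` message as in the log for the naive transition, followed by `guarded: deployed 0 vertices`, because the guarded variant skips vertices that are already RUNNING.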
In the test, the MiniCluster shuts down, and the test then fails with an unrelated assertion error (that the cluster is not running).
The other test failure (suspendWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished) is also caused by the cluster not running.
cc: @trohrmann, @rmetzger
Issue Links
- duplicates
  FLINK-22266 Harden JobMasterStopWithSavepointITCase (Closed)
- relates to
  FLINK-22692 CheckpointStoreITCase.testRestartOnRecoveryFailure fails with RuntimeException (Resolved)