  1. Flink
  2. FLINK-20285

LazyFromSourcesSchedulingStrategy may schedule non-CREATED vertices


Details

    Description

      LazyFromSourcesSchedulingStrategy can schedule vertices that are not in the CREATED state. This causes an unexpected state-check failure and results in a fatal error (see the attached error).

      The reason is that the state of a vertex that is still waiting to be scheduled can change inside LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices() while schedulerOperations.allocateSlotsAndDeploy(...) is being invoked on other vertices.

      For example, ev1 and ev2 are in the same pipelined region and are restarted one by one in the scheduling loop in LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices(). Both are CREATED when the loop starts. ev1 is scheduled first, but it fails immediately due to a slot allocation error, and ev2 is canceled as a result. When ev2 is then scheduled, its state is CANCELED and the state check fails.
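
      The ev1/ev2 scenario can be reproduced with a small self-contained model. This is only a sketch: ToyScheduler, scheduleUnguarded and scheduleGuarded are hypothetical names, not Flink classes, and the guarded loop shows one possible mitigation (re-checking the state right before each call), not necessarily the fix that was applied in Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SchedulingLoopSketch {

    enum State { CREATED, SCHEDULED, FAILED, CANCELED }

    // Hypothetical stand-in for the scheduler. It is NOT the Flink API; it only
    // models "deploying one vertex may cancel the other vertices of its region".
    static final class ToyScheduler {
        final Map<String, State> states = new LinkedHashMap<>();
        private final String failingVertex;

        ToyScheduler(String failingVertex, List<String> vertices) {
            this.failingVertex = failingVertex;
            for (String v : vertices) {
                states.put(v, State.CREATED);
            }
        }

        void allocateSlotsAndDeploy(String vertex) {
            State current = states.get(vertex);
            // Same kind of state check that fails in the reported stack trace.
            if (current != State.CREATED) {
                throw new IllegalStateException(
                        "expected vertex " + vertex + " to be in CREATED state, was: " + current);
            }
            if (vertex.equals(failingVertex)) {
                // Simulated slot allocation error: the vertex fails and every
                // other not-yet-scheduled vertex in the region is canceled.
                states.put(vertex, State.FAILED);
                states.replaceAll((v, s) -> s == State.CREATED ? State.CANCELED : s);
            } else {
                states.put(vertex, State.SCHEDULED);
            }
        }
    }

    // Mirrors the problematic loop: states are assumed to be unchanged
    // between iterations, so ev2 is handed over although it was canceled.
    static List<String> scheduleUnguarded(ToyScheduler scheduler, List<String> vertices) {
        List<String> handedOver = new ArrayList<>();
        for (String v : vertices) {
            scheduler.allocateSlotsAndDeploy(v);
            handedOver.add(v);
        }
        return handedOver;
    }

    // Possible mitigation (an assumption, not the confirmed fix): re-check the
    // state immediately before each call and skip vertices that left CREATED.
    static List<String> scheduleGuarded(ToyScheduler scheduler, List<String> vertices) {
        List<String> handedOver = new ArrayList<>();
        for (String v : vertices) {
            if (scheduler.states.get(v) != State.CREATED) {
                continue;
            }
            scheduler.allocateSlotsAndDeploy(v);
            handedOver.add(v);
        }
        return handedOver;
    }

    public static void main(String[] args) {
        List<String> region = Arrays.asList("ev1", "ev2");
        try {
            scheduleUnguarded(new ToyScheduler("ev1", region), region);
        } catch (IllegalStateException e) {
            System.out.println("unguarded loop: " + e.getMessage());
        }
        ToyScheduler guarded = new ToyScheduler("ev1", region);
        System.out.println("guarded loop handed over: " + scheduleGuarded(guarded, region));
        System.out.println("final states: " + guarded.states);
    }
}
```

      In this model the unguarded loop throws on ev2 with the same kind of message as in the attached log, while the guarded loop skips ev2 and leaves its handling to whatever canceled it.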

      For more details, see FLINK-20220.

      2020-11-19 13:34:17,231 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler      [] - FATAL: Thread 'flink-akka.actor.default-dispatcher-15' produced an uncaught exception. Stopping the process...
      java.util.concurrent.CompletionException: java.lang.IllegalStateException: expected vertex aafcbb93173905cec9672e46932d7790_3 to be in CREATED state, was: CANCELED
              at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_222]
              at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_222]
              at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:708) ~[?:1.8.0_222]
              at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687) ~[?:1.8.0_222]
              at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_222]
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.2.jar:1.11.2]
      Caused by: java.lang.IllegalStateException: expected vertex aafcbb93173905cec9672e46932d7790_3 to be in CREATED state, was: CANCELED
              at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$validateDeploymentOptions$3(DefaultScheduler.java:326) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:1.8.0_222]
              at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_222]
              at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_222]
              at java.util.Collections$2.tryAdvance(Collections.java:4719) ~[?:1.8.0_222]
              at java.util.Collections$2.forEachRemaining(Collections.java:4727) ~[?:1.8.0_222]
              at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_222]
              at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_222]
              at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:1.8.0_222]
              at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:1.8.0_222]
              at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_222]
              at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) ~[?:1.8.0_222]
              at org.apache.flink.runtime.scheduler.DefaultScheduler.validateDeploymentOptions(DefaultScheduler.java:326) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlotsAndDeploy(DefaultScheduler.java:297) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy.allocateSlotsAndDeployExecutionVertices(LazyFromSourcesSchedulingStrategy.java:140) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy.restartTasks(LazyFromSourcesSchedulingStrategy.java:93) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:265) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) ~[?:1.8.0_222]
              ... 24 more
      

            People

              zhuzh Zhu Zhu
              zhuzh Zhu Zhu