Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-20285

LazyFromSourcesSchedulingStrategy is possible to schedule non-CREATED vertices

    XMLWordPrintableJSON

    Details

      Description

      LazyFromSourcesSchedulingStrategy is possible to schedule vertices which are not in CREATED state. This will lead result in unexpected check failure and result in fatal error (see attached error).

      The reason is that the status of a vertex to schedule was changed in LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices() during the invocation of schedulerOperations.allocateSlotsAndDeploy(...) on other vertices.

      e.g. ev1 and ev2 are in the same pipelined region and are restarted one by one in the scheduling loop in LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices(). They are all CREATED at the moment. ev1 is scheduled first but it immediately fails due to some slot allocation error and ev2 will be canceled as a result. So when ev2 is scheduled, its state would be CANCELED and the state check failed.

      More details see FLINK-20220.

      2020-11-19 13:34:17,231 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler      [] - FATAL: Thread 'flink-akka.actor.default-dispatcher-15' produced an uncaught exception. Stopping the process...
      java.util.concurrent.CompletionException: java.lang.IllegalStateException: expected vertex aafcbb93173905cec9672e46932d7790_3 to be in CREATED state, was: CANCELED
              at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_222]
              at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_222]
              at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:708) ~[?:1.8.0_222]
              at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:687) ~[?:1.8.0_222]
              at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) ~[?:1.8.0_222]
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.2.jar:1.11.2]
              at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.2.jar:1.11.2]
      Caused by: java.lang.IllegalStateException: expected vertex aafcbb93173905cec9672e46932d7790_3 to be in CREATED state, was: CANCELED
              at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$validateDeploymentOptions$3(DefaultScheduler.java:326) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ~[?:1.8.0_222]
              at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_222]
              at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_222]
              at java.util.Collections$2.tryAdvance(Collections.java:4719) ~[?:1.8.0_222]
              at java.util.Collections$2.forEachRemaining(Collections.java:4727) ~[?:1.8.0_222]
              at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_222]
              at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_222]
              at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:1.8.0_222]
              at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:1.8.0_222]
              at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_222]
              at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) ~[?:1.8.0_222]
              at org.apache.flink.runtime.scheduler.DefaultScheduler.validateDeploymentOptions(DefaultScheduler.java:326) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.DefaultScheduler.allocateSlotsAndDeploy(DefaultScheduler.java:297) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy.allocateSlotsAndDeployExecutionVertices(LazyFromSourcesSchedulingStrategy.java:140) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.strategy.LazyFromSourcesSchedulingStrategy.restartTasks(LazyFromSourcesSchedulingStrategy.java:93) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:265) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
              at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) ~[?:1.8.0_222]
              ... 24 more
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                zhuzh Zhu Zhu
                Reporter:
                zhuzh Zhu Zhu
              • Votes:
                0 Vote for this issue
                Watchers:
                4 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: