FLINK-32876

ExecutionTimeBasedSlowTaskDetector treats unscheduled tasks as slow tasks and causes speculative execution to fail.


    Description

      When speculative execution is enabled and the job is configured as follows:

      execution.batch.speculative.enabled: true
      slow-task-detector.execution-time.baseline-ratio: 0.0
      slow-task-detector.execution-time.baseline-lower-bound: 0s

      With this configuration, the ExecutionTimeBasedSlowTaskDetector treats the tasks of an ExecutionJobVertex that has not yet been scheduled as slow tasks and reports them to the SpeculativeScheduler. However, the SpeculativeScheduler requires the corresponding ExecutionVertex to have entered the scheduled state before it schedules backup tasks. When this requirement is not met, speculative execution fails.
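      To illustrate why a zero ratio and a zero lower bound can sweep in unscheduled tasks, here is a minimal, self-contained sketch. It is a simplified model and not Flink's actual code: the `Attempt` class, the `threshold` formula, and the `skipUnscheduled` guard are all assumptions made for illustration. The model assumes an unscheduled task reports an execution time of 0 and that a task is slow when its execution time is greater than or equal to the threshold. The guard shown is one possible mitigation, not necessarily the fix adopted for this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified, hypothetical model of an execution-time-based slow task check. */
public class SlowTaskSketch {

    /** Hypothetical stand-in for a task attempt; this is not a Flink class. */
    static final class Attempt {
        final String id;
        final long deployStartMillis; // 0 means "not scheduled yet" in this model

        Attempt(String id, long deployStartMillis) {
            this.id = id;
            this.deployStartMillis = deployStartMillis;
        }

        boolean isScheduled() {
            return deployStartMillis > 0;
        }

        /** Assumption: an unscheduled attempt reports an execution time of 0. */
        long executionTime(long nowMillis) {
            return isScheduled() ? nowMillis - deployStartMillis : 0L;
        }
    }

    /** Assumed formula: max(median of finished tasks * ratio, lower bound). */
    static long threshold(long medianFinishedMillis, double baselineRatio, long lowerBoundMillis) {
        return Math.max((long) (medianFinishedMillis * baselineRatio), lowerBoundMillis);
    }

    /** Returns the ids of attempts whose execution time reaches the threshold. */
    static List<String> findSlow(
            List<Attempt> attempts, long thresholdMillis, long nowMillis, boolean skipUnscheduled) {
        List<String> slow = new ArrayList<>();
        for (Attempt a : attempts) {
            if (skipUnscheduled && !a.isScheduled()) {
                continue; // guard: never report a task that has not been scheduled
            }
            if (a.executionTime(nowMillis) >= thresholdMillis) {
                slow.add(a.id);
            }
        }
        return slow;
    }

    public static void main(String[] args) {
        List<Attempt> attempts = Arrays.asList(
                new Attempt("running_0", 1_000L),
                new Attempt("unscheduled_0", 0L));

        // baseline-ratio 0.0 and lower bound 0s give a threshold of 0 ms
        long t = threshold(5_000L, 0.0, 0L);

        System.out.println(findSlow(attempts, t, 2_000L, false)); // [running_0, unscheduled_0]
        System.out.println(findSlow(attempts, t, 2_000L, true));  // [running_0]
    }
}
```

      In this model the unscheduled attempt passes the `>=` comparison only because the threshold collapses to 0; with any positive lower bound it would not be reported.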

      The exception stack trace is as follows:

      java.lang.IllegalStateException: Execution vertex b3f44e8b1dc132ff2a47f7955c75ef7d_0 does not have a recorded version
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
        at org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.getCurrentVersion(ExecutionVertexVersioner.java:71) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
        at org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.lambda$getExecutionVertexVersions$1(ExecutionVertexVersioner.java:89) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_333]
        at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1580) ~[?:1.8.0_333]
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_333]
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_333]
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_333]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_333]
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_333]
        at org.apache.flink.runtime.scheduler.ExecutionVertexVersioner.getExecutionVertexVersions(ExecutionVertexVersioner.java:90) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
        at org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler.notifySlowTasks(SpeculativeScheduler.java:377) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
        at org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector.lambda$scheduleTask$1(ExecutionTimeBasedSlowTaskDetector.java:129) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_333]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_333]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka48a43f0a-d73c-494a-a57b-ded9f5d82a84.jar:1.18-SNAPSHOT]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_333]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) [?:1.8.0_333]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) [?:1.8.0_333]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) [?:1.8.0_333]
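      The top frames of the trace show a version lookup failing for a vertex that was never recorded. The sketch below models that failure mode in isolation. It is a hypothetical, simplified stand-in and not the real ExecutionVertexVersioner: the class name, the use of plain strings as vertex ids, and the assumption that a version is first recorded when a vertex is scheduled are all illustrative.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of a per-vertex version registry. */
public class VersionerSketch {

    // vertex id -> current version; an entry exists only after the first recorded modification
    private final Map<String, Long> versions = new HashMap<>();

    /** Assumption: called when a vertex is scheduled; bumps and returns its version. */
    long recordModification(String vertexId) {
        return versions.merge(vertexId, 1L, Long::sum);
    }

    /** Fails, as in the trace above, when the vertex has no recorded version. */
    long getCurrentVersion(String vertexId) {
        Long version = versions.get(vertexId);
        if (version == null) {
            throw new IllegalStateException(
                    "Execution vertex " + vertexId + " does not have a recorded version");
        }
        return version;
    }

    public static void main(String[] args) {
        VersionerSketch versioner = new VersionerSketch();

        // A scheduled vertex has a version and can be looked up.
        versioner.recordModification("scheduledVertex_0");
        System.out.println(versioner.getCurrentVersion("scheduledVertex_0")); // 1

        // A vertex reported as slow before it was scheduled has no version.
        try {
            versioner.getCurrentVersion("unscheduledVertex_0");
        } catch (IllegalStateException e) {
            // Execution vertex unscheduledVertex_0 does not have a recorded version
            System.out.println(e.getMessage());
        }
    }
}
```

      Under these assumptions, any caller that asks for the versions of a batch of vertices fails as soon as the batch contains one vertex that has not been scheduled, which matches the path through notifySlowTasks in the trace.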

       

          People

            JunRuiLi Junrui Li