Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-19264

Jobs with identical graph shapes cannot be run concurrently

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 1.12.0
    • 1.12.0
    • None

    Description

      While working on FLINK-19123 I noticed that jobs often fail on MiniCluster when multiple jobs are running at the same time.

      I created a reproducer here: https://github.com/aljoscha/flink/tree/flink-19123-fix-test-stream-env-alternative. You can run MiniClusterConcurrencyITCase to see the problem in action. Sometimes the test will succeed, sometimes it will fail with

      java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      
      	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
      	at org.apache.flink.test.example.MiniClusterConcurrencyITCase.submitConcurrently(MiniClusterConcurrencyITCase.java:60)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
      	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
      	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
      	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
      	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
      	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
      	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
      	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
      	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
      	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
      	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
      	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
      	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
      	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
      	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
      	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
      	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
      	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
      	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
      Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:107)
      	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
      	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
      	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
      	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
      	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
      	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:926)
      	at akka.dispatch.OnComplete.internal(Future.scala:264)
      	at akka.dispatch.OnComplete.internal(Future.scala:261)
      	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
      	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
      	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
      	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
      	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
      	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
      	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
      	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
      	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
      	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
      	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
      	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
      	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
      	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
      	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
      	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
      	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
      	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:215)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:208)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:202)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:523)
      	at org.apache.flink.runtime.jobmaster.JobMaster$1.onMissingDeploymentsOf(JobMaster.java:237)
      	at org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentReconciler.reconcileExecutionDeployments(DefaultExecutionDeploymentReconciler.java:55)
      	at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.reportPayload(JobMaster.java:1229)
      	at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.reportPayload(JobMaster.java:1216)
      	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.receiveHeartbeat(HeartbeatManagerImpl.java:199)
      	at org.apache.flink.runtime.jobmaster.JobMaster.heartbeatFromTaskManager(JobMaster.java:674)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
      	... 4 more
      Caused by: org.apache.flink.util.FlinkException: Execution cbc357ccb763df2852fee8c4fc7d55f2_0_0 is unexpectedly no longer running on task executor 0a6edd8c-b1f4-48da-a228-c395b2aff92d.
      	at org.apache.flink.runtime.jobmaster.JobMaster$1.onMissingDeploymentsOf(JobMaster.java:241)
      	... 31 more
      

      If you reduce the number of submitted jobs the test will always pass.

      Am important piece of the puzzle seems to be this message that you will also find in the log output:

      4594 [mini-cluster-io-thread-25] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - JobManager for job 580f78b9224c2079ebe0c2f6b08fb4ba with leader id b0ca0700de4f8e8a2555bfe7596741ac lost leadership.
      

      It seems that the JobMasters of the concurrent jobs somehow clash with each other.

      Attachments

        Issue Links

          Activity

            People

              guoyangze Yangze Guo
              aljoscha Aljoscha Krettek
              Votes:
              1 Vote for this issue
              Watchers:
              9 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: